Commit 29769de

Copy to/from stdout/stdin with (format parquet) (#121)
We use temp files as an intermediate step. For COPY TO stdout: table => temp file => stdout. For COPY FROM stdin: stdin => temp file => table. This adds intermediate file I/O overhead, but it is the simplest decent solution for now, since most of the time is spent on row-to-columnar conversion anyway. Closes #69.
1 parent 0de2de1 commit 29769de
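
The relay described in the commit message is easy to picture outside the extension. Below is a minimal `std::io` sketch of the COPY TO side (an illustrative stand-in, not the extension's code, which uses PostgreSQL's `OpenTemporaryFile` and an Arrow-based writer): the Parquet footer must be fully written before any bytes can be streamed, which is why a seekable temp file sits between the table and stdout.

```rust
use std::fs::File;
use std::io::{self, copy, Seek, SeekFrom};

// Minimal sketch of the COPY TO stdout relay (assumed names, not the
// extension's API): Parquet needs a seekable sink for its footer, so rows
// are written to a temp file first and the finished file is then streamed.
fn relay_to_stdout(
    mut tmp: File,
    write_parquet: impl FnOnce(&mut File) -> io::Result<()>,
) -> io::Result<u64> {
    write_parquet(&mut tmp)?; // table => temp file
    tmp.seek(SeekFrom::Start(0))?; // rewind before streaming
    copy(&mut tmp, &mut io::stdout()) // temp file => stdout
}
```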

File tree

21 files changed: +761 −111 lines

.devcontainer/.env

Lines changed: 3 additions & 0 deletions
```diff
@@ -28,4 +28,7 @@ GOOGLE_SERVICE_ACCOUNT_KEY='{"gcs_base_url": "http://localhost:4443","disable_oa
 GOOGLE_SERVICE_ENDPOINT=http://localhost:4443
 
 # Others
+# run pgrx tests with a single thread to avoid race conditions
 RUST_TEST_THREADS=1
+# pgrx runs test on the port base_port + pg_version
+PGRX_TEST_PG_BASE_PORT=5454
```
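
A hypothetical snippet to make the comment above concrete, assuming pgrx derives each test server's port as the base port plus the PostgreSQL major version:

```rust
// Hypothetical illustration of "base_port + pg_version": with
// PGRX_TEST_PG_BASE_PORT=5454 and PostgreSQL 17, tests would use 5471.
fn pgrx_test_port(base_port: u16, pg_major: u16) -> u16 {
    base_port + pg_major
}

fn main() {
    assert_eq!(pgrx_test_port(5454, 17), 5471);
}
```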

.github/workflows/ci.yml

Lines changed: 3 additions & 1 deletion
```diff
@@ -84,6 +84,7 @@ jobs:
           postgresql-server-dev-${{ env.PG_MAJOR }} \
           postgresql-client-${{ env.PG_MAJOR }} \
           libpq-dev
+          echo "export PG_MAJOR=${{ env.PG_MAJOR }}" >> $GITHUB_ENV
 
       - name: Install azure-cli
         run: |
@@ -94,7 +95,8 @@
       - name: Install and configure pgrx
         run: |
           cargo install --locked [email protected]
-          cargo pgrx init --pg${{ env.PG_MAJOR }} /usr/lib/postgresql/${{ env.PG_MAJOR }}/bin/pg_config
+          cargo pgrx init --pg${{ env.PG_MAJOR }} /usr/lib/postgresql/${{ env.PG_MAJOR }}/bin/pg_config \
+            --base-testing-port $PGRX_TEST_PG_BASE_PORT
 
       - name: Install cargo-llvm-cov for coverage report
         run: cargo install --locked [email protected]
```

README.md

Lines changed: 12 additions & 2 deletions
````diff
@@ -5,7 +5,7 @@
 [![CI lints and tests](https://github.com/CrunchyData/pg_parquet/actions/workflows/ci.yml/badge.svg)](https://github.com/CrunchyData/pg_parquet/actions/workflows/ci.yml)
 [![codecov](https://codecov.io/gh/CrunchyData/pg_parquet/graph/badge.svg?token=6BPS0DSKJ2)](https://codecov.io/gh/CrunchyData/pg_parquet)
 
-`pg_parquet` is a PostgreSQL extension that allows you to read and write [Parquet files](https://parquet.apache.org), which are located in `S3` or `file system`, from PostgreSQL via `COPY TO/FROM` commands. It depends on [Apache Arrow](https://arrow.apache.org/rust/arrow/) project to read and write Parquet files and [pgrx](https://github.com/pgcentralfoundation/pgrx) project to extend PostgreSQL's `COPY` command.
+`pg_parquet` is a PostgreSQL extension that allows you to read and write [Parquet files](https://parquet.apache.org), which are located in `S3`, `Azure Blob Storage`, `Google Cloud Storage`, `http(s) endpoints` or `file system`, from PostgreSQL via `COPY TO/FROM` commands. It depends on [Apache Arrow](https://arrow.apache.org/rust/arrow/) project to read and write Parquet files and [pgrx](https://github.com/pgcentralfoundation/pgrx) project to extend PostgreSQL's `COPY` command.
 
 ```sql
 -- Copy a query result into Parquet in S3
@@ -61,7 +61,7 @@ There are mainly 3 things that you can do with `pg_parquet`:
 3. You can inspect the schema and metadata of Parquet files.
 
 ### COPY to/from Parquet files from/to Postgres tables
-You can use PostgreSQL's `COPY` command to read and write Parquet files. Below is an example of how to write a PostgreSQL table, with complex types, into a Parquet file and then to read the Parquet file content back into the same table.
+You can use PostgreSQL's `COPY` command to read and write from/to Parquet files. Below is an example of how to write a PostgreSQL table, with complex types, into a Parquet file and then to read the Parquet file content back into the same table.
 
 ```sql
 -- create composite types
@@ -99,6 +99,16 @@ COPY product_example FROM '/tmp/product_example.parquet';
 SELECT * FROM product_example;
 ```
 
+You can also use `COPY` command to read and write Parquet stream from/to standard input and output. Below is an example usage (you have to specify `format = parquet`):
+
+```bash
+psql -d pg_parquet -p 28817 -h localhost -c "create table product_example_reconstructed (like product_example);"
+CREATE TABLE
+
+psql -d pg_parquet -p 28817 -h localhost -c "copy product_example to stdout (format parquet);" | psql -d pg_parquet -p 28817 -h localhost -c "copy product_example_reconstructed from stdin (format parquet);"
+COPY 2
+```
+
 ### Inspect Parquet schema
 You can call `SELECT * FROM parquet.schema(<uri>)` to discover the schema of the Parquet file at given uri.
 
````

src/arrow_parquet/parquet_reader.rs

Lines changed: 5 additions & 1 deletion
```diff
@@ -50,7 +50,11 @@ pub(crate) struct ParquetReaderContext {
 }
 
 impl ParquetReaderContext {
-    pub(crate) fn new(uri_info: ParsedUriInfo, match_by: MatchBy, tupledesc: &PgTupleDesc) -> Self {
+    pub(crate) fn new(
+        uri_info: &ParsedUriInfo,
+        match_by: MatchBy,
+        tupledesc: &PgTupleDesc,
+    ) -> Self {
         // Postgis and Map contexts are used throughout reading the parquet file.
         // We need to reset them to avoid reading the stale data. (e.g. extension could be dropped)
         reset_postgis_context();
```

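A theme that recurs through the rest of this commit: `ParsedUriInfo` is now passed as `&ParsedUriInfo`. The sketch below (illustrative types, not the extension's) shows why — once the struct owns a temp file descriptor and closes it in `Drop`, a derived `Clone` would risk double-closing the fd, so `Clone` is removed and callers borrow instead.

```rust
// Illustrative sketch: a struct that owns a raw fd and closes it in Drop
// must not be Clone, otherwise two copies would close the same descriptor.
struct UriInfo {
    tmp_fd: Option<i32>,
}

impl Drop for UriInfo {
    fn drop(&mut self) {
        if let Some(fd) = self.tmp_fd.take() {
            let _ = fd; // close `fd` exactly once here (stand-in for pg_sys::FileClose)
        }
    }
}

// readers and writers borrow the uri info instead of consuming or cloning it
fn open_reader(uri_info: &UriInfo) {
    let _ = uri_info.tmp_fd;
}

fn main() {
    let info = UriInfo { tmp_fd: Some(3) };
    open_reader(&info); // borrow freely; the fd closes once when `info` drops
}
```
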
src/arrow_parquet/parquet_writer.rs

Lines changed: 2 additions & 8 deletions
```diff
@@ -71,7 +71,7 @@ impl ParquetWriterContext {
 
         let writer_props = Self::writer_props(tupledesc, options);
 
-        let parquet_writer = parquet_writer_from_uri(uri_info, schema.clone(), writer_props);
+        let parquet_writer = parquet_writer_from_uri(&uri_info, schema.clone(), writer_props);
 
         let attribute_contexts =
             collect_pg_to_arrow_attribute_contexts(&attributes, &schema.fields);
@@ -130,7 +130,7 @@
     }
 
     // finalize flushes the in progress rows to a new row group and finally writes metadata to the file.
-    fn finalize(&mut self) {
+    pub(crate) fn finalize(&mut self) {
         PG_BACKEND_TOKIO_RUNTIME
             .block_on(self.parquet_writer.finish())
             .unwrap_or_else(|e| panic!("failed to finish parquet writer: {}", e));
@@ -156,9 +156,3 @@ impl ParquetWriterContext {
         RecordBatch::try_new(schema, attribute_arrays).expect("Expected record batch")
     }
 }
-
-impl Drop for ParquetWriterContext {
-    fn drop(&mut self) {
-        self.finalize();
-    }
-}
```

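Replacing the `Drop` impl with a `pub(crate) fn finalize` matters for the stdout path: the writer must flush its row groups and the Parquet footer to the temp file before the hook streams that file to the client, and a `Drop`-driven finalize gives no control over that ordering. A minimal sketch of the pattern, with assumed names:

```rust
// Sketch of the explicit-finalize pattern (assumed types): the caller
// decides when the footer is flushed instead of relying on Drop.
struct Writer {
    finished: bool,
}

impl Writer {
    // flush buffered rows and write the Parquet footer
    fn finalize(&mut self) -> Result<(), String> {
        self.finished = true;
        Ok(())
    }
}

fn copy_to_stdout(mut writer: Writer) -> Result<(), String> {
    writer.finalize()?; // footer must be on disk...
    assert!(writer.finished); // ...before the temp file is streamed out
    Ok(())
}

fn main() {
    copy_to_stdout(Writer { finished: false }).expect("finalize failed");
}
```
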
src/arrow_parquet/uri_utils.rs

Lines changed: 63 additions & 18 deletions
```diff
@@ -1,4 +1,4 @@
-use std::{panic, sync::Arc};
+use std::{ffi::CStr, panic, sync::Arc};
 
 use arrow::datatypes::SchemaRef;
 use object_store::{path::Path, ObjectStoreScheme};
@@ -13,7 +13,11 @@ use parquet::{
 };
 use pgrx::{
     ereport,
-    pg_sys::{get_role_oid, has_privs_of_role, superuser, AsPgCStr, GetUserId},
+    ffi::c_char,
+    pg_sys::{
+        get_role_oid, has_privs_of_role, palloc0, superuser, AsPgCStr, DataDir, FileClose,
+        FilePathName, GetUserId, InvalidOid, OpenTemporaryFile, TempTablespacePath, MAXPGPATH,
+    },
 };
 use url::Url;
 
@@ -29,15 +33,48 @@ const PARQUET_OBJECT_STORE_READ_ROLE: &str = "parquet_object_store_read";
 const PARQUET_OBJECT_STORE_WRITE_ROLE: &str = "parquet_object_store_write";
 
 // ParsedUriInfo is a struct that holds the parsed uri information.
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub(crate) struct ParsedUriInfo {
     pub(crate) uri: Url,
     pub(crate) bucket: Option<String>,
     pub(crate) path: Path,
     pub(crate) scheme: ObjectStoreScheme,
+    pub(crate) stdio_tmp_fd: Option<i32>,
 }
 
 impl ParsedUriInfo {
+    pub(crate) fn for_std_inout() -> Self {
+        // open temp postgres file, which is removed after transaction ends
+        let tmp_path_fd = unsafe { OpenTemporaryFile(false) };
+
+        let tmp_path = unsafe {
+            let data_dir = CStr::from_ptr(DataDir).to_str().expect("invalid base dir");
+
+            let tmp_tblspace_path: *const c_char = palloc0(MAXPGPATH as _) as _;
+            TempTablespacePath(tmp_tblspace_path as _, InvalidOid);
+            let tmp_tblspace_path = CStr::from_ptr(tmp_tblspace_path)
+                .to_str()
+                .expect("invalid temp tablespace path");
+
+            let tmp_file_path = FilePathName(tmp_path_fd);
+            let tmp_file_path = CStr::from_ptr(tmp_file_path)
+                .to_str()
+                .expect("invalid temp path");
+
+            let tmp_path = std::path::Path::new(data_dir)
+                .join(tmp_tblspace_path)
+                .join(tmp_file_path);
+
+            tmp_path.to_str().expect("invalid tmp path").to_string()
+        };
+
+        let mut parsed_uri = Self::try_from(tmp_path.as_str()).unwrap_or_else(|e| panic!("{}", e));
+
+        parsed_uri.stdio_tmp_fd = Some(tmp_path_fd);
+
+        parsed_uri
+    }
+
     fn try_parse_uri(uri: &str) -> Result<Url, String> {
         if !uri.contains("://") {
             // local file
@@ -92,10 +129,20 @@ impl TryFrom<&str> for ParsedUriInfo {
             bucket,
             path,
             scheme,
+            stdio_tmp_fd: None,
         })
     }
 }
 
+impl Drop for ParsedUriInfo {
+    fn drop(&mut self) {
+        if let Some(stdio_tmp_fd) = self.stdio_tmp_fd {
+            // close temp file, postgres api will remove it on close
+            unsafe { FileClose(stdio_tmp_fd) };
+        }
+    }
+}
+
 pub(crate) fn uri_as_string(uri: &Url) -> String {
     if uri.scheme() == "file" {
         // removes file:// prefix from the local path uri
@@ -109,7 +156,7 @@ pub(crate) fn uri_as_string(uri: &Url) -> String {
     uri.to_string()
 }
 
-pub(crate) fn parquet_schema_from_uri(uri_info: ParsedUriInfo) -> SchemaDescriptor {
+pub(crate) fn parquet_schema_from_uri(uri_info: &ParsedUriInfo) -> SchemaDescriptor {
     let parquet_reader = parquet_reader_from_uri(uri_info);
 
     let arrow_schema = parquet_reader.schema();
@@ -119,19 +166,18 @@ pub(crate) fn parquet_schema_from_uri(uri_info: ParsedUriInfo) -> SchemaDescript
         .unwrap_or_else(|e| panic!("{}", e))
 }
 
-pub(crate) fn parquet_metadata_from_uri(uri_info: ParsedUriInfo) -> Arc<ParquetMetaData> {
+pub(crate) fn parquet_metadata_from_uri(uri_info: &ParsedUriInfo) -> Arc<ParquetMetaData> {
+    let uri = uri_info.uri.clone();
+
     let copy_from = true;
-    let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);
+    let (parquet_object_store, location) = get_or_create_object_store(uri_info, copy_from);
 
     PG_BACKEND_TOKIO_RUNTIME.block_on(async {
         let object_store_meta = parquet_object_store
             .head(&location)
             .await
             .unwrap_or_else(|e| {
-                panic!(
-                    "failed to get object store metadata for uri {}: {}",
-                    uri_info.uri, e
-                )
+                panic!("failed to get object store metadata for uri {}: {}", uri, e)
             });
 
         let parquet_object_reader =
@@ -149,20 +195,19 @@ pub(crate) fn parquet_metadata_from_uri(uri_info: ParsedUriInfo) -> Arc<ParquetM
 pub(crate) const RECORD_BATCH_SIZE: i64 = 1024;
 
 pub(crate) fn parquet_reader_from_uri(
-    uri_info: ParsedUriInfo,
+    uri_info: &ParsedUriInfo,
 ) -> ParquetRecordBatchStream<ParquetObjectReader> {
+    let uri = uri_info.uri.clone();
+
     let copy_from = true;
-    let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);
+    let (parquet_object_store, location) = get_or_create_object_store(uri_info, copy_from);
 
     PG_BACKEND_TOKIO_RUNTIME.block_on(async {
         let object_store_meta = parquet_object_store
             .head(&location)
             .await
             .unwrap_or_else(|e| {
-                panic!(
-                    "failed to get object store metadata for uri {}: {}",
-                    uri_info.uri, e
-                )
+                panic!("failed to get object store metadata for uri {}: {}", uri, e)
             });
 
         let parquet_object_reader =
@@ -205,12 +250,12 @@ fn calculate_reader_batch_size(metadata: &Arc<ParquetMetaData>) -> usize {
 }
 
 pub(crate) fn parquet_writer_from_uri(
-    uri_info: ParsedUriInfo,
+    uri_info: &ParsedUriInfo,
     arrow_schema: SchemaRef,
     writer_props: WriterProperties,
 ) -> AsyncArrowWriter<ParquetObjectWriter> {
     let copy_from = false;
-    let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);
+    let (parquet_object_store, location) = get_or_create_object_store(uri_info, copy_from);
 
     let parquet_object_writer = ParquetObjectWriter::new(parquet_object_store, location);
 
```

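The subtle part of `for_std_inout` is turning the fd returned by `OpenTemporaryFile` back into a local-file URI that the existing reader/writer machinery can open: the data directory, the temp tablespace path and the file's own path are joined into one absolute path. A self-contained sketch of just that join — all values below are invented for illustration; the real ones come from `DataDir`, `TempTablespacePath` and `FilePathName`:

```rust
use std::path::PathBuf;

// Illustrative reconstruction of the path assembly (made-up values):
fn temp_file_path(data_dir: &str, tblspace_path: &str, file_path: &str) -> PathBuf {
    PathBuf::from(data_dir).join(tblspace_path).join(file_path)
}

fn main() {
    let p = temp_file_path("/var/lib/postgresql/data", "base/pgsql_tmp", "pgsql_tmp12345.0");
    println!("{}", p.display()); // /var/lib/postgresql/data/base/pgsql_tmp/pgsql_tmp12345.0
}
```
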
src/object_store/object_store_cache.rs

Lines changed: 6 additions & 8 deletions
```diff
@@ -23,7 +23,7 @@ use super::{
 static mut OBJECT_STORE_CACHE: Lazy<ObjectStoreCache> = Lazy::new(ObjectStoreCache::new);
 
 pub(crate) fn get_or_create_object_store(
-    uri_info: ParsedUriInfo,
+    uri_info: &ParsedUriInfo,
     copy_from: bool,
 ) -> (Arc<dyn ObjectStore>, Path) {
     #[allow(static_mut_refs)]
@@ -45,15 +45,13 @@ impl ObjectStoreCache {
 
     fn get_or_create(
         &mut self,
-        uri_info: ParsedUriInfo,
+        uri_info: &ParsedUriInfo,
         copy_from: bool,
     ) -> (Arc<dyn ObjectStore>, Path) {
-        let ParsedUriInfo {
-            uri,
-            path,
-            scheme,
-            bucket,
-        } = uri_info;
+        let uri = uri_info.uri.clone();
+        let scheme = uri_info.scheme.clone();
+        let bucket = uri_info.bucket.clone();
+        let path = uri_info.path.clone();
 
         // no need to cache local files
         if scheme == ObjectStoreScheme::Local {
```

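A side effect of the borrowed `uri_info` shows up here: destructuring `let ParsedUriInfo { uri, path, .. } = uri_info;` is a move, which no longer compiles behind a reference, so the fields are cloned out individually. An illustrative reduction of the same borrow-checker constraint:

```rust
#[derive(Clone)]
struct Inner(String);

struct Info {
    uri: Inner,
}

fn use_borrowed(info: &Info) {
    // `let Info { uri } = *info;` would move `uri` out of the borrow and
    // fail to compile; cloning the field is the straightforward fix.
    let uri = info.uri.clone();
    let _ = uri;
}

fn main() {
    use_borrowed(&Info {
        uri: Inner("s3://bucket/x.parquet".into()),
    });
}
```
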
src/parquet_copy_hook.rs

Lines changed: 2 additions & 0 deletions
```diff
@@ -1,7 +1,9 @@
 pub(crate) mod copy_from;
+pub(crate) mod copy_from_stdin;
 pub(crate) mod copy_to;
 pub(crate) mod copy_to_dest_receiver;
 pub(crate) mod copy_to_split_dest_receiver;
+pub(crate) mod copy_to_stdout;
 pub(crate) mod copy_utils;
 pub(crate) mod hook;
 pub(crate) mod pg_compat;
```

src/parquet_copy_hook/copy_from.rs

Lines changed: 12 additions & 4 deletions
```diff
@@ -18,9 +18,12 @@ use crate::{
     },
 };
 
-use super::copy_utils::{
-    copy_from_stmt_match_by, copy_stmt_attribute_list, copy_stmt_create_namespace_item,
-    copy_stmt_create_parse_state, create_filtered_tupledesc_for_relation,
+use super::{
+    copy_from_stdin::copy_stdin_to_file,
+    copy_utils::{
+        copy_from_stmt_match_by, copy_stmt_attribute_list, copy_stmt_create_namespace_item,
+        copy_stmt_create_parse_state, create_filtered_tupledesc_for_relation,
+    },
 };
 
 // stack to store parquet reader contexts for COPY FROM.
@@ -113,7 +116,7 @@
     p_stmt: &PgBox<PlannedStmt>,
     query_string: &CStr,
     query_env: &PgBox<QueryEnvironment>,
-    uri_info: ParsedUriInfo,
+    uri_info: &ParsedUriInfo,
 ) -> u64 {
     let rel_oid = copy_stmt_relation_oid(p_stmt);
 
@@ -140,6 +143,11 @@
     let match_by = copy_from_stmt_match_by(p_stmt);
 
     unsafe {
+        if uri_info.stdio_tmp_fd.is_some() {
+            let is_binary = true;
+            copy_stdin_to_file(uri_info, tupledesc.natts as _, is_binary);
+        }
+
         // parquet reader context is used throughout the COPY FROM operation.
         let parquet_reader_context = ParquetReaderContext::new(uri_info, match_by, &tupledesc);
         push_parquet_reader_context(parquet_reader_context);
```

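The COPY FROM side mirrors COPY TO: when `stdio_tmp_fd` is set, `copy_stdin_to_file` first drains the client's stream into the temp file, and only then does `ParquetReaderContext` open that file as an ordinary Parquet source. Stripped of the pgrx machinery (the real `copy_stdin_to_file` receives frontend COPY protocol data rather than reading process stdin), the drain step is just a byte copy:

```rust
use std::fs::File;
use std::io::{self, copy};

// std::io stand-in for the stdin => temp file step (assumed names):
fn drain_stdin_to_file(tmp: &mut File) -> io::Result<u64> {
    copy(&mut io::stdin(), tmp) // stdin => temp file; the reader then opens the file
}
```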