Skip to content

Commit 7060d99

Browse files
Support "COPY FROM/TO PROGRAM '...' WITH (format parquet)" (#146)
Now it is possible to use programs with `COPY table TO/FROM PROGRAM '...' WITH (format parquet)` syntax. e.g. ```sql pg_parquet=# CREATE TABLE test_table(a int); CREATE TABLE pg_parquet=# INSERT INTO test_table SELECT i FROM generate_series(1,10) i; INSERT 0 10 pg_parquet=# COPY test_table TO PROGRAM 'cat > /tmp/test.parquet' WITH (format parquet); COPY 10 pg_parquet=# COPY test_table FROM PROGRAM 'cat /tmp/test.parquet' WITH (format parquet); COPY 10 ``` Similar to how we support `COPY TO/FROM stdin/stdout WITH(format parquet)`, we use a temp file as intermediate file and finally we copy from/to program's stdout/stdin by piping the temp file. Closes #147.
1 parent 0d80fcd commit 7060d99

File tree

14 files changed

+513
-40
lines changed

14 files changed

+513
-40
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ aws-credential-types = {version = "1", default-features = false}
2929
azure_storage = {version = "0.21", default-features = false}
3030
futures = "0.3"
3131
home = "0.5"
32+
libc = {version = "0.2", default-features = false }
3233
object_store = {version = "=0.12.2", default-features = false, features = ["aws", "azure", "fs", "gcp", "http"]}
3334
once_cell = "1"
3435
parquet = {version = "56", default-features = false, features = [

README.md

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ COPY table FROM 's3://mybucket/data.parquet' WITH (format 'parquet');
1818
## Quick Reference
1919
- [Installation From Source](#installation-from-source)
2020
- [Usage](#usage)
21-
- [Copy FROM/TO Parquet files TO/FROM Postgres tables](#copy-tofrom-parquet-files-fromto-postgres-tables)
21+
- [Copy FROM/TO Parquet files TO/FROM Postgres tables](#copy-fromto-parquet-files-tofrom-postgres-tables)
22+
- [COPY FROM/TO Parquet stdin/stdout TO/FROM Postgres tables](#copy-fromto-parquet-stdinstdout-tofrom-postgres-tables)
23+
- [COPY FROM/TO Parquet program stream TO/FROM Postgres tables](#copy-fromto-parquet-program-stream-tofrom-postgres-tables)
2224
- [Inspect Parquet schema](#inspect-parquet-schema)
2325
- [Inspect Parquet metadata](#inspect-parquet-metadata)
2426
- [Inspect Parquet column statistics](#inspect-parquet-column-statistics)
@@ -64,11 +66,11 @@ psql> "CREATE EXTENSION pg_parquet;"
6466

6567
## Usage
6668
There are mainly 3 things that you can do with `pg_parquet`:
67-
1. You can export Postgres tables/queries to Parquet files,
69+
1. You can export Postgres tables/queries to Parquet files, stdin/stdout or a program's stream,
6870
2. You can ingest data from Parquet files to Postgres tables,
6971
3. You can inspect the schema and metadata of Parquet files.
7072

71-
### COPY to/from Parquet files from/to Postgres tables
73+
### COPY from/to Parquet files to/from Postgres tables
7274
You can use PostgreSQL's `COPY` command to read and write from/to Parquet files. Below is an example of how to write a PostgreSQL table, with complex types, into a Parquet file and then to read the Parquet file content back into the same table.
7375

7476
```sql
@@ -107,7 +109,9 @@ COPY product_example FROM '/tmp/product_example.parquet';
107109
SELECT * FROM product_example;
108110
```
109111

110-
You can also use `COPY` command to read and write Parquet stream from/to standard input and output. Below is an example usage (you have to specify `format = parquet`):
112+
### COPY from/to Parquet stdin/stdout to/from Postgres tables
113+
114+
You can use the `COPY` command to read and write a Parquet stream from/to standard input and output. Below is an example usage (you have to specify `format = parquet`):
111115

112116
```bash
113117
psql -d pg_parquet -p 28817 -h localhost -c "create table product_example_reconstructed (like product_example);"
@@ -117,6 +121,19 @@ psql -d pg_parquet -p 28817 -h localhost -c "copy product_example to stdout (for
117121
COPY 2
118122
```
119123

124+
### COPY from/to Parquet program stream to/from Postgres tables
125+
126+
You can use the `COPY` command to read and write a Parquet stream from/to a program's input and output. Below is an example usage (you have to specify `format = parquet`):
127+
128+
```bash
129+
psql -d pg_parquet -p 28817 -h localhost -c "copy product_example_reconstructed to program 'cat > /tmp/test.parquet' (format parquet);"
130+
COPY 2
131+
132+
psql -d pg_parquet -p 28817 -h localhost -c "copy product_example_reconstructed from program 'cat /tmp/test.parquet' (format parquet);"
133+
COPY 2
134+
```
135+
136+
120137
### Inspect Parquet schema
121138
You can call `SELECT * FROM parquet.schema(<uri>)` to discover the schema of the Parquet file at given uri.
122139

src/arrow_parquet/uri_utils.rs

Lines changed: 72 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{ffi::CStr, panic, sync::Arc};
1+
use std::{ffi::CStr, fs::File, os::fd::FromRawFd, panic, sync::Arc};
22

33
use arrow::datatypes::SchemaRef;
44
use object_store::{path::Path, ObjectStoreScheme};
@@ -15,8 +15,9 @@ use pgrx::{
1515
ereport,
1616
ffi::c_char,
1717
pg_sys::{
18-
get_role_oid, has_privs_of_role, palloc0, superuser, AsPgCStr, DataDir, FileClose,
19-
FilePathName, GetUserId, InvalidOid, OpenTemporaryFile, TempTablespacePath, MAXPGPATH,
18+
get_role_oid, has_privs_of_role, palloc0, superuser, AsPgCStr, ClosePipeStream, DataDir,
19+
FileClose, FilePathName, GetUserId, InvalidOid, OpenPipeStream, OpenTemporaryFile,
20+
TempTablespacePath, MAXPGPATH, PG_BINARY_R, PG_BINARY_W,
2021
},
2122
};
2223
use url::Url;
@@ -39,11 +40,46 @@ pub(crate) struct ParsedUriInfo {
3940
pub(crate) bucket: Option<String>,
4041
pub(crate) path: Path,
4142
pub(crate) scheme: ObjectStoreScheme,
42-
pub(crate) stdio_tmp_fd: Option<i32>,
43+
// tmp_fd is used as intermediate file for copying data to/from stdin/out or program pipes
44+
pub(crate) tmp_fd: Option<i32>,
45+
// pipe_file is used to hold the pipe file descriptor for copying data to/from a program
46+
// call open_program_pipe to open the pipe to a program
47+
pub(crate) is_program: bool,
48+
pub(crate) pipe_file: *mut libc::FILE,
4349
}
4450

4551
impl ParsedUriInfo {
4652
pub(crate) fn for_std_inout() -> Self {
53+
Self::with_tmp_file()
54+
}
55+
56+
pub(crate) fn for_program() -> Self {
57+
let mut uri_info = Self::with_tmp_file();
58+
uri_info.is_program = true;
59+
uri_info
60+
}
61+
62+
pub(crate) fn open_program_pipe(&mut self, program: &str, copy_from: bool) -> File {
63+
let pipe_mode = if copy_from { PG_BINARY_R } else { PG_BINARY_W };
64+
65+
let pipe_file = unsafe { OpenPipeStream(program.as_pg_cstr(), pipe_mode.as_ptr()) };
66+
67+
if pipe_file.is_null() {
68+
panic!("Failed to open pipe stream for program: {}", program);
69+
}
70+
71+
self.pipe_file = pipe_file as _;
72+
73+
let fd = unsafe { libc::fileno(self.pipe_file) };
74+
75+
if fd < 0 {
76+
panic!("Failed to get file descriptor for pipe stream: {}", program);
77+
}
78+
79+
unsafe { File::from_raw_fd(fd) }
80+
}
81+
82+
fn with_tmp_file() -> Self {
4783
// open temp postgres file, which is removed after transaction ends
4884
let tmp_path_fd = unsafe { OpenTemporaryFile(false) };
4985

@@ -70,11 +106,15 @@ impl ParsedUriInfo {
70106

71107
let mut parsed_uri = Self::try_from(tmp_path.as_str()).unwrap_or_else(|e| panic!("{}", e));
72108

73-
parsed_uri.stdio_tmp_fd = Some(tmp_path_fd);
109+
parsed_uri.tmp_fd = Some(tmp_path_fd);
74110

75111
parsed_uri
76112
}
77113

114+
fn is_std_inout(&self) -> bool {
115+
self.tmp_fd.is_some() && !self.is_program
116+
}
117+
78118
fn try_parse_uri(uri: &str) -> Result<Url, String> {
79119
if !uri.contains("://") {
80120
// local file
@@ -128,16 +168,23 @@ impl TryFrom<&str> for ParsedUriInfo {
128168
bucket,
129169
path,
130170
scheme,
131-
stdio_tmp_fd: None,
171+
tmp_fd: None,
172+
is_program: false,
173+
pipe_file: std::ptr::null_mut(),
132174
})
133175
}
134176
}
135177

136178
impl Drop for ParsedUriInfo {
137179
fn drop(&mut self) {
138-
if let Some(stdio_tmp_fd) = self.stdio_tmp_fd {
180+
if let Some(tmp_fd) = self.tmp_fd {
139181
// close temp file, postgres api will remove it on close
140-
unsafe { FileClose(stdio_tmp_fd) };
182+
unsafe { FileClose(tmp_fd) };
183+
}
184+
185+
if !self.pipe_file.is_null() {
186+
// close pipe file, postgres api will remove it on close
187+
unsafe { ClosePipeStream(self.pipe_file as _) };
141188
}
142189
}
143190
}
@@ -275,30 +322,41 @@ pub(crate) fn ensure_access_privilege_to_uri(uri_info: &ParsedUriInfo, copy_from
275322
}
276323

277324
// permission check is not needed for stdin/out
278-
if uri_info.stdio_tmp_fd.is_some() {
325+
if uri_info.is_std_inout() {
279326
return;
280327
}
281328

282329
let user_id = unsafe { GetUserId() };
283330
let is_file = uri_info.uri.scheme() == "file";
284331

285-
let required_role_name = if is_file {
332+
let required_role_name = if uri_info.is_program {
333+
"pg_execute_server_program"
334+
} else if is_file {
286335
if copy_from {
287336
"pg_read_server_files"
288337
} else {
289338
"pg_write_server_files"
290339
}
291-
} else if copy_from {
292-
PARQUET_OBJECT_STORE_READ_ROLE
293340
} else {
294-
PARQUET_OBJECT_STORE_WRITE_ROLE
341+
// object_store
342+
if copy_from {
343+
PARQUET_OBJECT_STORE_READ_ROLE
344+
} else {
345+
PARQUET_OBJECT_STORE_WRITE_ROLE
346+
}
295347
};
296348

297349
let required_role_id =
298350
unsafe { get_role_oid(required_role_name.to_string().as_pg_cstr(), false) };
299351

300352
let operation_str = if copy_from { "from" } else { "to" };
301-
let object_type = if is_file { "file" } else { "remote uri" };
353+
let object_type = if uri_info.is_program {
354+
"program"
355+
} else if is_file {
356+
"file"
357+
} else {
358+
"remote uri"
359+
};
302360

303361
if !unsafe { has_privs_of_role(user_id, required_role_id) } {
304362
ereport!(

src/parquet_copy_hook.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
pub(crate) mod copy_from;
2+
pub(crate) mod copy_from_program;
23
pub(crate) mod copy_from_stdin;
34
pub(crate) mod copy_to;
45
pub(crate) mod copy_to_dest_receiver;
6+
pub(crate) mod copy_to_program;
57
pub(crate) mod copy_to_split_dest_receiver;
68
pub(crate) mod copy_to_stdout;
79
pub(crate) mod copy_utils;

src/parquet_copy_hook/copy_from.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@ use pgrx::{
1313

1414
use crate::{
1515
arrow_parquet::{parquet_reader::ParquetReaderContext, uri_utils::ParsedUriInfo},
16-
parquet_copy_hook::copy_utils::{
17-
copy_from_stmt_create_option_list, copy_stmt_lock_mode, copy_stmt_relation_oid,
16+
parquet_copy_hook::{
17+
copy_from_program::copy_program_to_file,
18+
copy_utils::{
19+
copy_from_stmt_create_option_list, copy_stmt_is_std_inout, copy_stmt_lock_mode,
20+
copy_stmt_program, copy_stmt_relation_oid,
21+
},
1822
},
1923
};
2024

@@ -117,7 +121,7 @@ pub(crate) fn execute_copy_from(
117121
p_stmt: &PgBox<PlannedStmt>,
118122
query_string: &CStr,
119123
query_env: &PgBox<QueryEnvironment>,
120-
uri_info: &ParsedUriInfo,
124+
mut uri_info: ParsedUriInfo,
121125
) -> u64 {
122126
let rel_oid = copy_stmt_relation_oid(p_stmt);
123127

@@ -146,13 +150,15 @@ pub(crate) fn execute_copy_from(
146150
let match_by = copy_from_stmt_match_by(p_stmt);
147151

148152
unsafe {
149-
if uri_info.stdio_tmp_fd.is_some() {
153+
if let Some(program) = copy_stmt_program(p_stmt) {
154+
copy_program_to_file(&mut uri_info, &program);
155+
} else if copy_stmt_is_std_inout(p_stmt) {
150156
let is_binary = true;
151-
copy_stdin_to_file(uri_info, tupledesc.natts as _, is_binary);
157+
copy_stdin_to_file(&uri_info, tupledesc.natts as _, is_binary);
152158
}
153159

154160
// parquet reader context is used throughout the COPY FROM operation.
155-
let parquet_reader_context = ParquetReaderContext::new(uri_info, match_by, &tupledesc);
161+
let parquet_reader_context = ParquetReaderContext::new(&uri_info, match_by, &tupledesc);
156162
push_parquet_reader_context(parquet_reader_context);
157163

158164
// makes sure to set binary format
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use crate::arrow_parquet::uri_utils::{uri_as_string, ParsedUriInfo};
2+
3+
/// Runs `program` and stores everything read from its pipe in the file that `uri_info`
/// points to (created if missing, truncated otherwise).
///
/// # Safety
///
/// NOTE(review): the function is declared `unsafe` but the contract is not spelled out in
/// this file; presumably it must be called from a Postgres backend where the pipe stream
/// API used by `open_program_pipe` is available. Confirm with the callers.
///
/// # Panics
///
/// Panics if the target file cannot be opened or if copying the program's output fails.
pub(crate) unsafe fn copy_program_to_file(uri_info: &mut ParsedUriInfo, program: &str) {
    // location of the intermediate file
    let tmp_path = uri_as_string(&uri_info.uri);

    // start the program; the returned handle is read from (copy_from = true)
    let mut program_output = uri_info.open_program_pipe(program, true);

    // create or overwrite the local file
    let mut open_options = std::fs::OpenOptions::new();
    open_options.write(true).truncate(true).create(true);

    let mut tmp_file = match open_options.open(&tmp_path) {
        Ok(opened) => opened,
        Err(e) => panic!("{}", e),
    };

    // drain the pipe into the temp file
    if let Err(e) = std::io::copy(&mut program_output, &mut tmp_file) {
        panic!("Failed to copy command stdout to file: {e}");
    }
}

src/parquet_copy_hook/copy_to_dest_receiver.rs

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@ use pg_sys::{
1212
};
1313
use pgrx::{prelude::*, FromDatum, PgList, PgMemoryContexts, PgTupleDesc};
1414

15-
use crate::arrow_parquet::{
16-
field_ids::FieldIds,
17-
parquet_writer::ParquetWriterContext,
18-
uri_utils::{ParsedUriInfo, RECORD_BATCH_SIZE},
15+
use crate::{
16+
arrow_parquet::{
17+
field_ids::FieldIds,
18+
parquet_writer::ParquetWriterContext,
19+
uri_utils::{ParsedUriInfo, RECORD_BATCH_SIZE},
20+
},
21+
parquet_copy_hook::copy_to_program::copy_file_to_program,
1922
};
2023

2124
use super::{
@@ -33,6 +36,7 @@ pub(crate) struct CopyToParquetDestReceiver {
3336
collected_tuple_column_sizes: *mut i64,
3437
target_batch_size: i64,
3538
uri: *const c_char,
39+
program: *mut c_char,
3640
is_to_stdout: bool,
3741
copy_options: CopyToParquetOptions,
3842
copy_memory_context: MemoryContext,
@@ -150,6 +154,21 @@ impl CopyToParquetDestReceiver {
150154
}
151155
}
152156

157+
fn copy_to_program(&self) {
158+
if !self.program.is_null() {
159+
let program = unsafe {
160+
CStr::from_ptr(self.program)
161+
.to_str()
162+
.expect("invalid program")
163+
};
164+
165+
let uri = unsafe { CStr::from_ptr(self.uri).to_str().expect("invalid uri") };
166+
let mut uri_info = ParsedUriInfo::try_from(uri).expect("invalid uri");
167+
168+
unsafe { copy_file_to_program(&mut uri_info, program) };
169+
}
170+
}
171+
153172
pub(crate) fn cleanup(&mut self) {
154173
if !self.parquet_writer_context.is_null() {
155174
let parquet_writer_context = unsafe { Box::from_raw(self.parquet_writer_context) };
@@ -301,7 +320,9 @@ pub(crate) extern "C-unwind" fn copy_shutdown(dest: *mut DestReceiver) {
301320

302321
parquet_dest.finish();
303322

304-
if parquet_dest.is_to_stdout {
323+
if !parquet_dest.program.is_null() {
324+
parquet_dest.copy_to_program();
325+
} else if parquet_dest.is_to_stdout {
305326
parquet_dest.copy_to_stdout();
306327
}
307328

@@ -355,6 +376,7 @@ fn tuple_column_sizes(tuple_datums: &[Option<Datum>], tupledesc: &PgTupleDesc) -
355376
#[pg_guard]
356377
pub(crate) extern "C-unwind" fn create_copy_to_parquet_dest_receiver(
357378
uri: *const c_char,
379+
program: *mut c_char,
358380
is_to_stdout: bool,
359381
options: CopyToParquetOptions,
360382
) -> *mut CopyToParquetDestReceiver {
@@ -387,6 +409,7 @@ pub(crate) extern "C-unwind" fn create_copy_to_parquet_dest_receiver(
387409
parquet_dest.dest.rDestroy = Some(copy_destroy);
388410
parquet_dest.dest.mydest = CommandDest::DestCopyOut;
389411
parquet_dest.uri = uri;
412+
parquet_dest.program = program;
390413
parquet_dest.is_to_stdout = is_to_stdout;
391414
parquet_dest.tupledesc = std::ptr::null_mut();
392415
parquet_dest.parquet_writer_context = std::ptr::null_mut();

0 commit comments

Comments
 (0)