Skip to content

Commit 78499b6

Browse files
committed
readme
1 parent 76ca0ac commit 78499b6

File tree

4 files changed

+33
-10
lines changed

4 files changed

+33
-10
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ aws-credential-types = {version = "1", default-features = false}
2828
azure_storage = {version = "0.21", default-features = false}
2929
futures = "0.3"
3030
home = "0.5"
31+
libc = {version = "0.2", default-features = false, features = ["std"] }
3132
object_store = {version = "=0.12.2", default-features = false, features = ["aws", "azure", "fs", "gcp", "http"]}
3233
once_cell = "1"
3334
parquet = {version = "55", default-features = false, features = [

README.md

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ COPY table FROM 's3://mybucket/data.parquet' WITH (format 'parquet');
1818
## Quick Reference
1919
- [Installation From Source](#installation-from-source)
2020
- [Usage](#usage)
21-
- [Copy FROM/TO Parquet files TO/FROM Postgres tables](#copy-tofrom-parquet-files-fromto-postgres-tables)
21+
- [Copy FROM/TO Parquet files TO/FROM Postgres tables](#copy-fromto-parquet-files-tofrom-postgres-tables)
22+
- [COPY FROM/TO Parquet stdin/stdout TO/FROM Postgres tables](#copy-fromto-parquet-stdinstdout-tofrom-postgres-tables)
23+
- [COPY FROM/TO Parquet program stream TO/FROM Postgres tables](#copy-fromto-parquet-program-stream-tofrom-postgres-tables)
2224
- [Inspect Parquet schema](#inspect-parquet-schema)
2325
- [Inspect Parquet metadata](#inspect-parquet-metadata)
2426
- [Inspect Parquet column statistics](#inspect-parquet-column-statistics)
@@ -56,11 +58,11 @@ psql> "CREATE EXTENSION pg_parquet;"
5658

5759
## Usage
5860
There are mainly 3 things that you can do with `pg_parquet`:
59-
1. You can export Postgres tables/queries to Parquet files,
61+
1. You can export Postgres tables/queries to Parquet files, standard output, or a program's input stream,
6062
2. You can ingest data from Parquet files, standard input, or a program's output stream to Postgres tables,
6163
3. You can inspect the schema and metadata of Parquet files.
6264

63-
### COPY to/from Parquet files from/to Postgres tables
65+
### COPY from/to Parquet files to/from Postgres tables
6466
You can use PostgreSQL's `COPY` command to read and write from/to Parquet files. Below is an example of how to write a PostgreSQL table, with complex types, into a Parquet file and then to read the Parquet file content back into the same table.
6567

6668
```sql
@@ -99,7 +101,9 @@ COPY product_example FROM '/tmp/product_example.parquet';
99101
SELECT * FROM product_example;
100102
```
101103

102-
You can also use `COPY` command to read and write Parquet stream from/to standard input and output. Below is an example usage (you have to specify `format = parquet`):
104+
### COPY from/to Parquet stdin/stdout to/from Postgres tables
105+
106+
You can use the `COPY` command to read and write a Parquet stream from/to standard input and output. Below is an example usage (you have to specify the `format parquet` option):
103107

104108
```bash
105109
psql -d pg_parquet -p 28817 -h localhost -c "create table product_example_reconstructed (like product_example);"
@@ -109,6 +113,19 @@ psql -d pg_parquet -p 28817 -h localhost -c "copy product_example to stdout (for
109113
COPY 2
110114
```
111115

116+
### COPY from/to Parquet program stream to/from Postgres tables
117+
118+
You can use the `COPY` command to read and write a Parquet stream from/to a program's input and output. Below is an example usage (you have to specify the `format parquet` option):
119+
120+
```bash
121+
psql -d pg_parquet -p 28817 -h localhost -c "copy product_example_reconstructed to program 'cat > /tmp/test.parquet' (format parquet);"
122+
COPY 2
123+
124+
psql -d pg_parquet -p 28817 -h localhost -c "copy product_example_reconstructed from program 'cat /tmp/test.parquet' (format parquet);"
125+
COPY 2
126+
```
127+
128+
112129
### Inspect Parquet schema
113130
You can call `SELECT * FROM parquet.schema(<uri>)` to discover the schema of the Parquet file at given uri.
114131

src/arrow_parquet/uri_utils.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use pgrx::{
1717
pg_sys::{
1818
get_role_oid, has_privs_of_role, palloc0, superuser, AsPgCStr, ClosePipeStream, DataDir,
1919
FileClose, FilePathName, GetUserId, InvalidOid, OpenPipeStream, OpenTemporaryFile,
20-
TempTablespacePath, __sFILE, MAXPGPATH, PG_BINARY_R, PG_BINARY_W,
20+
TempTablespacePath, MAXPGPATH, PG_BINARY_R, PG_BINARY_W,
2121
},
2222
};
2323
use url::Url;
@@ -45,7 +45,7 @@ pub(crate) struct ParsedUriInfo {
4545
// pipe_file holds the pipe stream (a C FILE pointer) used for copying data to/from a program
4646
// call open_program_pipe to open the pipe to a program
4747
pub(crate) is_program: bool,
48-
pub(crate) pipe_file: *mut __sFILE,
48+
pub(crate) pipe_file: *mut libc::FILE,
4949
}
5050

5151
impl ParsedUriInfo {
@@ -68,11 +68,15 @@ impl ParsedUriInfo {
6868
panic!("Failed to open pipe stream for program: {}", program);
6969
}
7070

71-
self.pipe_file = pipe_file;
71+
self.pipe_file = pipe_file as _;
7272

73-
let pipe_fd = (unsafe { *self.pipe_file })._file;
73+
let fd = unsafe { libc::fileno(self.pipe_file) };
7474

75-
unsafe { File::from_raw_fd(pipe_fd as _) }
75+
if fd < 0 {
76+
panic!("Failed to get file descriptor for pipe stream: {}", program);
77+
}
78+
79+
unsafe { File::from_raw_fd(fd) }
7680
}
7781

7882
fn with_tmp_file() -> Self {
@@ -180,7 +184,7 @@ impl Drop for ParsedUriInfo {
180184

181185
if !self.pipe_file.is_null() {
182186
// close pipe file, postgres api will remove it on close
183-
unsafe { ClosePipeStream(self.pipe_file) };
187+
unsafe { ClosePipeStream(self.pipe_file as _) };
184188
}
185189
}
186190
}

0 commit comments

Comments
 (0)