Skip to content

Commit 36bf1c8

Browse files
committed
readme
1 parent 324434d commit 36bf1c8

File tree

4 files changed

+33
-10
lines changed

4 files changed

+33
-10
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ aws-credential-types = {version = "1", default-features = false}
2929
azure_storage = {version = "0.21", default-features = false}
3030
futures = "0.3"
3131
home = "0.5"
32+
libc = {version = "0.2", default-features = false }
3233
object_store = {version = "=0.12.2", default-features = false, features = ["aws", "azure", "fs", "gcp", "http"]}
3334
once_cell = "1"
3435
parquet = {version = "56", default-features = false, features = [

README.md

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ COPY table FROM 's3://mybucket/data.parquet' WITH (format 'parquet');
1818
## Quick Reference
1919
- [Installation From Source](#installation-from-source)
2020
- [Usage](#usage)
21-
- [Copy FROM/TO Parquet files TO/FROM Postgres tables](#copy-tofrom-parquet-files-fromto-postgres-tables)
21+
- [Copy FROM/TO Parquet files TO/FROM Postgres tables](#copy-fromto-parquet-files-tofrom-postgres-tables)
22+
- [COPY FROM/TO Parquet stdin/stdout TO/FROM Postgres tables](#copy-fromto-parquet-stdinstdout-tofrom-postgres-tables)
23+
- [COPY FROM/TO Parquet program stream TO/FROM Postgres tables](#copy-fromto-parquet-program-stream-tofrom-postgres-tables)
2224
- [Inspect Parquet schema](#inspect-parquet-schema)
2325
- [Inspect Parquet metadata](#inspect-parquet-metadata)
2426
- [Inspect Parquet column statistics](#inspect-parquet-column-statistics)
@@ -64,11 +66,11 @@ psql> "CREATE EXTENSION pg_parquet;"
6466

6567
## Usage
6668
There are mainly 3 things that you can do with `pg_parquet`:
67-
1. You can export Postgres tables/queries to Parquet files,
69+
1. You can export Postgres tables/queries to Parquet files, standard output, or a program's input stream,
6870
2. You can ingest data from Parquet files to Postgres tables,
6971
3. You can inspect the schema and metadata of Parquet files.
7072

71-
### COPY to/from Parquet files from/to Postgres tables
73+
### COPY from/to Parquet files to/from Postgres tables
7274
You can use PostgreSQL's `COPY` command to read and write from/to Parquet files. Below is an example of how to write a PostgreSQL table, with complex types, into a Parquet file and then to read the Parquet file content back into the same table.
7375

7476
```sql
@@ -107,7 +109,9 @@ COPY product_example FROM '/tmp/product_example.parquet';
107109
SELECT * FROM product_example;
108110
```
109111

110-
You can also use `COPY` command to read and write Parquet stream from/to standard input and output. Below is an example usage (you have to specify `format = parquet`):
112+
### COPY from/to Parquet stdin/stdout to/from Postgres tables
113+
114+
You can use the `COPY` command to read and write a Parquet stream from/to standard input and output. Below is an example usage (you have to specify `format = parquet`):
111115

112116
```bash
113117
psql -d pg_parquet -p 28817 -h localhost -c "create table product_example_reconstructed (like product_example);"
@@ -117,6 +121,19 @@ psql -d pg_parquet -p 28817 -h localhost -c "copy product_example to stdout (for
117121
COPY 2
118122
```
119123

124+
### COPY from/to Parquet program stream to/from Postgres tables
125+
126+
You can use the `COPY` command to read and write a Parquet stream from/to a program's input and output. Below is an example usage (you have to specify `format = parquet`):
127+
128+
```bash
129+
psql -d pg_parquet -p 28817 -h localhost -c "copy product_example_reconstructed to program 'cat > /tmp/test.parquet' (format parquet);"
130+
COPY 2
131+
132+
psql -d pg_parquet -p 28817 -h localhost -c "copy product_example_reconstructed from program 'cat /tmp/test.parquet' (format parquet);"
133+
COPY 2
134+
```
135+
136+
120137
### Inspect Parquet schema
121138
You can call `SELECT * FROM parquet.schema(<uri>)` to discover the schema of the Parquet file at given uri.
122139

src/arrow_parquet/uri_utils.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use pgrx::{
1717
pg_sys::{
1818
get_role_oid, has_privs_of_role, palloc0, superuser, AsPgCStr, ClosePipeStream, DataDir,
1919
FileClose, FilePathName, GetUserId, InvalidOid, OpenPipeStream, OpenTemporaryFile,
20-
TempTablespacePath, __sFILE, MAXPGPATH, PG_BINARY_R, PG_BINARY_W,
20+
TempTablespacePath, MAXPGPATH, PG_BINARY_R, PG_BINARY_W,
2121
},
2222
};
2323
use url::Url;
@@ -45,7 +45,7 @@ pub(crate) struct ParsedUriInfo {
4545
// pipe_file holds the pipe's stdio stream (FILE pointer) used for copying data to/from a program
4646
// call open_program_pipe to open the pipe to a program
4747
pub(crate) is_program: bool,
48-
pub(crate) pipe_file: *mut __sFILE,
48+
pub(crate) pipe_file: *mut libc::FILE,
4949
}
5050

5151
impl ParsedUriInfo {
@@ -68,11 +68,15 @@ impl ParsedUriInfo {
6868
panic!("Failed to open pipe stream for program: {}", program);
6969
}
7070

71-
self.pipe_file = pipe_file;
71+
self.pipe_file = pipe_file as _;
7272

73-
let pipe_fd = (unsafe { *self.pipe_file })._file;
73+
let fd = unsafe { libc::fileno(self.pipe_file) };
7474

75-
unsafe { File::from_raw_fd(pipe_fd as _) }
75+
if fd < 0 {
76+
panic!("Failed to get file descriptor for pipe stream: {}", program);
77+
}
78+
79+
unsafe { File::from_raw_fd(fd) }
7680
}
7781

7882
fn with_tmp_file() -> Self {
@@ -180,7 +184,7 @@ impl Drop for ParsedUriInfo {
180184

181185
if !self.pipe_file.is_null() {
182186
// close pipe file, postgres api will remove it on close
183-
unsafe { ClosePipeStream(self.pipe_file) };
187+
unsafe { ClosePipeStream(self.pipe_file as _) };
184188
}
185189
}
186190
}

0 commit comments

Comments
 (0)