1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -29,6 +29,7 @@ aws-credential-types = {version = "1", default-features = false}
azure_storage = {version = "0.21", default-features = false}
futures = "0.3"
home = "0.5"
libc = {version = "0.2", default-features = false }
object_store = {version = "=0.12.2", default-features = false, features = ["aws", "azure", "fs", "gcp", "http"]}
once_cell = "1"
parquet = {version = "56", default-features = false, features = [
25 changes: 21 additions & 4 deletions README.md
@@ -18,7 +18,9 @@ COPY table FROM 's3://mybucket/data.parquet' WITH (format 'parquet');
## Quick Reference
- [Installation From Source](#installation-from-source)
- [Usage](#usage)
- [Copy FROM/TO Parquet files TO/FROM Postgres tables](#copy-tofrom-parquet-files-fromto-postgres-tables)
- [Copy FROM/TO Parquet files TO/FROM Postgres tables](#copy-fromto-parquet-files-tofrom-postgres-tables)
- [COPY FROM/TO Parquet stdin/stdout TO/FROM Postgres tables](#copy-fromto-parquet-stdinstdout-tofrom-postgres-tables)
- [COPY FROM/TO Parquet program stream TO/FROM Postgres tables](#copy-fromto-parquet-program-stream-tofrom-postgres-tables)
- [Inspect Parquet schema](#inspect-parquet-schema)
- [Inspect Parquet metadata](#inspect-parquet-metadata)
- [Inspect Parquet column statistics](#inspect-parquet-column-statistics)
@@ -64,11 +66,11 @@ psql> "CREATE EXTENSION pg_parquet;"

## Usage
There are mainly 3 things that you can do with `pg_parquet`:
1. You can export Postgres tables/queries to Parquet files,
1. You can export Postgres tables/queries to Parquet files, stdin/stdout, or a program's stream,
2. You can ingest data from Parquet files to Postgres tables,
3. You can inspect the schema and metadata of Parquet files.

### COPY to/from Parquet files from/to Postgres tables
### COPY from/to Parquet files to/from Postgres tables
You can use PostgreSQL's `COPY` command to read and write from/to Parquet files. Below is an example of how to write a PostgreSQL table, with complex types, into a Parquet file and then read the Parquet file content back into the same table.

```sql
@@ -107,7 +109,9 @@ COPY product_example FROM '/tmp/product_example.parquet';
SELECT * FROM product_example;
```

You can also use `COPY` command to read and write Parquet stream from/to standard input and output. Below is an example usage (you have to specify `format = parquet`):
### COPY from/to Parquet stdin/stdout to/from Postgres tables

You can use the `COPY` command to read and write a Parquet stream from/to standard input and output. Below is an example usage (you have to specify `format = parquet`):

```bash
psql -d pg_parquet -p 28817 -h localhost -c "create table product_example_reconstructed (like product_example);"
@@ -117,6 +121,19 @@ psql -d pg_parquet -p 28817 -h localhost -c "copy product_example to stdout (for
COPY 2
```

### COPY from/to Parquet program stream to/from Postgres tables

You can use the `COPY` command to read and write a Parquet stream from/to a program's standard input and output. Non-superusers need the `pg_execute_server_program` role to copy from/to a program. Below is an example usage (you have to specify `format = parquet`):

```bash
psql -d pg_parquet -p 28817 -h localhost -c "copy product_example_reconstructed to program 'cat > /tmp/test.parquet' (format parquet);"
COPY 2

psql -d pg_parquet -p 28817 -h localhost -c "copy product_example_reconstructed from program 'cat /tmp/test.parquet' (format parquet);"
COPY 2
```


### Inspect Parquet schema
You can call `SELECT * FROM parquet.schema(<uri>)` to discover the schema of the Parquet file at given uri.

86 changes: 72 additions & 14 deletions src/arrow_parquet/uri_utils.rs
@@ -1,4 +1,4 @@
use std::{ffi::CStr, panic, sync::Arc};
use std::{ffi::CStr, fs::File, os::fd::FromRawFd, panic, sync::Arc};

use arrow::datatypes::SchemaRef;
use object_store::{path::Path, ObjectStoreScheme};
@@ -15,8 +15,9 @@ use pgrx::{
ereport,
ffi::c_char,
pg_sys::{
get_role_oid, has_privs_of_role, palloc0, superuser, AsPgCStr, DataDir, FileClose,
FilePathName, GetUserId, InvalidOid, OpenTemporaryFile, TempTablespacePath, MAXPGPATH,
get_role_oid, has_privs_of_role, palloc0, superuser, AsPgCStr, ClosePipeStream, DataDir,
FileClose, FilePathName, GetUserId, InvalidOid, OpenPipeStream, OpenTemporaryFile,
TempTablespacePath, MAXPGPATH, PG_BINARY_R, PG_BINARY_W,
},
};
use url::Url;
@@ -39,11 +40,46 @@ pub(crate) struct ParsedUriInfo {
pub(crate) bucket: Option<String>,
pub(crate) path: Path,
pub(crate) scheme: ObjectStoreScheme,
pub(crate) stdio_tmp_fd: Option<i32>,
// tmp_fd is used as an intermediate file for copying data to/from stdin/stdout or program pipes
pub(crate) tmp_fd: Option<i32>,
// pipe_file holds the pipe stream (FILE*) used for copying data to/from a program
// call open_program_pipe to open the pipe to a program
pub(crate) is_program: bool,
pub(crate) pipe_file: *mut libc::FILE,
}

impl ParsedUriInfo {
pub(crate) fn for_std_inout() -> Self {
Self::with_tmp_file()
}

pub(crate) fn for_program() -> Self {
let mut uri_info = Self::with_tmp_file();
uri_info.is_program = true;
uri_info
}

pub(crate) fn open_program_pipe(&mut self, program: &str, copy_from: bool) -> File {
let pipe_mode = if copy_from { PG_BINARY_R } else { PG_BINARY_W };

let pipe_file = unsafe { OpenPipeStream(program.as_pg_cstr(), pipe_mode.as_ptr()) };

if pipe_file.is_null() {
panic!("Failed to open pipe stream for program: {}", program);
}

self.pipe_file = pipe_file as _;

let fd = unsafe { libc::fileno(self.pipe_file) };

if fd < 0 {
panic!("Failed to get file descriptor for pipe stream: {}", program);
}

unsafe { File::from_raw_fd(fd) }
}

fn with_tmp_file() -> Self {
// open temp postgres file, which is removed after transaction ends
let tmp_path_fd = unsafe { OpenTemporaryFile(false) };

@@ -70,11 +106,15 @@ impl ParsedUriInfo {

let mut parsed_uri = Self::try_from(tmp_path.as_str()).unwrap_or_else(|e| panic!("{}", e));

parsed_uri.stdio_tmp_fd = Some(tmp_path_fd);
parsed_uri.tmp_fd = Some(tmp_path_fd);

parsed_uri
}

fn is_std_inout(&self) -> bool {
self.tmp_fd.is_some() && !self.is_program
}

fn try_parse_uri(uri: &str) -> Result<Url, String> {
if !uri.contains("://") {
// local file
@@ -128,16 +168,23 @@ impl TryFrom<&str> for ParsedUriInfo {
bucket,
path,
scheme,
stdio_tmp_fd: None,
tmp_fd: None,
is_program: false,
pipe_file: std::ptr::null_mut(),
})
}
}

impl Drop for ParsedUriInfo {
fn drop(&mut self) {
if let Some(stdio_tmp_fd) = self.stdio_tmp_fd {
if let Some(tmp_fd) = self.tmp_fd {
// close temp file, postgres api will remove it on close
unsafe { FileClose(stdio_tmp_fd) };
unsafe { FileClose(tmp_fd) };
}

if !self.pipe_file.is_null() {
// close the program pipe stream via postgres api
unsafe { ClosePipeStream(self.pipe_file as _) };
}
}
}
@@ -275,30 +322,41 @@ pub(crate) fn ensure_access_privilege_to_uri(uri_info: &ParsedUriInfo, copy_from
}

// permission check is not needed for stdin/out
if uri_info.stdio_tmp_fd.is_some() {
if uri_info.is_std_inout() {
return;
}

let user_id = unsafe { GetUserId() };
let is_file = uri_info.uri.scheme() == "file";

let required_role_name = if is_file {
let required_role_name = if uri_info.is_program {
"pg_execute_server_program"
} else if is_file {
if copy_from {
"pg_read_server_files"
} else {
"pg_write_server_files"
}
} else if copy_from {
PARQUET_OBJECT_STORE_READ_ROLE
} else {
PARQUET_OBJECT_STORE_WRITE_ROLE
// object_store
if copy_from {
PARQUET_OBJECT_STORE_READ_ROLE
} else {
PARQUET_OBJECT_STORE_WRITE_ROLE
}
};

let required_role_id =
unsafe { get_role_oid(required_role_name.to_string().as_pg_cstr(), false) };

let operation_str = if copy_from { "from" } else { "to" };
let object_type = if is_file { "file" } else { "remote uri" };
let object_type = if uri_info.is_program {
"program"
} else if is_file {
"file"
} else {
"remote uri"
};

if !unsafe { has_privs_of_role(user_id, required_role_id) } {
ereport!(
2 changes: 2 additions & 0 deletions src/parquet_copy_hook.rs
@@ -1,7 +1,9 @@
pub(crate) mod copy_from;
pub(crate) mod copy_from_program;
pub(crate) mod copy_from_stdin;
pub(crate) mod copy_to;
pub(crate) mod copy_to_dest_receiver;
pub(crate) mod copy_to_program;
pub(crate) mod copy_to_split_dest_receiver;
pub(crate) mod copy_to_stdout;
pub(crate) mod copy_utils;
18 changes: 12 additions & 6 deletions src/parquet_copy_hook/copy_from.rs
@@ -13,8 +13,12 @@ use pgrx::{

use crate::{
arrow_parquet::{parquet_reader::ParquetReaderContext, uri_utils::ParsedUriInfo},
parquet_copy_hook::copy_utils::{
copy_from_stmt_create_option_list, copy_stmt_lock_mode, copy_stmt_relation_oid,
parquet_copy_hook::{
copy_from_program::copy_program_to_file,
copy_utils::{
copy_from_stmt_create_option_list, copy_stmt_is_std_inout, copy_stmt_lock_mode,
copy_stmt_program, copy_stmt_relation_oid,
},
},
};

@@ -117,7 +121,7 @@ pub(crate) fn execute_copy_from(
p_stmt: &PgBox<PlannedStmt>,
query_string: &CStr,
query_env: &PgBox<QueryEnvironment>,
uri_info: &ParsedUriInfo,
mut uri_info: ParsedUriInfo,
) -> u64 {
let rel_oid = copy_stmt_relation_oid(p_stmt);

@@ -146,13 +150,15 @@ let match_by = copy_from_stmt_match_by(p_stmt);
let match_by = copy_from_stmt_match_by(p_stmt);

unsafe {
if uri_info.stdio_tmp_fd.is_some() {
if let Some(program) = copy_stmt_program(p_stmt) {
copy_program_to_file(&mut uri_info, &program);
} else if copy_stmt_is_std_inout(p_stmt) {
let is_binary = true;
copy_stdin_to_file(uri_info, tupledesc.natts as _, is_binary);
copy_stdin_to_file(&uri_info, tupledesc.natts as _, is_binary);
}

// parquet reader context is used throughout the COPY FROM operation.
let parquet_reader_context = ParquetReaderContext::new(uri_info, match_by, &tupledesc);
let parquet_reader_context = ParquetReaderContext::new(&uri_info, match_by, &tupledesc);
push_parquet_reader_context(parquet_reader_context);

// makes sure to set binary format
22 changes: 22 additions & 0 deletions src/parquet_copy_hook/copy_from_program.rs
@@ -0,0 +1,22 @@
use crate::arrow_parquet::uri_utils::{uri_as_string, ParsedUriInfo};

pub(crate) unsafe fn copy_program_to_file(uri_info: &mut ParsedUriInfo, program: &str) {
// get tmp file
let path = uri_as_string(&uri_info.uri);

// open and then get pipe file
let copy_from = true;
let mut pipe_file = uri_info.open_program_pipe(program, copy_from);

// create or overwrite the local file
let mut file = std::fs::OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&path)
.unwrap_or_else(|e| panic!("{}", e));

// Write pipe file to temp file
std::io::copy(&mut pipe_file, &mut file)
.unwrap_or_else(|e| panic!("Failed to copy command stdout to file: {e}"));
}
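
For the opposite direction, `COPY ... TO PROGRAM` goes through `copy_file_to_program` in `copy_to_program.rs`, which is referenced from `copy_to_dest_receiver.rs` below but whose body is not part of this diff. A minimal mirror-image sketch, assuming it simply streams the intermediate temp file into the program's pipe via the same `ParsedUriInfo::open_program_pipe` helper — an illustration, not the PR's actual implementation:

```rust
use crate::arrow_parquet::uri_utils::{uri_as_string, ParsedUriInfo};

// Illustrative sketch only (assumed implementation): stream the intermediate
// temp Parquet file into the program's stdin.
pub(crate) unsafe fn copy_file_to_program(uri_info: &mut ParsedUriInfo, program: &str) {
    // path of the intermediate temp file written by the dest receiver
    let path = uri_as_string(&uri_info.uri);

    // open the pipe to the program in write mode (copy_from = false)
    let copy_from = false;
    let mut pipe_file = uri_info.open_program_pipe(program, copy_from);

    // read the temp file back
    let mut file = std::fs::File::open(&path).unwrap_or_else(|e| panic!("{}", e));

    // write the temp file contents into the program's stdin
    std::io::copy(&mut file, &mut pipe_file)
        .unwrap_or_else(|e| panic!("Failed to copy file to command stdin: {e}"));
}
```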
33 changes: 28 additions & 5 deletions src/parquet_copy_hook/copy_to_dest_receiver.rs
@@ -12,10 +12,13 @@ use pg_sys::{
};
use pgrx::{prelude::*, FromDatum, PgList, PgMemoryContexts, PgTupleDesc};

use crate::arrow_parquet::{
field_ids::FieldIds,
parquet_writer::ParquetWriterContext,
uri_utils::{ParsedUriInfo, RECORD_BATCH_SIZE},
use crate::{
arrow_parquet::{
field_ids::FieldIds,
parquet_writer::ParquetWriterContext,
uri_utils::{ParsedUriInfo, RECORD_BATCH_SIZE},
},
parquet_copy_hook::copy_to_program::copy_file_to_program,
};

use super::{
@@ -33,6 +36,7 @@ pub(crate) struct CopyToParquetDestReceiver {
collected_tuple_column_sizes: *mut i64,
target_batch_size: i64,
uri: *const c_char,
program: *mut c_char,
is_to_stdout: bool,
copy_options: CopyToParquetOptions,
copy_memory_context: MemoryContext,
@@ -150,6 +154,21 @@ impl CopyToParquetDestReceiver {
}
}

fn copy_to_program(&self) {
if !self.program.is_null() {
let program = unsafe {
CStr::from_ptr(self.program)
.to_str()
.expect("invalid program")
};

let uri = unsafe { CStr::from_ptr(self.uri).to_str().expect("invalid uri") };
let mut uri_info = ParsedUriInfo::try_from(uri).expect("invalid uri");

unsafe { copy_file_to_program(&mut uri_info, program) };
}
}

pub(crate) fn cleanup(&mut self) {
if !self.parquet_writer_context.is_null() {
let parquet_writer_context = unsafe { Box::from_raw(self.parquet_writer_context) };
@@ -301,7 +320,9 @@ pub(crate) extern "C-unwind" fn copy_shutdown(dest: *mut DestReceiver) {

parquet_dest.finish();

if parquet_dest.is_to_stdout {
if !parquet_dest.program.is_null() {
parquet_dest.copy_to_program();
} else if parquet_dest.is_to_stdout {
parquet_dest.copy_to_stdout();
}

@@ -355,6 +376,7 @@ fn tuple_column_sizes(tuple_datums: &[Option<Datum>], tupledesc: &PgTupleDesc) -
#[pg_guard]
pub(crate) extern "C-unwind" fn create_copy_to_parquet_dest_receiver(
uri: *const c_char,
program: *mut c_char,
is_to_stdout: bool,
options: CopyToParquetOptions,
) -> *mut CopyToParquetDestReceiver {
@@ -387,6 +409,7 @@ pub(crate) extern "C-unwind" fn create_copy_to_parquet_dest_receiver(
parquet_dest.dest.rDestroy = Some(copy_destroy);
parquet_dest.dest.mydest = CommandDest::DestCopyOut;
parquet_dest.uri = uri;
parquet_dest.program = program;
parquet_dest.is_to_stdout = is_to_stdout;
parquet_dest.tupledesc = std::ptr::null_mut();
parquet_dest.parquet_writer_context = std::ptr::null_mut();
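
For completeness, a sketch of how the caller (in `copy_to.rs`, which is not included in this diff) might pass the new `program` argument to `create_copy_to_parquet_dest_receiver`. The names `copy_stmt_program` and `copy_stmt_is_std_inout` come from this PR; the surrounding wiring is an assumption, not the actual call site:

```rust
// Illustrative call-site sketch only; the real code in copy_to.rs may differ.
// Assumes `p_stmt`, `uri_info`, parsed `copy_options`, and the relevant imports
// (uri_as_string, AsPgCStr, copy_utils helpers) are in scope.
let uri = uri_as_string(&uri_info.uri).as_pg_cstr();

// null pointer unless `COPY ... TO PROGRAM '...'` was used
let program = copy_stmt_program(p_stmt)
    .map(|program| program.as_pg_cstr())
    .unwrap_or(std::ptr::null_mut());

let is_to_stdout = copy_stmt_is_std_inout(p_stmt);

let parquet_dest =
    create_copy_to_parquet_dest_receiver(uri, program, is_to_stdout, copy_options);
```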