3 changes: 3 additions & 0 deletions .devcontainer/.env
@@ -28,4 +28,7 @@ GOOGLE_SERVICE_ACCOUNT_KEY='{"gcs_base_url": "http://localhost:4443","disable_oa
GOOGLE_SERVICE_ENDPOINT=http://localhost:4443

# Others
# run pgrx tests with a single thread to avoid race conditions
RUST_TEST_THREADS=1
# pgrx runs tests on port base_port + pg_version
PGRX_TEST_PG_BASE_PORT=5454
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
@@ -84,6 +84,7 @@ jobs:
postgresql-server-dev-${{ env.PG_MAJOR }} \
postgresql-client-${{ env.PG_MAJOR }} \
libpq-dev
echo "export PG_MAJOR=${{ env.PG_MAJOR }}" >> $GITHUB_ENV

- name: Install azure-cli
run: |
@@ -94,7 +95,8 @@
- name: Install and configure pgrx
run: |
cargo install --locked [email protected]
cargo pgrx init --pg${{ env.PG_MAJOR }} /usr/lib/postgresql/${{ env.PG_MAJOR }}/bin/pg_config
cargo pgrx init --pg${{ env.PG_MAJOR }} /usr/lib/postgresql/${{ env.PG_MAJOR }}/bin/pg_config \
--base-testing-port $PGRX_TEST_PG_BASE_PORT

- name: Install cargo-llvm-cov for coverage report
run: cargo install --locked [email protected]
14 changes: 12 additions & 2 deletions README.md
@@ -5,7 +5,7 @@
[![CI lints and tests](https://github.com/CrunchyData/pg_parquet/actions/workflows/ci.yml/badge.svg)](https://github.com/CrunchyData/pg_parquet/actions/workflows/ci.yml)
[![codecov](https://codecov.io/gh/CrunchyData/pg_parquet/graph/badge.svg?token=6BPS0DSKJ2)](https://codecov.io/gh/CrunchyData/pg_parquet)

`pg_parquet` is a PostgreSQL extension that allows you to read and write [Parquet files](https://parquet.apache.org), which are located in `S3` or `file system`, from PostgreSQL via `COPY TO/FROM` commands. It depends on [Apache Arrow](https://arrow.apache.org/rust/arrow/) project to read and write Parquet files and [pgrx](https://github.com/pgcentralfoundation/pgrx) project to extend PostgreSQL's `COPY` command.
`pg_parquet` is a PostgreSQL extension that allows you to read and write [Parquet files](https://parquet.apache.org), located in `S3`, `Azure Blob Storage`, `Google Cloud Storage`, `http(s) endpoints` or the local `file system`, from PostgreSQL via `COPY TO/FROM` commands. It depends on the [Apache Arrow](https://arrow.apache.org/rust/arrow/) project to read and write Parquet files and the [pgrx](https://github.com/pgcentralfoundation/pgrx) project to extend PostgreSQL's `COPY` command.

```sql
-- Copy a query result into Parquet in S3
@@ -61,7 +61,7 @@ There are mainly 3 things that you can do with `pg_parquet`:
3. You can inspect the schema and metadata of Parquet files.

### COPY to/from Parquet files from/to Postgres tables
You can use PostgreSQL's `COPY` command to read and write Parquet files. Below is an example of how to write a PostgreSQL table, with complex types, into a Parquet file and then to read the Parquet file content back into the same table.
You can use PostgreSQL's `COPY` command to read from and write to Parquet files. Below is an example of how to write a PostgreSQL table, with complex types, into a Parquet file and then to read the Parquet file content back into the same table.

```sql
-- create composite types
@@ -99,6 +99,16 @@ COPY product_example FROM '/tmp/product_example.parquet';
SELECT * FROM product_example;
```

You can also use the `COPY` command to read and write Parquet streams from/to standard input and output. Below is an example usage (you have to specify `format parquet`):

```bash
psql -d pg_parquet -p 28817 -h localhost -c "create table product_example_reconstructed (like product_example);"
CREATE TABLE

psql -d pg_parquet -p 28817 -h localhost -c "copy product_example to stdout (format parquet);" | psql -d pg_parquet -p 28817 -h localhost -c "copy product_example_reconstructed from stdin (format parquet);"
COPY 2
```

### Inspect Parquet schema
You can call `SELECT * FROM parquet.schema(<uri>)` to discover the schema of the Parquet file at the given uri.

6 changes: 5 additions & 1 deletion src/arrow_parquet/parquet_reader.rs
@@ -50,7 +50,11 @@ pub(crate) struct ParquetReaderContext {
}

impl ParquetReaderContext {
pub(crate) fn new(uri_info: ParsedUriInfo, match_by: MatchBy, tupledesc: &PgTupleDesc) -> Self {
pub(crate) fn new(
uri_info: &ParsedUriInfo,
match_by: MatchBy,
tupledesc: &PgTupleDesc,
) -> Self {
// Postgis and Map contexts are used throughout reading the parquet file.
// We need to reset them to avoid reading the stale data. (e.g. extension could be dropped)
reset_postgis_context();
10 changes: 2 additions & 8 deletions src/arrow_parquet/parquet_writer.rs
@@ -71,7 +71,7 @@ impl ParquetWriterContext {

let writer_props = Self::writer_props(tupledesc, options);

let parquet_writer = parquet_writer_from_uri(uri_info, schema.clone(), writer_props);
let parquet_writer = parquet_writer_from_uri(&uri_info, schema.clone(), writer_props);

let attribute_contexts =
collect_pg_to_arrow_attribute_contexts(&attributes, &schema.fields);
@@ -130,7 +130,7 @@
}

// finalize flushes the in progress rows to a new row group and finally writes metadata to the file.
fn finalize(&mut self) {
pub(crate) fn finalize(&mut self) {
PG_BACKEND_TOKIO_RUNTIME
.block_on(self.parquet_writer.finish())
.unwrap_or_else(|e| panic!("failed to finish parquet writer: {}", e));
@@ -156,9 +156,3 @@ impl ParquetWriterContext {
RecordBatch::try_new(schema, attribute_arrays).expect("Expected record batch")
}
}

impl Drop for ParquetWriterContext {
fn drop(&mut self) {
Member Author: better to make Drop fail-safe.

self.finalize();
}
}
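
With the `Drop` impl above removed and `finalize` made `pub(crate)`, flushing now has to happen explicitly on the success path. A minimal sketch of that pattern, assuming a hypothetical caller named `complete_copy_to` that is not part of this diff:

```rust
// Sketch only: explicit finalize on the success path, nothing fallible in Drop.
// ParquetWriterContext is the crate-internal type shown in the diff above.
fn complete_copy_to(mut writer_context: ParquetWriterContext) {
    // Flush the in-progress rows into a final row group and write the footer.
    // finalize() may panic on I/O errors, which is acceptable here because we
    // are not already unwinding.
    writer_context.finalize();

    // On an error path the context is simply dropped without finalize(),
    // so Drop can never panic during stack unwinding.
}
```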
81 changes: 63 additions & 18 deletions src/arrow_parquet/uri_utils.rs
@@ -1,4 +1,4 @@
use std::{panic, sync::Arc};
use std::{ffi::CStr, panic, sync::Arc};

use arrow::datatypes::SchemaRef;
use object_store::{path::Path, ObjectStoreScheme};
@@ -13,7 +13,11 @@ use parquet::{
};
use pgrx::{
ereport,
pg_sys::{get_role_oid, has_privs_of_role, superuser, AsPgCStr, GetUserId},
ffi::c_char,
pg_sys::{
get_role_oid, has_privs_of_role, palloc0, superuser, AsPgCStr, DataDir, FileClose,
FilePathName, GetUserId, InvalidOid, OpenTemporaryFile, TempTablespacePath, MAXPGPATH,
},
};
use url::Url;

@@ -29,15 +33,48 @@ const PARQUET_OBJECT_STORE_READ_ROLE: &str = "parquet_object_store_read";
const PARQUET_OBJECT_STORE_WRITE_ROLE: &str = "parquet_object_store_write";

// ParsedUriInfo is a struct that holds the parsed uri information.
#[derive(Debug, Clone)]
#[derive(Debug)]
Member Author: make it non-cloneable to make sure Drop runs once.

pub(crate) struct ParsedUriInfo {
pub(crate) uri: Url,
pub(crate) bucket: Option<String>,
pub(crate) path: Path,
pub(crate) scheme: ObjectStoreScheme,
pub(crate) stdio_tmp_fd: Option<i32>,
}

impl ParsedUriInfo {
pub(crate) fn for_std_inout() -> Self {
// open temp postgres file, which is removed after transaction ends
let tmp_path_fd = unsafe { OpenTemporaryFile(false) };

let tmp_path = unsafe {
let data_dir = CStr::from_ptr(DataDir).to_str().expect("invalid base dir");

let tmp_tblspace_path: *const c_char = palloc0(MAXPGPATH as _) as _;
TempTablespacePath(tmp_tblspace_path as _, InvalidOid);
let tmp_tblspace_path = CStr::from_ptr(tmp_tblspace_path)
.to_str()
.expect("invalid temp tablespace path");

let tmp_file_path = FilePathName(tmp_path_fd);
let tmp_file_path = CStr::from_ptr(tmp_file_path)
.to_str()
.expect("invalid temp path");

let tmp_path = std::path::Path::new(data_dir)
.join(tmp_tblspace_path)
.join(tmp_file_path);

tmp_path.to_str().expect("invalid tmp path").to_string()
};

let mut parsed_uri = Self::try_from(tmp_path.as_str()).unwrap_or_else(|e| panic!("{}", e));

parsed_uri.stdio_tmp_fd = Some(tmp_path_fd);

parsed_uri
}

fn try_parse_uri(uri: &str) -> Result<Url, String> {
if !uri.contains("://") {
// local file
@@ -92,10 +129,20 @@ impl TryFrom<&str> for ParsedUriInfo {
bucket,
path,
scheme,
stdio_tmp_fd: None,
})
}
}

impl Drop for ParsedUriInfo {
fn drop(&mut self) {
if let Some(stdio_tmp_fd) = self.stdio_tmp_fd {
// close temp file, postgres api will remove it on close
unsafe { FileClose(stdio_tmp_fd) };
}
}
}
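
Dropping the `Clone` derive pairs with the signature changes below: helpers now borrow `ParsedUriInfo`, so there is only ever one owner and `FileClose` runs at most once. A minimal sketch of that ownership pattern, using a hypothetical function name (`read_borrowed_uri`) that is not part of this diff:

```rust
// Sketch only: helpers borrow the parsed URI, a single owner drops it once.
// parquet_schema_from_uri and parquet_reader_from_uri are the crate-internal
// helpers defined further down in this file.
fn read_borrowed_uri(uri_info: ParsedUriInfo) {
    // Borrow instead of clone, as the updated signatures require.
    let _schema = parquet_schema_from_uri(&uri_info);
    let _reader = parquet_reader_from_uri(&uri_info);

    // The single owner goes out of scope here; Drop runs once, so FileClose
    // is called at most once on the spooled stdin/stdout temp file.
}
```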

pub(crate) fn uri_as_string(uri: &Url) -> String {
if uri.scheme() == "file" {
// removes file:// prefix from the local path uri
@@ -109,7 +156,7 @@ pub(crate) fn uri_as_string(uri: &Url) -> String {
uri.to_string()
}

pub(crate) fn parquet_schema_from_uri(uri_info: ParsedUriInfo) -> SchemaDescriptor {
pub(crate) fn parquet_schema_from_uri(uri_info: &ParsedUriInfo) -> SchemaDescriptor {
let parquet_reader = parquet_reader_from_uri(uri_info);

let arrow_schema = parquet_reader.schema();
@@ -119,19 +166,18 @@ pub(crate) fn parquet_schema_from_uri(uri_info: &ParsedUriInfo) -> SchemaDescript
.unwrap_or_else(|e| panic!("{}", e))
}

pub(crate) fn parquet_metadata_from_uri(uri_info: ParsedUriInfo) -> Arc<ParquetMetaData> {
pub(crate) fn parquet_metadata_from_uri(uri_info: &ParsedUriInfo) -> Arc<ParquetMetaData> {
let uri = uri_info.uri.clone();

let copy_from = true;
let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);
let (parquet_object_store, location) = get_or_create_object_store(uri_info, copy_from);

PG_BACKEND_TOKIO_RUNTIME.block_on(async {
let object_store_meta = parquet_object_store
.head(&location)
.await
.unwrap_or_else(|e| {
panic!(
"failed to get object store metadata for uri {}: {}",
uri_info.uri, e
)
panic!("failed to get object store metadata for uri {}: {}", uri, e)
});

let parquet_object_reader =
@@ -149,20 +195,19 @@ pub(crate) fn parquet_metadata_from_uri(uri_info: &ParsedUriInfo) -> Arc<ParquetM
pub(crate) const RECORD_BATCH_SIZE: i64 = 1024;

pub(crate) fn parquet_reader_from_uri(
uri_info: ParsedUriInfo,
uri_info: &ParsedUriInfo,
) -> ParquetRecordBatchStream<ParquetObjectReader> {
let uri = uri_info.uri.clone();

let copy_from = true;
let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);
let (parquet_object_store, location) = get_or_create_object_store(uri_info, copy_from);

PG_BACKEND_TOKIO_RUNTIME.block_on(async {
let object_store_meta = parquet_object_store
.head(&location)
.await
.unwrap_or_else(|e| {
panic!(
"failed to get object store metadata for uri {}: {}",
uri_info.uri, e
)
panic!("failed to get object store metadata for uri {}: {}", uri, e)
});

let parquet_object_reader =
@@ -205,12 +250,12 @@ fn calculate_reader_batch_size(metadata: &Arc<ParquetMetaData>) -> usize {
}

pub(crate) fn parquet_writer_from_uri(
uri_info: ParsedUriInfo,
uri_info: &ParsedUriInfo,
arrow_schema: SchemaRef,
writer_props: WriterProperties,
) -> AsyncArrowWriter<ParquetObjectWriter> {
let copy_from = false;
let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);
let (parquet_object_store, location) = get_or_create_object_store(uri_info, copy_from);

let parquet_object_writer = ParquetObjectWriter::new(parquet_object_store, location);

14 changes: 6 additions & 8 deletions src/object_store/object_store_cache.rs
@@ -23,7 +23,7 @@ use super::{
static mut OBJECT_STORE_CACHE: Lazy<ObjectStoreCache> = Lazy::new(ObjectStoreCache::new);

pub(crate) fn get_or_create_object_store(
uri_info: ParsedUriInfo,
uri_info: &ParsedUriInfo,
copy_from: bool,
) -> (Arc<dyn ObjectStore>, Path) {
#[allow(static_mut_refs)]
@@ -45,15 +45,13 @@ impl ObjectStoreCache {

fn get_or_create(
&mut self,
uri_info: ParsedUriInfo,
uri_info: &ParsedUriInfo,
copy_from: bool,
) -> (Arc<dyn ObjectStore>, Path) {
let ParsedUriInfo {
uri,
path,
scheme,
bucket,
} = uri_info;
let uri = uri_info.uri.clone();
let scheme = uri_info.scheme.clone();
let bucket = uri_info.bucket.clone();
let path = uri_info.path.clone();

// no need to cache local files
if scheme == ObjectStoreScheme::Local {
2 changes: 2 additions & 0 deletions src/parquet_copy_hook.rs
@@ -1,7 +1,9 @@
pub(crate) mod copy_from;
pub(crate) mod copy_from_stdin;
pub(crate) mod copy_to;
pub(crate) mod copy_to_dest_receiver;
pub(crate) mod copy_to_split_dest_receiver;
pub(crate) mod copy_to_stdout;
pub(crate) mod copy_utils;
pub(crate) mod hook;
pub(crate) mod pg_compat;
16 changes: 12 additions & 4 deletions src/parquet_copy_hook/copy_from.rs
@@ -18,9 +18,12 @@ use crate::{
},
};

use super::copy_utils::{
copy_from_stmt_match_by, copy_stmt_attribute_list, copy_stmt_create_namespace_item,
copy_stmt_create_parse_state, create_filtered_tupledesc_for_relation,
use super::{
copy_from_stdin::copy_stdin_to_file,
copy_utils::{
copy_from_stmt_match_by, copy_stmt_attribute_list, copy_stmt_create_namespace_item,
copy_stmt_create_parse_state, create_filtered_tupledesc_for_relation,
},
};

// stack to store parquet reader contexts for COPY FROM.
@@ -113,7 +116,7 @@
p_stmt: &PgBox<PlannedStmt>,
query_string: &CStr,
query_env: &PgBox<QueryEnvironment>,
uri_info: ParsedUriInfo,
uri_info: &ParsedUriInfo,
) -> u64 {
let rel_oid = copy_stmt_relation_oid(p_stmt);

@@ -140,6 +143,11 @@
let match_by = copy_from_stmt_match_by(p_stmt);

unsafe {
if uri_info.stdio_tmp_fd.is_some() {
let is_binary = true;
copy_stdin_to_file(uri_info, tupledesc.natts as _, is_binary);
}

// parquet reader context is used throughout the COPY FROM operation.
let parquet_reader_context = ParquetReaderContext::new(uri_info, match_by, &tupledesc);
push_parquet_reader_context(parquet_reader_context);
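
Putting the pieces together, here is a condensed sketch of the `COPY ... FROM STDIN (format parquet)` path as this diff wires it up. `read_parquet_from_stdin` is a hypothetical wrapper name; imports and error handling are omitted, and the real logic lives in `execute_copy_from` and the copy hook.

```rust
// Sketch only: stdin is spooled into a transaction-scoped Postgres temp file,
// then read back like any local Parquet file. All called items are the
// crate-internal functions and types shown in this diff.
fn read_parquet_from_stdin(match_by: MatchBy, tupledesc: &PgTupleDesc) -> ParsedUriInfo {
    // 1. Open a Postgres temp file and point the parsed URI at it.
    let uri_info = ParsedUriInfo::for_std_inout();

    unsafe {
        // 2. Spool the binary Parquet stream from the client into the temp file.
        let is_binary = true;
        copy_stdin_to_file(&uri_info, tupledesc.natts as _, is_binary);

        // 3. Read the temp file exactly like any local Parquet file.
        let parquet_reader_context = ParquetReaderContext::new(&uri_info, match_by, tupledesc);
        push_parquet_reader_context(parquet_reader_context);
    }

    // 4. Keep uri_info alive for the rest of the COPY; dropping it later runs
    //    FileClose, which lets Postgres remove the temp file.
    uri_info
}
```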