3 changes: 3 additions & 0 deletions .devcontainer/.env
@@ -28,4 +28,7 @@ GOOGLE_SERVICE_ACCOUNT_KEY='{"gcs_base_url": "http://localhost:4443","disable_oa
GOOGLE_SERVICE_ENDPOINT=http://localhost:4443

# Others
# run pgrx tests with a single thread to avoid race conditions
RUST_TEST_THREADS=1
# pgrx runs tests on port base_port + pg_version
PGRX_TEST_PG_BASE_PORT=5454
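For example, with PGRX_TEST_PG_BASE_PORT=5454, the PostgreSQL 16 test instance would listen on 5454 + 16 = 5470 (assuming pgrx derives the port by adding the major version to the base port, as the comment above describes).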
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
@@ -84,6 +84,7 @@ jobs:
postgresql-server-dev-${{ env.PG_MAJOR }} \
postgresql-client-${{ env.PG_MAJOR }} \
libpq-dev
echo "export PG_MAJOR=${{ env.PG_MAJOR }}" >> $GITHUB_ENV

- name: Install azure-cli
run: |
@@ -94,7 +95,8 @@
- name: Install and configure pgrx
run: |
cargo install --locked [email protected]
cargo pgrx init --pg${{ env.PG_MAJOR }} /usr/lib/postgresql/${{ env.PG_MAJOR }}/bin/pg_config
cargo pgrx init --pg${{ env.PG_MAJOR }} /usr/lib/postgresql/${{ env.PG_MAJOR }}/bin/pg_config \
--base-testing-port $PGRX_TEST_PG_BASE_PORT

- name: Install cargo-llvm-cov for coverage report
run: cargo install --locked [email protected]
6 changes: 5 additions & 1 deletion src/arrow_parquet/parquet_reader.rs
@@ -50,7 +50,11 @@ pub(crate) struct ParquetReaderContext {
}

impl ParquetReaderContext {
pub(crate) fn new(uri_info: ParsedUriInfo, match_by: MatchBy, tupledesc: &PgTupleDesc) -> Self {
pub(crate) fn new(
uri_info: &ParsedUriInfo,
match_by: MatchBy,
tupledesc: &PgTupleDesc,
) -> Self {
// PostGIS and Map contexts are used throughout reading the parquet file.
// We need to reset them to avoid reading stale data (e.g. the extension could have been dropped).
reset_postgis_context();
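With Clone removed from ParsedUriInfo (see uri_utils.rs below), constructors like this one now borrow the parsed URI instead of taking it by value, so a single owner remains responsible for running Drop, and with it the temp-fd cleanup, exactly once.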
10 changes: 2 additions & 8 deletions src/arrow_parquet/parquet_writer.rs
@@ -71,7 +71,7 @@ impl ParquetWriterContext {

let writer_props = Self::writer_props(tupledesc, options);

let parquet_writer = parquet_writer_from_uri(uri_info, schema.clone(), writer_props);
let parquet_writer = parquet_writer_from_uri(&uri_info, schema.clone(), writer_props);

let attribute_contexts =
collect_pg_to_arrow_attribute_contexts(&attributes, &schema.fields);
@@ -130,7 +130,7 @@
}

// finalize flushes the in-progress rows to a new row group and finally writes the metadata to the file.
fn finalize(&mut self) {
pub(crate) fn finalize(&mut self) {
PG_BACKEND_TOKIO_RUNTIME
.block_on(self.parquet_writer.finish())
.unwrap_or_else(|e| panic!("failed to finish parquet writer: {}", e));
@@ -156,9 +156,3 @@ impl ParquetWriterContext {
RecordBatch::try_new(schema, attribute_arrays).expect("Expected record batch")
}
}

impl Drop for ParquetWriterContext {
fn drop(&mut self) {
Member Author: better to make Drop fail-safe.

self.finalize();
}
}
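The removed Drop impl called finalize(), which panics if finish() fails; a panic inside drop while another panic is already unwinding aborts the process. Making finalize() pub(crate) lets callers finalize explicitly at a point where the error can surface normally. A minimal sketch of the hazard, with hypothetical names:

struct Writer;

impl Writer {
    // Explicit, fallible finalization: the caller decides where errors surface.
    fn finalize(&mut self) {
        panic!("failed to finish parquet writer");
    }
}

impl Drop for Writer {
    fn drop(&mut self) {
        // If drop runs because another panic is already unwinding the stack,
        // this second panic triggers an immediate process abort.
        self.finalize();
    }
}

fn main() {
    let _w = Writer;
    panic!("some earlier error"); // unwinding + panicking drop => abort
}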
81 changes: 63 additions & 18 deletions src/arrow_parquet/uri_utils.rs
@@ -1,4 +1,4 @@
use std::{panic, sync::Arc};
use std::{ffi::CStr, panic, sync::Arc};

use arrow::datatypes::SchemaRef;
use object_store::{path::Path, ObjectStoreScheme};
@@ -13,7 +13,11 @@
};
use pgrx::{
ereport,
pg_sys::{get_role_oid, has_privs_of_role, superuser, AsPgCStr, GetUserId},
ffi::c_char,
pg_sys::{
get_role_oid, has_privs_of_role, palloc0, superuser, AsPgCStr, DataDir, FileClose,
FilePathName, GetUserId, InvalidOid, OpenTemporaryFile, TempTablespacePath, MAXPGPATH,
},
};
use url::Url;

@@ -29,15 +33,48 @@
const PARQUET_OBJECT_STORE_WRITE_ROLE: &str = "parquet_object_store_write";

// ParsedUriInfo is a struct that holds the parsed uri information.
#[derive(Debug, Clone)]
#[derive(Debug)]
Member Author: make it non-cloneable to make sure Drop runs once.

pub(crate) struct ParsedUriInfo {
pub(crate) uri: Url,
pub(crate) bucket: Option<String>,
pub(crate) path: Path,
pub(crate) scheme: ObjectStoreScheme,
pub(crate) stdio_tmp_fd: Option<i32>,
}

impl ParsedUriInfo {
pub(crate) fn for_stdout() -> Self {
// open a temp Postgres file, which is removed when the transaction ends
let tmp_path_fd = unsafe { OpenTemporaryFile(false) };

let tmp_path = unsafe {
let data_dir = CStr::from_ptr(DataDir).to_str().expect("invalid base dir");

let tmp_tblspace_path: *const c_char = palloc0(MAXPGPATH as _) as _;
TempTablespacePath(tmp_tblspace_path as _, InvalidOid);
let tmp_tblspace_path = CStr::from_ptr(tmp_tblspace_path)
.to_str()
.expect("invalid temp tablespace path");

let tmp_file_path = FilePathName(tmp_path_fd);
let tmp_file_path = CStr::from_ptr(tmp_file_path)
.to_str()
.expect("invalid temp path");

let tmp_path = std::path::Path::new(data_dir)
.join(tmp_tblspace_path)
.join(tmp_file_path);

tmp_path.to_str().expect("invalid tmp path").to_string()
};

let mut parsed_uri = Self::try_from(tmp_path.as_str()).unwrap_or_else(|e| panic!("{}", e));

parsed_uri.stdio_tmp_fd = Some(tmp_path_fd);

parsed_uri
}
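Under the default temp tablespace, the assembled path typically looks like $PGDATA/base/pgsql_tmp/pgsql_tmpPID.N, since Postgres names backend temp files after the backend PID plus a counter; the exact layout depends on what TempTablespacePath and FilePathName return, so treat this as an illustration rather than a guarantee.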

fn try_parse_uri(uri: &str) -> Result<Url, String> {
if !uri.contains("://") {
// local file
@@ -92,10 +129,20 @@
bucket,
path,
scheme,
stdio_tmp_fd: None,
})
}
}

impl Drop for ParsedUriInfo {
fn drop(&mut self) {
if let Some(stdio_tmp_fd) = self.stdio_tmp_fd {
// close the temp file; the Postgres API removes it on close
unsafe { FileClose(stdio_tmp_fd) };
}
}
}
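Dropping Clone (per the author's comment above) matters because a cloned ParsedUriInfo would carry the same stdio_tmp_fd, and each clone's Drop would call FileClose on it. A minimal sketch of the double-close hazard, with hypothetical names:

#[derive(Clone)] // with Clone, every copy shares the same raw fd
struct FdGuard { fd: i32 }

impl Drop for FdGuard {
    fn drop(&mut self) {
        // Imagine FileClose(self.fd) here: the original and each clone
        // would all close (and, for Postgres temp files, remove) the
        // same file, so the Drop side effect would run more than once.
        println!("closing fd {}", self.fd);
    }
}

fn main() {
    let a = FdGuard { fd: 42 };
    let b = a.clone();
    drop(a); // "closing fd 42"
    drop(b); // "closing fd 42" again: a double close on a real fd
}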

pub(crate) fn uri_as_string(uri: &Url) -> String {
if uri.scheme() == "file" {
// removes file:// prefix from the local path uri
@@ -109,7 +156,7 @@
uri.to_string()
}

pub(crate) fn parquet_schema_from_uri(uri_info: ParsedUriInfo) -> SchemaDescriptor {
pub(crate) fn parquet_schema_from_uri(uri_info: &ParsedUriInfo) -> SchemaDescriptor {
let parquet_reader = parquet_reader_from_uri(uri_info);

let arrow_schema = parquet_reader.schema();
@@ -119,19 +166,18 @@
.unwrap_or_else(|e| panic!("{}", e))
}

pub(crate) fn parquet_metadata_from_uri(uri_info: ParsedUriInfo) -> Arc<ParquetMetaData> {
pub(crate) fn parquet_metadata_from_uri(uri_info: &ParsedUriInfo) -> Arc<ParquetMetaData> {
let uri = uri_info.uri.clone();

let copy_from = true;
let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);
let (parquet_object_store, location) = get_or_create_object_store(uri_info, copy_from);

PG_BACKEND_TOKIO_RUNTIME.block_on(async {
let object_store_meta = parquet_object_store
.head(&location)
.await
.unwrap_or_else(|e| {
panic!(
"failed to get object store metadata for uri {}: {}",
uri_info.uri, e
)
panic!("failed to get object store metadata for uri {}: {}", uri, e)

Codecov / codecov/patch: added line src/arrow_parquet/uri_utils.rs#L180 was not covered by tests.
});

let parquet_object_reader =
@@ -149,20 +195,19 @@
pub(crate) const RECORD_BATCH_SIZE: i64 = 1024;

pub(crate) fn parquet_reader_from_uri(
uri_info: ParsedUriInfo,
uri_info: &ParsedUriInfo,
) -> ParquetRecordBatchStream<ParquetObjectReader> {
let uri = uri_info.uri.clone();

let copy_from = true;
let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);
let (parquet_object_store, location) = get_or_create_object_store(uri_info, copy_from);

PG_BACKEND_TOKIO_RUNTIME.block_on(async {
let object_store_meta = parquet_object_store
.head(&location)
.await
.unwrap_or_else(|e| {
panic!(
"failed to get object store metadata for uri {}: {}",
uri_info.uri, e
)
panic!("failed to get object store metadata for uri {}: {}", uri, e)
});

let parquet_object_reader =
@@ -205,12 +250,12 @@
}

pub(crate) fn parquet_writer_from_uri(
uri_info: ParsedUriInfo,
uri_info: &ParsedUriInfo,
arrow_schema: SchemaRef,
writer_props: WriterProperties,
) -> AsyncArrowWriter<ParquetObjectWriter> {
let copy_from = false;
let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);
let (parquet_object_store, location) = get_or_create_object_store(uri_info, copy_from);

let parquet_object_writer = ParquetObjectWriter::new(parquet_object_store, location);

14 changes: 6 additions & 8 deletions src/object_store/object_store_cache.rs
@@ -23,7 +23,7 @@ use super::{
static mut OBJECT_STORE_CACHE: Lazy<ObjectStoreCache> = Lazy::new(ObjectStoreCache::new);

pub(crate) fn get_or_create_object_store(
uri_info: ParsedUriInfo,
uri_info: &ParsedUriInfo,
copy_from: bool,
) -> (Arc<dyn ObjectStore>, Path) {
#[allow(static_mut_refs)]
@@ -45,15 +45,13 @@ impl ObjectStoreCache {

fn get_or_create(
&mut self,
uri_info: ParsedUriInfo,
uri_info: &ParsedUriInfo,
copy_from: bool,
) -> (Arc<dyn ObjectStore>, Path) {
let ParsedUriInfo {
uri,
path,
scheme,
bucket,
} = uri_info;
let uri = uri_info.uri.clone();
let scheme = uri_info.scheme.clone();
let bucket = uri_info.bucket.clone();
let path = uri_info.path.clone();

// no need to cache local files
if scheme == ObjectStoreScheme::Local {
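The destructuring of ParsedUriInfo was replaced with per-field clones because Rust forbids moving fields out of a value whose type implements Drop (error E0509). A minimal sketch of the restriction, with hypothetical names:

struct Guard { fd: i32, name: String }

impl Drop for Guard {
    fn drop(&mut self) { /* would close self.fd */ }
}

fn take(g: Guard) {
    // let Guard { name, .. } = g;
    // error[E0509]: cannot move out of type `Guard`,
    // which implements the `Drop` trait
    let name = g.name.clone(); // cloning the fields instead compiles fine
    println!("{name}");
}

fn main() { take(Guard { fd: 1, name: "tmp".into() }); }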
2 changes: 2 additions & 0 deletions src/parquet_copy_hook.rs
@@ -1,7 +1,9 @@
pub(crate) mod copy_from;
pub(crate) mod copy_from_stdin;
pub(crate) mod copy_to;
pub(crate) mod copy_to_dest_receiver;
pub(crate) mod copy_to_split_dest_receiver;
pub(crate) mod copy_to_stdout;
pub(crate) mod copy_utils;
pub(crate) mod hook;
pub(crate) mod pg_compat;
16 changes: 12 additions & 4 deletions src/parquet_copy_hook/copy_from.rs
@@ -18,9 +18,12 @@ use crate::{
},
};

use super::copy_utils::{
copy_from_stmt_match_by, copy_stmt_attribute_list, copy_stmt_create_namespace_item,
copy_stmt_create_parse_state, create_filtered_tupledesc_for_relation,
use super::{
copy_from_stdin::copy_stdin_to_file,
copy_utils::{
copy_from_stmt_match_by, copy_stmt_attribute_list, copy_stmt_create_namespace_item,
copy_stmt_create_parse_state, create_filtered_tupledesc_for_relation,
},
};

// stack to store parquet reader contexts for COPY FROM.
@@ -113,7 +116,7 @@
p_stmt: &PgBox<PlannedStmt>,
query_string: &CStr,
query_env: &PgBox<QueryEnvironment>,
uri_info: ParsedUriInfo,
uri_info: &ParsedUriInfo,
) -> u64 {
let rel_oid = copy_stmt_relation_oid(p_stmt);

@@ -140,6 +143,11 @@
let match_by = copy_from_stmt_match_by(p_stmt);

unsafe {
if uri_info.stdio_tmp_fd.is_some() {
let is_binary = true;
copy_stdin_to_file(uri_info, tupledesc.natts as _, is_binary);
}

// parquet reader context is used throughout the COPY FROM operation.
let parquet_reader_context = ParquetReaderContext::new(uri_info, match_by, &tupledesc);
push_parquet_reader_context(parquet_reader_context);
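Tying the pieces together: for COPY FROM over stdin, the hook first spools the incoming binary COPY stream into the Postgres-managed temp file tracked by stdio_tmp_fd, and ParquetReaderContext then reads that spooled file like any local parquet file; when the ParsedUriInfo is dropped, FileClose releases the temp file, which Postgres removes on close. This reading of the flow assumes copy_stdin_to_file consumes the whole client stream before the reader is constructed.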