7 changes: 3 additions & 4 deletions src/arrow_parquet/parquet_reader.rs
@@ -13,7 +13,6 @@ use pgrx::{
vardata_any, varsize_any_exhdr, void_mut_ptr, AllocatedByPostgres, PgBox, PgMemoryContexts,
PgTupleDesc,
};
use url::Url;

use crate::{
arrow_parquet::{
@@ -34,7 +33,7 @@ use super::{
schema_parser::{
ensure_file_schema_match_tupledesc_schema, parse_arrow_schema_from_attributes,
},
uri_utils::parquet_reader_from_uri,
uri_utils::{parquet_reader_from_uri, ParsedUriInfo},
};

pub(crate) struct ParquetReaderContext {
@@ -50,15 +49,15 @@ pub(crate) struct ParquetReaderContext {
}

impl ParquetReaderContext {
pub(crate) fn new(uri: Url, match_by: MatchBy, tupledesc: &PgTupleDesc) -> Self {
pub(crate) fn new(uri_info: ParsedUriInfo, match_by: MatchBy, tupledesc: &PgTupleDesc) -> Self {
// Postgis and Map contexts are used throughout reading the parquet file.
// We need to reset them to avoid reading the stale data. (e.g. extension could be dropped)
reset_postgis_context();
reset_map_context();

error_if_copy_from_match_by_position_with_generated_columns(tupledesc, match_by);

let parquet_reader = parquet_reader_from_uri(&uri);
let parquet_reader = parquet_reader_from_uri(uri_info);

let parquet_file_schema = parquet_reader.schema();

10 changes: 6 additions & 4 deletions src/arrow_parquet/parquet_writer.rs
@@ -8,7 +8,6 @@ use parquet::{
format::KeyValue,
};
use pgrx::{heap_tuple::PgHeapTuple, AllocatedByRust, PgTupleDesc};
use url::Url;

use crate::{
arrow_parquet::{
@@ -27,7 +26,10 @@ use crate::{
PG_BACKEND_TOKIO_RUNTIME,
};

use super::pg_to_arrow::{context::PgToArrowAttributeContext, to_arrow_array};
use super::{
pg_to_arrow::{context::PgToArrowAttributeContext, to_arrow_array},
uri_utils::ParsedUriInfo,
};

pub(crate) const DEFAULT_ROW_GROUP_SIZE: i64 = 122880;
pub(crate) const DEFAULT_ROW_GROUP_SIZE_BYTES: i64 = DEFAULT_ROW_GROUP_SIZE * 1024;
@@ -40,7 +42,7 @@ pub(crate) struct ParquetWriterContext {

impl ParquetWriterContext {
pub(crate) fn new(
uri: Url,
uri_info: ParsedUriInfo,
compression: PgParquetCompression,
compression_level: i32,
tupledesc: &PgTupleDesc,
@@ -62,7 +64,7 @@ impl ParquetWriterContext {

let writer_props = Self::writer_props(tupledesc, compression, compression_level);

let parquet_writer = parquet_writer_from_uri(&uri, schema.clone(), writer_props);
let parquet_writer = parquet_writer_from_uri(uri_info, schema.clone(), writer_props);

let attribute_contexts =
collect_pg_to_arrow_attribute_contexts(&attributes, &schema.fields);
106 changes: 88 additions & 18 deletions src/arrow_parquet/uri_utils.rs
@@ -1,6 +1,7 @@
use std::{panic, sync::Arc};

use arrow::datatypes::SchemaRef;
use object_store::{path::Path, ObjectStoreScheme};
use parquet::{
arrow::{
arrow_to_parquet_schema,
@@ -19,20 +20,76 @@ use url::Url;

use crate::{
arrow_parquet::parquet_writer::DEFAULT_ROW_GROUP_SIZE,
object_store::object_store_cache::get_or_create_object_store, PG_BACKEND_TOKIO_RUNTIME,
object_store::{
aws::parse_s3_bucket, azure::parse_azure_blob_container,
object_store_cache::get_or_create_object_store,
},
PG_BACKEND_TOKIO_RUNTIME,
};

const PARQUET_OBJECT_STORE_READ_ROLE: &str = "parquet_object_store_read";
const PARQUET_OBJECT_STORE_WRITE_ROLE: &str = "parquet_object_store_write";

pub(crate) fn parse_uri(uri: &str) -> Url {
if !uri.contains("://") {
// local file
return Url::from_file_path(uri)
.unwrap_or_else(|_| panic!("not a valid file path: {}", uri));
// ParsedUriInfo is a struct that holds the parsed uri information.
#[derive(Debug, Clone)]
pub(crate) struct ParsedUriInfo {
pub(crate) uri: Url,
pub(crate) bucket: Option<String>,
pub(crate) path: Path,
pub(crate) scheme: ObjectStoreScheme,
}

impl ParsedUriInfo {
fn try_parse_uri(uri: &str) -> Result<Url, String> {
if !uri.contains("://") {
// local file
Url::from_file_path(uri).map_err(|_| format!("not a valid file path: {}", uri))
} else {
Url::parse(uri).map_err(|e| e.to_string())
}
}

Url::parse(uri).unwrap_or_else(|e| panic!("{}", e))
fn try_parse_scheme(uri: &Url) -> Result<(ObjectStoreScheme, Path), String> {
ObjectStoreScheme::parse(uri).map_err(|_| {
format!(
"unrecognized uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
uri
)
})
}

fn try_parse_bucket(scheme: &ObjectStoreScheme, uri: &Url) -> Result<Option<String>, String> {
match scheme {
ObjectStoreScheme::AmazonS3 => parse_s3_bucket(uri)
.ok_or(format!("unsupported s3 uri {uri}"))
.map(Some),
ObjectStoreScheme::MicrosoftAzure => parse_azure_blob_container(uri)
.ok_or(format!("unsupported azure blob storage uri: {uri}"))
.map(Some),
ObjectStoreScheme::Local => Ok(None),
_ => Err(format!("unsupported scheme {} in uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
uri.scheme(), uri))
}
}
}

impl TryFrom<&str> for ParsedUriInfo {
type Error = String;

fn try_from(uri: &str) -> Result<Self, Self::Error> {
let uri = Self::try_parse_uri(uri)?;

let (scheme, path) = Self::try_parse_scheme(&uri)?;

let bucket = Self::try_parse_bucket(&scheme, &uri)?;

Ok(ParsedUriInfo {
uri: uri.clone(),
bucket,
path,
scheme,
})
}
}

pub(crate) fn uri_as_string(uri: &Url) -> String {
@@ -48,24 +105,27 @@ pub(crate) fn uri_as_string(uri: &Url) -> String {
uri.to_string()
}

pub(crate) fn parquet_schema_from_uri(uri: &Url) -> SchemaDescriptor {
let parquet_reader = parquet_reader_from_uri(uri);
pub(crate) fn parquet_schema_from_uri(uri_info: ParsedUriInfo) -> SchemaDescriptor {
let parquet_reader = parquet_reader_from_uri(uri_info);

let arrow_schema = parquet_reader.schema();

arrow_to_parquet_schema(arrow_schema).unwrap_or_else(|e| panic!("{}", e))
}

pub(crate) fn parquet_metadata_from_uri(uri: &Url) -> Arc<ParquetMetaData> {
pub(crate) fn parquet_metadata_from_uri(uri_info: ParsedUriInfo) -> Arc<ParquetMetaData> {
let copy_from = true;
let (parquet_object_store, location) = get_or_create_object_store(uri, copy_from);
let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);

PG_BACKEND_TOKIO_RUNTIME.block_on(async {
let object_store_meta = parquet_object_store
.head(&location)
.await
.unwrap_or_else(|e| {
panic!("failed to get object store metadata for uri {}: {}", uri, e)
panic!(
"failed to get object store metadata for uri {}: {}",
uri_info.uri, e
)
});

let parquet_object_reader =
@@ -79,16 +139,21 @@ pub(crate) fn parquet_metadata_from_uri(uri: &Url) -> Arc<ParquetMetaData> {
})
}

pub(crate) fn parquet_reader_from_uri(uri: &Url) -> ParquetRecordBatchStream<ParquetObjectReader> {
pub(crate) fn parquet_reader_from_uri(
uri_info: ParsedUriInfo,
) -> ParquetRecordBatchStream<ParquetObjectReader> {
let copy_from = true;
let (parquet_object_store, location) = get_or_create_object_store(uri, copy_from);
let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);

PG_BACKEND_TOKIO_RUNTIME.block_on(async {
let object_store_meta = parquet_object_store
.head(&location)
.await
.unwrap_or_else(|e| {
panic!("failed to get object store metadata for uri {}: {}", uri, e)
panic!(
"failed to get object store metadata for uri {}: {}",
uri_info.uri, e
)
});

let parquet_object_reader =
@@ -108,17 +173,22 @@ pub(crate) fn parquet_reader_from_uri(uri: &Url) -> ParquetRecordBatchStream<Par
}

pub(crate) fn parquet_writer_from_uri(
uri: &Url,
uri_info: ParsedUriInfo,
arrow_schema: SchemaRef,
writer_props: WriterProperties,
) -> AsyncArrowWriter<ParquetObjectWriter> {
let copy_from = false;
let (parquet_object_store, location) = get_or_create_object_store(uri, copy_from);
let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);

let parquet_object_writer = ParquetObjectWriter::new(parquet_object_store, location);

AsyncArrowWriter::try_new(parquet_object_writer, arrow_schema, Some(writer_props))
.unwrap_or_else(|e| panic!("failed to create parquet writer for uri {}: {}", uri, e))
.unwrap_or_else(|e| {
panic!(
"failed to create parquet writer for uri {}: {}",
uri_info.uri, e
)
})
}

pub(crate) fn ensure_access_privilege_to_uri(uri: &Url, copy_from: bool) {
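
A minimal caller-side sketch of the new API introduced above (the S3 uri and the panic-based error handling are illustrative assumptions, not taken from this diff): parsing is now a fallible TryFrom<&str> conversion, and the resulting ParsedUriInfo is handed to the reader helper by value.

use crate::arrow_parquet::uri_utils::{parquet_reader_from_uri, ParsedUriInfo};

// Hypothetical uri; parse, scheme and bucket errors now surface as a Result<_, String>
// instead of panicking deep inside the object store cache.
let uri_info = ParsedUriInfo::try_from("s3://my-bucket/data.parquet")
    .unwrap_or_else(|e| panic!("{}", e));

// uri_info carries the uri, scheme, bucket and object path; the reader consumes it by value.
let parquet_reader = parquet_reader_from_uri(uri_info);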
61 changes: 26 additions & 35 deletions src/object_store/object_store_cache.rs
@@ -10,23 +10,22 @@ use once_cell::sync::Lazy;
use pgrx::{ereport, PgLogLevel, PgSqlErrorCode};
use url::Url;

use super::{
aws::parse_s3_bucket, azure::parse_azure_blob_container, create_azure_object_store,
create_local_file_object_store, create_s3_object_store,
};
use crate::arrow_parquet::uri_utils::ParsedUriInfo;

use super::{create_azure_object_store, create_local_file_object_store, create_s3_object_store};

// OBJECT_STORE_CACHE is a global cache for object stores per Postgres session.
// It caches object stores based on the scheme and bucket.
// Local paths are not cached.
static mut OBJECT_STORE_CACHE: Lazy<ObjectStoreCache> = Lazy::new(ObjectStoreCache::new);

pub(crate) fn get_or_create_object_store(
uri: &Url,
uri_info: ParsedUriInfo,
copy_from: bool,
) -> (Arc<dyn ObjectStore>, Path) {
#[allow(static_mut_refs)]
unsafe {
OBJECT_STORE_CACHE.get_or_create(uri, copy_from)
OBJECT_STORE_CACHE.get_or_create(uri_info, copy_from)
}
}

@@ -41,21 +40,30 @@ impl ObjectStoreCache {
}
}

fn get_or_create(&mut self, uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStore>, Path) {
let (scheme, path) = ObjectStoreScheme::parse(uri).unwrap_or_else(|_| {
panic!(
"unrecognized uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
uri
)
});

// no need to cache for local files
fn get_or_create(
&mut self,
uri_info: ParsedUriInfo,
copy_from: bool,
) -> (Arc<dyn ObjectStore>, Path) {
let ParsedUriInfo {
uri,
path,
scheme,
bucket,
} = uri_info;

// no need to cache local files
if scheme == ObjectStoreScheme::Local {
let item = Self::create(scheme, uri, copy_from);
let item = Self::create(scheme, &uri, copy_from);
return (item.object_store, path);
}

let key = ObjectStoreCacheKey::from_uri(uri, scheme.clone());
let bucket = bucket.expect("bucket is None");

let key = ObjectStoreCacheKey {
scheme: scheme.clone(),
bucket,
};

if let Some(item) = self.cache.get(&key) {
if item.expired(&key.bucket) {
@@ -65,7 +73,7 @@ impl ObjectStoreCache {
}
}

let item = Self::create(scheme, uri, copy_from);
let item = Self::create(scheme, &uri, copy_from);

self.cache.insert(key, item.clone());

@@ -126,23 +134,6 @@ struct ObjectStoreCacheKey {
bucket: String,
}

impl ObjectStoreCacheKey {
fn from_uri(uri: &Url, scheme: ObjectStoreScheme) -> Self {
let bucket = match scheme {
ObjectStoreScheme::AmazonS3 => parse_s3_bucket(uri).unwrap_or_else(|| panic!("unsupported s3 uri: {uri}")),
ObjectStoreScheme::MicrosoftAzure => parse_azure_blob_container(uri).unwrap_or_else(|| panic!("unsupported azure blob storage uri: {uri}")),
ObjectStoreScheme::Local => panic!("local paths should not be cached"),
_ => panic!(
"unsupported scheme {} in uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
uri.scheme(),
uri
),
};

ObjectStoreCacheKey { scheme, bucket }
}
}

impl Hash for ObjectStoreCacheKey {
fn hash<H: Hasher>(&self, state: &mut H) {
let schema_tag = self.scheme.clone() as i32;
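
For illustration, a sketch of how the refactored cache behaves (the uris and the copy_from flag are assumptions): the key is built directly from the scheme and bucket already validated in ParsedUriInfo, so two objects in the same bucket resolve to one cached store.

use crate::{
    arrow_parquet::uri_utils::ParsedUriInfo,
    object_store::object_store_cache::get_or_create_object_store,
};

// Hypothetical uris pointing into the same bucket.
let a = ParsedUriInfo::try_from("s3://my-bucket/a.parquet").expect("valid uri");
let b = ParsedUriInfo::try_from("s3://my-bucket/b.parquet").expect("valid uri");

// Both lookups map to the same ObjectStoreCacheKey { scheme, bucket }, so the second
// call reuses the cached store unless the cached entry has expired.
let (store_a, path_a) = get_or_create_object_store(a, true);
let (store_b, path_b) = get_or_create_object_store(b, true);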
7 changes: 3 additions & 4 deletions src/parquet_copy_hook/copy_from.rs
@@ -10,10 +10,9 @@ use pgrx::{
},
void_mut_ptr, PgBox, PgLogLevel, PgRelation, PgSqlErrorCode,
};
use url::Url;

use crate::{
arrow_parquet::parquet_reader::ParquetReaderContext,
arrow_parquet::{parquet_reader::ParquetReaderContext, uri_utils::ParsedUriInfo},
parquet_copy_hook::copy_utils::{
copy_from_stmt_create_option_list, copy_stmt_lock_mode, copy_stmt_relation_oid,
},
@@ -114,7 +113,7 @@ pub(crate) fn execute_copy_from(
p_stmt: &PgBox<PlannedStmt>,
query_string: &CStr,
query_env: &PgBox<QueryEnvironment>,
uri: Url,
uri_info: ParsedUriInfo,
) -> u64 {
let rel_oid = copy_stmt_relation_oid(p_stmt);

@@ -142,7 +141,7 @@

unsafe {
// parquet reader context is used throughout the COPY FROM operation.
let parquet_reader_context = ParquetReaderContext::new(uri, match_by, &tupledesc);
let parquet_reader_context = ParquetReaderContext::new(uri_info, match_by, &tupledesc);
push_parquet_reader_context(parquet_reader_context);

// makes sure to set binary format
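
As a consequence of the signature change above, the copy hook can parse the uri once, report failures through the usual Postgres error machinery, and pass the owned ParsedUriInfo to execute_copy_from. A rough sketch (the pgrx::error! call and the surrounding variables are assumptions, not part of this diff):

// Hypothetical glue code in the copy hook; uri_string, p_stmt, query_string and
// query_env come from the surrounding ProcessUtility hook and are not shown here.
let uri_info = match ParsedUriInfo::try_from(uri_string.as_str()) {
    Ok(info) => info,
    Err(e) => pgrx::error!("{}", e), // raises a Postgres ERROR and aborts the statement
};

let nprocessed = execute_copy_from(&p_stmt, query_string, &query_env, uri_info);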