
Commit 2f0e2a8

fix: skip if unrecognized or unsupported uri instead of throwing errors (#109)
1 parent 3ff46d5 commit 2f0e2a8

File tree: 11 files changed, +234 −120 lines
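
The core pattern behind the commit title: URI parsing now returns a Result instead of panicking, so a caller can skip an unrecognized or unsupported URI rather than raise an error. Below is a minimal, self-contained sketch of that TryFrom pattern — the ParsedUri type and its parsing logic here are simplified stand-ins for illustration, not pg_parquet's actual code (the real ParsedUriInfo appears in the uri_utils.rs diff further down; only five of the eleven changed files are shown in this excerpt).

    use std::convert::TryFrom;

    // Simplified stand-in for pg_parquet's ParsedUriInfo (illustration only).
    #[derive(Debug)]
    struct ParsedUri {
        scheme: String,
        rest: String,
    }

    impl TryFrom<&str> for ParsedUri {
        type Error = String;

        fn try_from(uri: &str) -> Result<Self, Self::Error> {
            match uri.split_once("://") {
                Some((scheme, rest)) if !scheme.is_empty() => Ok(ParsedUri {
                    scheme: scheme.to_string(),
                    rest: rest.to_string(),
                }),
                // A failed parse is a value the caller can skip on, not a panic.
                _ => Err(format!("unrecognized uri {uri}")),
            }
        }
    }

    fn main() {
        for candidate in ["s3://bucket/key.parquet", "not-a-uri"] {
            match ParsedUri::try_from(candidate) {
                Ok(parsed) => println!("handled: {parsed:?}"),
                Err(reason) => println!("skipped: {reason}"),
            }
        }
    }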

src/arrow_parquet/parquet_reader.rs

Lines changed: 3 additions & 4 deletions

@@ -13,7 +13,6 @@ use pgrx::{
     vardata_any, varsize_any_exhdr, void_mut_ptr, AllocatedByPostgres, PgBox, PgMemoryContexts,
     PgTupleDesc,
 };
-use url::Url;
 
 use crate::{
     arrow_parquet::{
@@ -34,7 +33,7 @@ use super::{
     schema_parser::{
         ensure_file_schema_match_tupledesc_schema, parse_arrow_schema_from_attributes,
     },
-    uri_utils::parquet_reader_from_uri,
+    uri_utils::{parquet_reader_from_uri, ParsedUriInfo},
 };
 
 pub(crate) struct ParquetReaderContext {
@@ -50,15 +49,15 @@ pub(crate) struct ParquetReaderContext {
 }
 
 impl ParquetReaderContext {
-    pub(crate) fn new(uri: Url, match_by: MatchBy, tupledesc: &PgTupleDesc) -> Self {
+    pub(crate) fn new(uri_info: ParsedUriInfo, match_by: MatchBy, tupledesc: &PgTupleDesc) -> Self {
         // Postgis and Map contexts are used throughout reading the parquet file.
         // We need to reset them to avoid reading the stale data. (e.g. extension could be dropped)
         reset_postgis_context();
         reset_map_context();
 
         error_if_copy_from_match_by_position_with_generated_columns(tupledesc, match_by);
 
-        let parquet_reader = parquet_reader_from_uri(&uri);
+        let parquet_reader = parquet_reader_from_uri(uri_info);
 
         let parquet_file_schema = parquet_reader.schema();
 

src/arrow_parquet/parquet_writer.rs

Lines changed: 6 additions & 4 deletions

@@ -8,7 +8,6 @@ use parquet::{
     format::KeyValue,
 };
 use pgrx::{heap_tuple::PgHeapTuple, AllocatedByRust, PgTupleDesc};
-use url::Url;
 
 use crate::{
     arrow_parquet::{
@@ -27,7 +26,10 @@ use crate::{
     PG_BACKEND_TOKIO_RUNTIME,
 };
 
-use super::pg_to_arrow::{context::PgToArrowAttributeContext, to_arrow_array};
+use super::{
+    pg_to_arrow::{context::PgToArrowAttributeContext, to_arrow_array},
+    uri_utils::ParsedUriInfo,
+};
 
 pub(crate) const DEFAULT_ROW_GROUP_SIZE: i64 = 122880;
 pub(crate) const DEFAULT_ROW_GROUP_SIZE_BYTES: i64 = DEFAULT_ROW_GROUP_SIZE * 1024;
@@ -40,7 +42,7 @@ pub(crate) struct ParquetWriterContext {
 
 impl ParquetWriterContext {
     pub(crate) fn new(
-        uri: Url,
+        uri_info: ParsedUriInfo,
         compression: PgParquetCompression,
         compression_level: i32,
         tupledesc: &PgTupleDesc,
@@ -62,7 +64,7 @@ impl ParquetWriterContext {
 
         let writer_props = Self::writer_props(tupledesc, compression, compression_level);
 
-        let parquet_writer = parquet_writer_from_uri(&uri, schema.clone(), writer_props);
+        let parquet_writer = parquet_writer_from_uri(uri_info, schema.clone(), writer_props);
 
         let attribute_contexts =
             collect_pg_to_arrow_attribute_contexts(&attributes, &schema.fields);

src/arrow_parquet/uri_utils.rs

Lines changed: 88 additions & 18 deletions

@@ -1,6 +1,7 @@
 use std::{panic, sync::Arc};
 
 use arrow::datatypes::SchemaRef;
+use object_store::{path::Path, ObjectStoreScheme};
 use parquet::{
     arrow::{
         arrow_to_parquet_schema,
@@ -19,20 +20,76 @@ use url::Url;
 
 use crate::{
     arrow_parquet::parquet_writer::DEFAULT_ROW_GROUP_SIZE,
-    object_store::object_store_cache::get_or_create_object_store, PG_BACKEND_TOKIO_RUNTIME,
+    object_store::{
+        aws::parse_s3_bucket, azure::parse_azure_blob_container,
+        object_store_cache::get_or_create_object_store,
+    },
+    PG_BACKEND_TOKIO_RUNTIME,
 };
 
 const PARQUET_OBJECT_STORE_READ_ROLE: &str = "parquet_object_store_read";
 const PARQUET_OBJECT_STORE_WRITE_ROLE: &str = "parquet_object_store_write";
 
-pub(crate) fn parse_uri(uri: &str) -> Url {
-    if !uri.contains("://") {
-        // local file
-        return Url::from_file_path(uri)
-            .unwrap_or_else(|_| panic!("not a valid file path: {}", uri));
+// ParsedUriInfo is a struct that holds the parsed uri information.
+#[derive(Debug, Clone)]
+pub(crate) struct ParsedUriInfo {
+    pub(crate) uri: Url,
+    pub(crate) bucket: Option<String>,
+    pub(crate) path: Path,
+    pub(crate) scheme: ObjectStoreScheme,
+}
+
+impl ParsedUriInfo {
+    fn try_parse_uri(uri: &str) -> Result<Url, String> {
+        if !uri.contains("://") {
+            // local file
+            Url::from_file_path(uri).map_err(|_| format!("not a valid file path: {}", uri))
+        } else {
+            Url::parse(uri).map_err(|e| e.to_string())
+        }
     }
 
-    Url::parse(uri).unwrap_or_else(|e| panic!("{}", e))
+    fn try_parse_scheme(uri: &Url) -> Result<(ObjectStoreScheme, Path), String> {
+        ObjectStoreScheme::parse(uri).map_err(|_| {
+            format!(
+                "unrecognized uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
+                uri
+            )
+        })
+    }
+
+    fn try_parse_bucket(scheme: &ObjectStoreScheme, uri: &Url) -> Result<Option<String>, String> {
+        match scheme {
+            ObjectStoreScheme::AmazonS3 => parse_s3_bucket(uri)
+                .ok_or(format!("unsupported s3 uri {uri}"))
+                .map(Some),
+            ObjectStoreScheme::MicrosoftAzure => parse_azure_blob_container(uri)
+                .ok_or(format!("unsupported azure blob storage uri: {uri}"))
+                .map(Some),
+            ObjectStoreScheme::Local => Ok(None),
+            _ => Err(format!(
+                "unsupported scheme {} in uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
+                uri.scheme(),
+                uri
+            )),
+        }
+    }
+}
+
+impl TryFrom<&str> for ParsedUriInfo {
+    type Error = String;
+
+    fn try_from(uri: &str) -> Result<Self, Self::Error> {
+        let uri = Self::try_parse_uri(uri)?;
+
+        let (scheme, path) = Self::try_parse_scheme(&uri)?;
+
+        let bucket = Self::try_parse_bucket(&scheme, &uri)?;
+
+        Ok(ParsedUriInfo {
+            uri: uri.clone(),
+            bucket,
+            path,
+            scheme,
+        })
+    }
 }
 
 pub(crate) fn uri_as_string(uri: &Url) -> String {
@@ -48,24 +105,27 @@ pub(crate) fn uri_as_string(uri: &Url) -> String {
     uri.to_string()
 }
 
-pub(crate) fn parquet_schema_from_uri(uri: &Url) -> SchemaDescriptor {
-    let parquet_reader = parquet_reader_from_uri(uri);
+pub(crate) fn parquet_schema_from_uri(uri_info: ParsedUriInfo) -> SchemaDescriptor {
+    let parquet_reader = parquet_reader_from_uri(uri_info);
 
     let arrow_schema = parquet_reader.schema();
 
     arrow_to_parquet_schema(arrow_schema).unwrap_or_else(|e| panic!("{}", e))
 }
 
-pub(crate) fn parquet_metadata_from_uri(uri: &Url) -> Arc<ParquetMetaData> {
+pub(crate) fn parquet_metadata_from_uri(uri_info: ParsedUriInfo) -> Arc<ParquetMetaData> {
     let copy_from = true;
-    let (parquet_object_store, location) = get_or_create_object_store(uri, copy_from);
+    let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);
 
     PG_BACKEND_TOKIO_RUNTIME.block_on(async {
         let object_store_meta = parquet_object_store
             .head(&location)
             .await
             .unwrap_or_else(|e| {
-                panic!("failed to get object store metadata for uri {}: {}", uri, e)
+                panic!(
+                    "failed to get object store metadata for uri {}: {}",
+                    uri_info.uri, e
+                )
             });
 
         let parquet_object_reader =
@@ -79,16 +139,21 @@ pub(crate) fn parquet_metadata_from_uri(uri: &Url) -> Arc<ParquetMetaData> {
     })
 }
 
-pub(crate) fn parquet_reader_from_uri(uri: &Url) -> ParquetRecordBatchStream<ParquetObjectReader> {
+pub(crate) fn parquet_reader_from_uri(
+    uri_info: ParsedUriInfo,
+) -> ParquetRecordBatchStream<ParquetObjectReader> {
     let copy_from = true;
-    let (parquet_object_store, location) = get_or_create_object_store(uri, copy_from);
+    let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);
 
     PG_BACKEND_TOKIO_RUNTIME.block_on(async {
         let object_store_meta = parquet_object_store
             .head(&location)
             .await
             .unwrap_or_else(|e| {
-                panic!("failed to get object store metadata for uri {}: {}", uri, e)
+                panic!(
+                    "failed to get object store metadata for uri {}: {}",
+                    uri_info.uri, e
+                )
             });
 
         let parquet_object_reader =
@@ -108,17 +173,22 @@ pub(crate) fn parquet_reader_from_uri(uri: &Url) -> ParquetRecordBatchStream<Par
 }
 
 pub(crate) fn parquet_writer_from_uri(
-    uri: &Url,
+    uri_info: ParsedUriInfo,
    arrow_schema: SchemaRef,
    writer_props: WriterProperties,
 ) -> AsyncArrowWriter<ParquetObjectWriter> {
     let copy_from = false;
-    let (parquet_object_store, location) = get_or_create_object_store(uri, copy_from);
+    let (parquet_object_store, location) = get_or_create_object_store(uri_info.clone(), copy_from);
 
     let parquet_object_writer = ParquetObjectWriter::new(parquet_object_store, location);
 
     AsyncArrowWriter::try_new(parquet_object_writer, arrow_schema, Some(writer_props))
-        .unwrap_or_else(|e| panic!("failed to create parquet writer for uri {}: {}", uri, e))
+        .unwrap_or_else(|e| {
+            panic!(
+                "failed to create parquet writer for uri {}: {}",
+                uri_info.uri, e
+            )
+        })
 }
 
 pub(crate) fn ensure_access_privilege_to_uri(uri: &Url, copy_from: bool) {
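
The TryFrom impl above funnels every failure into a String error the caller can act on. The scheme/path split it relies on comes from the object_store crate's ObjectStoreScheme::parse, which the diff calls directly in try_parse_scheme. A self-contained sketch of that classification step follows — it requires the object_store and url crates as dependencies, and the exact scheme variants recognized depend on the object_store version:

    use object_store::{path::Path, ObjectStoreScheme};
    use url::Url;

    // Illustration of the classification that try_parse_scheme builds on;
    // not pg_parquet's code, just the underlying object_store API.
    fn classify(raw: &str) -> Result<(ObjectStoreScheme, Path), String> {
        let url = Url::parse(raw).map_err(|e| e.to_string())?;
        ObjectStoreScheme::parse(&url).map_err(|_| format!("unrecognized uri {url}"))
    }

    fn main() {
        for raw in ["s3://my-bucket/data.parquet", "azure://container/blob.parquet"] {
            match classify(raw) {
                Ok((scheme, path)) => println!("{raw}: {scheme:?} at {path}"),
                Err(reason) => println!("{raw}: skipped ({reason})"),
            }
        }
    }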

src/object_store/object_store_cache.rs

Lines changed: 26 additions & 35 deletions

@@ -10,23 +10,22 @@ use once_cell::sync::Lazy;
 use pgrx::{ereport, PgLogLevel, PgSqlErrorCode};
 use url::Url;
 
-use super::{
-    aws::parse_s3_bucket, azure::parse_azure_blob_container, create_azure_object_store,
-    create_local_file_object_store, create_s3_object_store,
-};
+use crate::arrow_parquet::uri_utils::ParsedUriInfo;
+
+use super::{create_azure_object_store, create_local_file_object_store, create_s3_object_store};
 
 // OBJECT_STORE_CACHE is a global cache for object stores per Postgres session.
 // It caches object stores based on the scheme and bucket.
 // Local paths are not cached.
 static mut OBJECT_STORE_CACHE: Lazy<ObjectStoreCache> = Lazy::new(ObjectStoreCache::new);
 
 pub(crate) fn get_or_create_object_store(
-    uri: &Url,
+    uri_info: ParsedUriInfo,
     copy_from: bool,
 ) -> (Arc<dyn ObjectStore>, Path) {
     #[allow(static_mut_refs)]
     unsafe {
-        OBJECT_STORE_CACHE.get_or_create(uri, copy_from)
+        OBJECT_STORE_CACHE.get_or_create(uri_info, copy_from)
     }
 }
 
@@ -41,21 +40,30 @@ impl ObjectStoreCache {
         }
     }
 
-    fn get_or_create(&mut self, uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStore>, Path) {
-        let (scheme, path) = ObjectStoreScheme::parse(uri).unwrap_or_else(|_| {
-            panic!(
-                "unrecognized uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
-                uri
-            )
-        });
-
-        // no need to cache for local files
+    fn get_or_create(
+        &mut self,
+        uri_info: ParsedUriInfo,
+        copy_from: bool,
+    ) -> (Arc<dyn ObjectStore>, Path) {
+        let ParsedUriInfo {
+            uri,
+            path,
+            scheme,
+            bucket,
+        } = uri_info;
+
+        // no need to cache local files
         if scheme == ObjectStoreScheme::Local {
-            let item = Self::create(scheme, uri, copy_from);
+            let item = Self::create(scheme, &uri, copy_from);
             return (item.object_store, path);
         }
 
-        let key = ObjectStoreCacheKey::from_uri(uri, scheme.clone());
+        let bucket = bucket.expect("bucket is None");
+
+        let key = ObjectStoreCacheKey {
+            scheme: scheme.clone(),
+            bucket,
+        };
 
         if let Some(item) = self.cache.get(&key) {
             if item.expired(&key.bucket) {
@@ -65,7 +73,7 @@ impl ObjectStoreCache {
             }
         }
 
-        let item = Self::create(scheme, uri, copy_from);
+        let item = Self::create(scheme, &uri, copy_from);
 
         self.cache.insert(key, item.clone());
 
@@ -126,23 +134,6 @@ struct ObjectStoreCacheKey {
     bucket: String,
 }
 
-impl ObjectStoreCacheKey {
-    fn from_uri(uri: &Url, scheme: ObjectStoreScheme) -> Self {
-        let bucket = match scheme {
-            ObjectStoreScheme::AmazonS3 => parse_s3_bucket(uri).unwrap_or_else(|| panic!("unsupported s3 uri: {uri}")),
-            ObjectStoreScheme::MicrosoftAzure => parse_azure_blob_container(uri).unwrap_or_else(|| panic!("unsupported azure blob storage uri: {uri}")),
-            ObjectStoreScheme::Local => panic!("local paths should not be cached"),
-            _ => panic!(
-                "unsupported scheme {} in uri {}. pg_parquet supports local paths, s3:// or azure:// schemes.",
-                uri.scheme(),
-                uri
-            ),
-        };
-
-        ObjectStoreCacheKey { scheme, bucket }
-    }
-}
-
 impl Hash for ObjectStoreCacheKey {
     fn hash<H: Hasher>(&self, state: &mut H) {
         let schema_tag = self.scheme.clone() as i32;
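
As the comments in the hunk above note, the session cache is keyed by scheme plus bucket, which is why get_or_create insists on a bucket (the expect) once local paths have been filtered out. A standalone sketch of that keying idea — the types here are illustrative stand-ins, not pg_parquet's actual ObjectStoreCacheKey:

    use std::collections::HashMap;

    // Illustrative stand-in: one cached store per (scheme, bucket) pair;
    // local files bypass the cache entirely.
    #[derive(Hash, PartialEq, Eq)]
    struct CacheKey {
        scheme_tag: i32, // e.g. a scheme discriminant cast to i32, as in the Hash impl above
        bucket: String,
    }

    fn main() {
        let mut cache: HashMap<CacheKey, &'static str> = HashMap::new();

        let key = CacheKey { scheme_tag: 1, bucket: "my-bucket".into() };
        cache.entry(key).or_insert("freshly created s3 store");

        // A later uri on the same bucket hits the cached entry.
        let same = CacheKey { scheme_tag: 1, bucket: "my-bucket".into() };
        assert!(cache.contains_key(&same));
    }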

src/parquet_copy_hook/copy_from.rs

Lines changed: 3 additions & 4 deletions

@@ -10,10 +10,9 @@ use pgrx::{
     },
     void_mut_ptr, PgBox, PgLogLevel, PgRelation, PgSqlErrorCode,
 };
-use url::Url;
 
 use crate::{
-    arrow_parquet::parquet_reader::ParquetReaderContext,
+    arrow_parquet::{parquet_reader::ParquetReaderContext, uri_utils::ParsedUriInfo},
     parquet_copy_hook::copy_utils::{
         copy_from_stmt_create_option_list, copy_stmt_lock_mode, copy_stmt_relation_oid,
     },
@@ -114,7 +113,7 @@ pub(crate) fn execute_copy_from(
     p_stmt: &PgBox<PlannedStmt>,
     query_string: &CStr,
     query_env: &PgBox<QueryEnvironment>,
-    uri: Url,
+    uri_info: ParsedUriInfo,
 ) -> u64 {
     let rel_oid = copy_stmt_relation_oid(p_stmt);
 
@@ -142,7 +141,7 @@ pub(crate) fn execute_copy_from(
 
     unsafe {
         // parquet reader context is used throughout the COPY FROM operation.
-        let parquet_reader_context = ParquetReaderContext::new(uri, match_by, &tupledesc);
+        let parquet_reader_context = ParquetReaderContext::new(uri_info, match_by, &tupledesc);
         push_parquet_reader_context(parquet_reader_context);
 
         // makes sure to set binary format
