
Commit fa24da5

Support copying from glob patterns
1 parent 1b5878d commit fa24da5

6 files changed: +275 additions, -103 deletions

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -27,6 +27,7 @@ aws-config = { version = "1", default-features = false, features = ["rustls","rt
 aws-credential-types = {version = "1", default-features = false}
 azure_storage = {version = "0.21", default-features = false}
 futures = "0.3"
+glob = "0.3"
 home = "0.5"
 object_store = {version = "0.11", default-features = false, features = ["aws", "azure", "http"]}
 once_cell = "1"
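The new glob dependency presumably backs the filesystem side of pattern expansion behind the list_uri helper imported in parquet_reader.rs (that module's diff is not shown in this excerpt). A rough standalone sketch of what the crate does — the pattern string below is purely illustrative:

use glob::glob;

fn main() {
    // Expand a local glob pattern into concrete paths; each entry is a
    // Result because an individual match can fail (e.g. permission errors).
    for entry in glob("/tmp/data/*.parquet").expect("invalid glob pattern") {
        match entry {
            Ok(path) => println!("matched {}", path.display()),
            Err(e) => eprintln!("unreadable match: {}", e),
        }
    }
}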

src/arrow_parquet/parquet_reader.rs

Lines changed: 167 additions & 103 deletions
@@ -1,7 +1,11 @@
-use std::sync::Arc;
+use std::{
+    ops::{Deref, DerefMut},
+    sync::Arc,
+};
 
 use arrow::array::RecordBatch;
 use arrow_cast::{cast_with_options, CastOptions};
+use arrow_schema::SchemaRef;
 use futures::StreamExt;
 use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStream};
 use pgrx::{
@@ -22,6 +26,7 @@ use crate::{
             parquet_schema_string_from_attributes,
         },
     },
+    parquet_udfs::list::list_uri,
     pgrx_utils::{collect_attributes_for, CollectAttributesFor},
     type_compat::{geometry::reset_postgis_context, map::reset_map_context},
     PG_BACKEND_TOKIO_RUNTIME,
@@ -36,18 +41,113 @@ use super::{
     uri_utils::{parquet_reader_from_uri, ParsedUriInfo},
 };
 
+pub(crate) struct SingleParquetReader {
+    reader: ParquetRecordBatchStream<ParquetObjectReader>,
+    attribute_contexts: Vec<ArrowToPgAttributeContext>,
+    match_by: MatchBy,
+}
+
+impl SingleParquetReader {
+    fn new(
+        uri_info: ParsedUriInfo,
+        match_by: MatchBy,
+        tupledesc_schema: SchemaRef,
+        attributes: &[FormData_pg_attribute],
+    ) -> Self {
+        let reader = parquet_reader_from_uri(uri_info);
+
+        // Ensure that the file schema matches the tupledesc schema.
+        // Gets cast_to_types for each attribute if a cast is needed for the attribute's columnar array
+        // to match the expected columnar array for its tupledesc type.
+        let cast_to_types = ensure_file_schema_match_tupledesc_schema(
+            reader.schema().clone(),
+            tupledesc_schema.clone(),
+            attributes,
+            match_by,
+        );
+
+        let attribute_contexts = collect_arrow_to_pg_attribute_contexts(
+            attributes,
+            &tupledesc_schema.fields,
+            Some(cast_to_types),
+        );
+
+        SingleParquetReader {
+            reader,
+            attribute_contexts,
+            match_by,
+        }
+    }
+
+    fn attribute_count(&self) -> usize {
+        self.attribute_contexts.len()
+    }
+
+    fn record_batch_to_tuple_datums(&self, record_batch: RecordBatch) -> Vec<Option<Datum>> {
+        let mut datums = vec![];
+
+        for (attribute_idx, attribute_context) in self.attribute_contexts.iter().enumerate() {
+            let name = attribute_context.name();
+
+            let column_array = match self.match_by {
+                MatchBy::Position => record_batch
+                    .columns()
+                    .get(attribute_idx)
+                    .unwrap_or_else(|| panic!("column {} not found", name)),
+
+                MatchBy::Name => record_batch
+                    .column_by_name(name)
+                    .unwrap_or_else(|| panic!("column {} not found", name)),
+            };
+
+            let datum = if attribute_context.needs_cast() {
+                // should fail instead of returning None if the cast fails at runtime
+                let cast_options = CastOptions {
+                    safe: false,
+                    ..Default::default()
+                };
+
+                let casted_column_array =
+                    cast_with_options(&column_array, attribute_context.data_type(), &cast_options)
+                        .unwrap_or_else(|e| panic!("failed to cast column {}: {}", name, e));
+
+                to_pg_datum(casted_column_array.to_data(), attribute_context)
+            } else {
+                to_pg_datum(column_array.to_data(), attribute_context)
+            };
+
+            datums.push(datum);
+        }
+
+        datums
+    }
+}
+
 pub(crate) struct ParquetReaderContext {
     buffer: Vec<u8>,
     offset: usize,
     started: bool,
     finished: bool,
-    parquet_reader: ParquetRecordBatchStream<ParquetObjectReader>,
-    attribute_contexts: Vec<ArrowToPgAttributeContext>,
+    parquet_readers: Vec<SingleParquetReader>,
+    current_parquet_reader_idx: usize,
     binary_out_funcs: Vec<PgBox<FmgrInfo>>,
-    match_by: MatchBy,
     per_row_memory_ctx: PgMemoryContexts,
 }
 
+impl Deref for ParquetReaderContext {
+    type Target = SingleParquetReader;
+
+    fn deref(&self) -> &Self::Target {
+        &self.parquet_readers[self.current_parquet_reader_idx]
+    }
+}
+
+impl DerefMut for ParquetReaderContext {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.parquet_readers[self.current_parquet_reader_idx]
+    }
+}
+
 impl ParquetReaderContext {
     pub(crate) fn new(uri_info: ParsedUriInfo, match_by: MatchBy, tupledesc: &PgTupleDesc) -> Self {
         // Postgis and Map contexts are used throughout reading the parquet file.
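The Deref/DerefMut impls above forward field and method access from ParquetReaderContext to whichever SingleParquetReader is currently active, which is why later hunks can write self.reader and self.attribute_count() without naming the vector. A minimal standalone illustration of that forwarding pattern, using hypothetical Inner/Outer types rather than the extension's own:

use std::ops::{Deref, DerefMut};

struct Inner {
    counter: usize,
}

struct Outer {
    inners: Vec<Inner>,
    current_idx: usize,
}

impl Deref for Outer {
    type Target = Inner;

    // expose the active element so Outer can use Inner's fields directly
    fn deref(&self) -> &Inner {
        &self.inners[self.current_idx]
    }
}

impl DerefMut for Outer {
    fn deref_mut(&mut self) -> &mut Inner {
        &mut self.inners[self.current_idx]
    }
}

fn main() {
    let mut outer = Outer {
        inners: vec![Inner { counter: 0 }, Inner { counter: 10 }],
        current_idx: 1,
    };

    outer.counter += 1; // resolved through DerefMut to the active Inner
    println!("{}", outer.counter); // prints 11
}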
@@ -57,10 +157,6 @@ impl ParquetReaderContext {
 
         error_if_copy_from_match_by_position_with_generated_columns(tupledesc, match_by);
 
-        let parquet_reader = parquet_reader_from_uri(uri_info);
-
-        let parquet_file_schema = parquet_reader.schema();
-
         let attributes = collect_attributes_for(CollectAttributesFor::CopyFrom, tupledesc);
 
         pgrx::debug2!(
@@ -72,21 +168,29 @@ impl ParquetReaderContext {
 
         let tupledesc_schema = Arc::new(tupledesc_schema);
 
-        // Ensure that the file schema matches the tupledesc schema.
-        // Gets cast_to_types for each attribute if a cast is needed for the attribute's columnar array
-        // to match the expected columnar array for its tupledesc type.
-        let cast_to_types = ensure_file_schema_match_tupledesc_schema(
-            parquet_file_schema.clone(),
-            tupledesc_schema.clone(),
-            &attributes,
-            match_by,
-        );
-
-        let attribute_contexts = collect_arrow_to_pg_attribute_contexts(
-            &attributes,
-            &tupledesc_schema.fields,
-            Some(cast_to_types),
-        );
+        let parquet_readers = if uri_info.is_pattern() {
+            list_uri(uri_info)
+                .into_iter()
+                .map(|(file_uri, _)| {
+                    let file_uri_info = ParsedUriInfo::try_from(file_uri.as_str())
+                        .unwrap_or_else(|e| panic!("failed to parse file uri {}: {}", file_uri, e));
+
+                    SingleParquetReader::new(
+                        file_uri_info,
+                        match_by,
+                        tupledesc_schema.clone(),
+                        &attributes,
+                    )
+                })
+                .collect()
+        } else {
+            vec![SingleParquetReader::new(
+                uri_info,
+                match_by,
+                tupledesc_schema.clone(),
+                &attributes,
+            )]
+        };
 
         let binary_out_funcs = Self::collect_binary_out_funcs(&attributes);
 
@@ -95,10 +199,9 @@
         ParquetReaderContext {
             buffer: Vec::new(),
             offset: 0,
-            attribute_contexts,
-            parquet_reader,
+            parquet_readers,
+            current_parquet_reader_idx: 0,
             binary_out_funcs,
-            match_by,
             started: false,
             finished: false,
             per_row_memory_ctx,
@@ -128,47 +231,8 @@ impl ParquetReaderContext {
         }
     }
 
-    fn record_batch_to_tuple_datums(
-        record_batch: RecordBatch,
-        attribute_contexts: &[ArrowToPgAttributeContext],
-        match_by: MatchBy,
-    ) -> Vec<Option<Datum>> {
-        let mut datums = vec![];
-
-        for (attribute_idx, attribute_context) in attribute_contexts.iter().enumerate() {
-            let name = attribute_context.name();
-
-            let column_array = match match_by {
-                MatchBy::Position => record_batch
-                    .columns()
-                    .get(attribute_idx)
-                    .unwrap_or_else(|| panic!("column {} not found", name)),
-
-                MatchBy::Name => record_batch
-                    .column_by_name(name)
-                    .unwrap_or_else(|| panic!("column {} not found", name)),
-            };
-
-            let datum = if attribute_context.needs_cast() {
-                // should fail instead of returning None if the cast fails at runtime
-                let cast_options = CastOptions {
-                    safe: false,
-                    ..Default::default()
-                };
-
-                let casted_column_array =
-                    cast_with_options(&column_array, attribute_context.data_type(), &cast_options)
-                        .unwrap_or_else(|e| panic!("failed to cast column {}: {}", name, e));
-
-                to_pg_datum(casted_column_array.to_data(), attribute_context)
-            } else {
-                to_pg_datum(column_array.to_data(), attribute_context)
-            };
-
-            datums.push(datum);
-        }
-
-        datums
+    fn has_more_parquet_readers(&self) -> bool {
+        self.current_parquet_reader_idx < self.parquet_readers.len() - 1
     }
 
     pub(crate) fn read_parquet(&mut self) -> bool {
@@ -183,7 +247,7 @@ impl ParquetReaderContext {
 
         // read a record batch from the parquet file. Record batch will contain
        // DEFAULT_BATCH_SIZE rows as we configured in the parquet reader.
-        let record_batch = PG_BACKEND_TOKIO_RUNTIME.block_on(self.parquet_reader.next());
+        let record_batch = PG_BACKEND_TOKIO_RUNTIME.block_on(self.reader.next());
 
         if let Some(batch_result) = record_batch {
             let record_batch =
@@ -198,6 +262,10 @@ impl ParquetReaderContext {
                 let record_batch = record_batch.slice(i, 1);
                 self.copy_row(record_batch);
             }
+        } else if self.has_more_parquet_readers() {
+            // move to the next parquet reader
+            self.current_parquet_reader_idx += 1;
+            self.read_parquet();
         } else {
             // finish PG copy
             self.copy_finish();
@@ -208,43 +276,39 @@ impl ParquetReaderContext {
 
     fn copy_row(&mut self, record_batch: RecordBatch) {
         unsafe {
-            self.per_row_memory_ctx.switch_to(|_context| {
-                /* 2 bytes: per-tuple header */
-                let natts = self.attribute_contexts.len() as i16;
-                let attnum_len_bytes = natts.to_be_bytes();
-                self.buffer.extend_from_slice(&attnum_len_bytes);
-
-                // convert the columnar arrays in record batch to tuple datums
-                let tuple_datums = Self::record_batch_to_tuple_datums(
-                    record_batch,
-                    &self.attribute_contexts,
-                    self.match_by,
-                );
-
-                // write the tuple datums to the ParquetReader's internal buffer in PG copy format
-                for (datum, out_func) in tuple_datums.into_iter().zip(self.binary_out_funcs.iter())
-                {
-                    if let Some(datum) = datum {
-                        let datum_bytes: *mut varlena = SendFunctionCall(out_func.as_ptr(), datum);
-
-                        /* 4 bytes: attribute's data size */
-                        let data_size = varsize_any_exhdr(datum_bytes);
-                        let data_size_bytes = (data_size as i32).to_be_bytes();
-                        self.buffer.extend_from_slice(&data_size_bytes);
-
-                        /* variable bytes: attribute's data */
-                        let data = vardata_any(datum_bytes) as _;
-                        let data_bytes = std::slice::from_raw_parts(data, data_size);
-                        self.buffer.extend_from_slice(data_bytes);
-                    } else {
-                        /* 4 bytes: null */
-                        let null_value = -1_i32;
-                        let null_value_bytes = null_value.to_be_bytes();
-                        self.buffer.extend_from_slice(&null_value_bytes);
-                    }
+            let mut old_ctx = self.per_row_memory_ctx.set_as_current();
+
+            /* 2 bytes: per-tuple header */
+            let natts = self.attribute_count() as i16;
+            let attnum_len_bytes = natts.to_be_bytes();
+            self.buffer.extend_from_slice(&attnum_len_bytes);
+
+            // convert the columnar arrays in record batch to tuple datums
+            let tuple_datums = self.record_batch_to_tuple_datums(record_batch);
+
+            // write the tuple datums to the ParquetReader's internal buffer in PG copy format
+            for (datum, out_func) in tuple_datums.into_iter().zip(self.binary_out_funcs.iter()) {
+                if let Some(datum) = datum {
+                    let datum_bytes: *mut varlena = SendFunctionCall(out_func.as_ptr(), datum);
+
+                    /* 4 bytes: attribute's data size */
+                    let data_size = varsize_any_exhdr(datum_bytes);
+                    let data_size_bytes = (data_size as i32).to_be_bytes();
+                    self.buffer.extend_from_slice(&data_size_bytes);
+
+                    /* variable bytes: attribute's data */
+                    let data = vardata_any(datum_bytes) as _;
+                    let data_bytes = std::slice::from_raw_parts(data, data_size);
+                    self.buffer.extend_from_slice(data_bytes);
+                } else {
+                    /* 4 bytes: null */
+                    let null_value = -1_i32;
+                    let null_value_bytes = null_value.to_be_bytes();
+                    self.buffer.extend_from_slice(&null_value_bytes);
                 }
-            });
+            }
 
+            old_ctx.set_as_current();
             self.per_row_memory_ctx.reset();
         };
     }
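With the changes above, read_parquet drains the active reader and, once it yields None, bumps current_parquet_reader_idx and retries, so a glob pattern copies its matching files back to back. A self-contained sketch of that "many sources, one cursor" control flow, with plain integer iterators standing in for the parquet streams:

struct MultiReader<I: Iterator<Item = i32>> {
    readers: Vec<I>,
    current_idx: usize,
}

impl<I: Iterator<Item = i32>> MultiReader<I> {
    fn has_more_readers(&self) -> bool {
        // mirrors has_more_parquet_readers(); assumes at least one reader
        self.current_idx < self.readers.len() - 1
    }

    fn next_row(&mut self) -> Option<i32> {
        match self.readers[self.current_idx].next() {
            Some(row) => Some(row),
            // active source exhausted: advance the cursor and try again
            None if self.has_more_readers() => {
                self.current_idx += 1;
                self.next_row()
            }
            None => None,
        }
    }
}

fn main() {
    let readers = vec![vec![1, 2].into_iter(), vec![3, 4].into_iter()];
    let mut multi = MultiReader { readers, current_idx: 0 };

    while let Some(row) = multi.next_row() {
        println!("row {}", row);
    }
}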

src/arrow_parquet/uri_utils.rs

Lines changed: 17 additions & 0 deletions
@@ -73,6 +73,23 @@ impl ParsedUriInfo {
                 uri.scheme(), uri))
         }
     }
+
+    pub(crate) fn base_uri(&self) -> String {
+        if self.uri.scheme() == "file" {
+            // root path for local file
+            return "/".to_string();
+        }
+
+        format!(
+            "{}://{}",
+            self.uri.scheme(),
+            self.uri.host().expect("missing host")
+        )
+    }
+
+    pub(crate) fn is_pattern(&self) -> bool {
+        self.path.to_string().contains('*') || self.path.to_string().contains("**")
+    }
 }
 
 impl TryFrom<&str> for ParsedUriInfo {
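base_uri() and is_pattern() split a copy-from URI into the store to connect to and the glob to match inside it. A quick sketch of that intent, assuming the uri field is a url::Url (as the scheme()/host() accessors suggest) and that the path holds the glob — the bucket and pattern below are made up:

use url::Url;

fn main() {
    // hypothetical pattern URI; for object stores the base is scheme + host
    // (the bucket), while file:// URIs fall back to the filesystem root
    let uri = Url::parse("s3://my-bucket/pricing/*.parquet").expect("invalid uri");

    let base_uri = if uri.scheme() == "file" {
        "/".to_string()
    } else {
        format!("{}://{}", uri.scheme(), uri.host_str().expect("missing host"))
    };

    let is_pattern = uri.path().contains('*');

    println!("base = {base_uri}, pattern = {is_pattern}");
}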

src/parquet_udfs.rs

Lines changed: 1 addition & 0 deletions
@@ -1,2 +1,3 @@
+pub(crate) mod list;
 pub(crate) mod metadata;
 pub(crate) mod schema;
