Commit 92971fe

do arrow-parquet conversions per batch size
1 parent 2c06724 commit 92971fe

4 files changed: +76 −45 lines

src/arrow_parquet/parquet_writer.rs

Lines changed: 27 additions & 22 deletions

@@ -12,13 +12,14 @@ use url::Url;
 
 use crate::{
     arrow_parquet::{
-        compression::{PgParquetCompression, PgParquetCompressionWithLevel},
+        compression::PgParquetCompressionWithLevel,
         pg_to_arrow::context::collect_pg_to_arrow_attribute_contexts,
         schema_parser::{
             parquet_schema_string_from_attributes, parse_arrow_schema_from_attributes,
         },
         uri_utils::parquet_writer_from_uri,
     },
+    parquet_copy_hook::copy_to_split_dest_receiver::CopyToParquetOptions,
     pgrx_utils::{collect_attributes_for, CollectAttributesFor},
     type_compat::{
         geometry::{geoparquet_metadata_json_from_tupledesc, reset_postgis_context},
@@ -36,13 +37,13 @@ pub(crate) struct ParquetWriterContext {
     parquet_writer: AsyncArrowWriter<ParquetObjectWriter>,
     schema: SchemaRef,
     attribute_contexts: Vec<PgToArrowAttributeContext>,
+    options: CopyToParquetOptions,
 }
 
 impl ParquetWriterContext {
     pub(crate) fn new(
         uri: Url,
-        compression: PgParquetCompression,
-        compression_level: i32,
+        options: CopyToParquetOptions,
         tupledesc: &PgTupleDesc,
     ) -> ParquetWriterContext {
         // Postgis and Map contexts are used throughout writing the parquet file.
@@ -60,7 +61,7 @@ impl ParquetWriterContext {
         let schema = parse_arrow_schema_from_attributes(&attributes);
         let schema = Arc::new(schema);
 
-        let writer_props = Self::writer_props(tupledesc, compression, compression_level);
+        let writer_props = Self::writer_props(tupledesc, options);
 
         let parquet_writer = parquet_writer_from_uri(&uri, schema.clone(), writer_props);
 
@@ -71,22 +72,20 @@ impl ParquetWriterContext {
             parquet_writer,
             schema,
             attribute_contexts,
+            options,
         }
     }
 
-    fn writer_props(
-        tupledesc: &PgTupleDesc,
-        compression: PgParquetCompression,
-        compression_level: i32,
-    ) -> WriterProperties {
+    fn writer_props(tupledesc: &PgTupleDesc, options: CopyToParquetOptions) -> WriterProperties {
         let compression = PgParquetCompressionWithLevel {
-            compression,
-            compression_level,
+            compression: options.compression,
+            compression_level: options.compression_level,
        };
 
         let mut writer_props_builder = WriterProperties::builder()
             .set_statistics_enabled(EnabledStatistics::Page)
             .set_compression(compression.into())
+            .set_max_row_group_size(options.row_group_size as usize)
             .set_created_by("pg_parquet".to_string());
 
         let geometry_columns_metadata_value = geoparquet_metadata_json_from_tupledesc(tupledesc);
@@ -101,10 +100,9 @@ impl ParquetWriterContext {
         writer_props_builder.build()
     }
 
-    pub(crate) fn write_new_row_group(
-        &mut self,
-        tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>,
-    ) {
+    // write_tuples writes the tuples to the parquet file. It flushes the in progress rows to a new row group
+    // if the row group size is reached.
+    pub(crate) fn write_tuples(&mut self, tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>) {
         let record_batch =
             Self::pg_tuples_to_record_batch(tuples, &self.attribute_contexts, self.schema.clone());
 
@@ -114,9 +112,20 @@ impl ParquetWriterContext {
             .block_on(parquet_writer.write(&record_batch))
             .unwrap_or_else(|e| panic!("failed to write record batch: {}", e));
 
+        if parquet_writer.in_progress_rows() >= self.options.row_group_size as _
+            || parquet_writer.in_progress_size() >= self.options.row_group_size_bytes as _
+        {
+            PG_BACKEND_TOKIO_RUNTIME
+                .block_on(parquet_writer.flush())
+                .unwrap_or_else(|e| panic!("failed to flush record batch: {}", e));
+        }
+    }
+
+    // finalize flushes the in progress rows to a new row group and finally writes metadata to the file.
+    fn finalize(&mut self) {
         PG_BACKEND_TOKIO_RUNTIME
-            .block_on(parquet_writer.flush())
-            .unwrap_or_else(|e| panic!("failed to flush record batch: {}", e));
+            .block_on(self.parquet_writer.finish())
+            .unwrap_or_else(|e| panic!("failed to finish parquet writer: {}", e));
     }
 
     pub(crate) fn bytes_written(&self) -> usize {
@@ -142,10 +151,6 @@ impl ParquetWriterContext {
 
 impl Drop for ParquetWriterContext {
     fn drop(&mut self) {
-        PG_BACKEND_TOKIO_RUNTIME
-            .block_on(self.parquet_writer.finish())
-            .unwrap_or_else(|e| {
-                panic!("failed to close parquet writer: {}", e);
-            });
+        self.finalize();
     }
 }

src/arrow_parquet/uri_utils.rs

Lines changed: 27 additions & 2 deletions

@@ -18,7 +18,6 @@ use pgrx::{
 use url::Url;
 
 use crate::{
-    arrow_parquet::parquet_writer::DEFAULT_ROW_GROUP_SIZE,
     object_store::object_store_cache::get_or_create_object_store, PG_BACKEND_TOKIO_RUNTIME,
 };
 
@@ -79,6 +78,9 @@ pub(crate) fn parquet_metadata_from_uri(uri: &Url) -> Arc<ParquetMetaData> {
     })
 }
 
+// default # of records per batch during arrow-parquet conversions (RecordBatch api)
+pub(crate) const RECORD_BATCH_SIZE: i64 = 1024;
+
 pub(crate) fn parquet_reader_from_uri(uri: &Url) -> ParquetRecordBatchStream<ParquetObjectReader> {
     let copy_from = true;
     let (parquet_object_store, location) = get_or_create_object_store(uri, copy_from);
@@ -100,13 +102,36 @@ pub(crate) fn parquet_reader_from_uri(uri: &Url) -> ParquetRecordBatchStream<Par
 
         pgrx::debug2!("Converted arrow schema is: {}", builder.schema());
 
+        let batch_size = calculate_reader_batch_size(builder.metadata());
+
         builder
-            .with_batch_size(DEFAULT_ROW_GROUP_SIZE as usize)
+            .with_batch_size(batch_size)
            .build()
            .unwrap_or_else(|e| panic!("{}", e))
     })
 }
 
+fn calculate_reader_batch_size(metadata: &Arc<ParquetMetaData>) -> usize {
+    const MAX_ARROW_ARRAY_SIZE: i64 = i32::MAX as _;
+
+    for row_group in metadata.row_groups() {
+        for column in row_group.columns() {
+            // try our best to get the size of the column
+            let column_size = column
+                .unencoded_byte_array_data_bytes()
+                .unwrap_or(column.uncompressed_size());
+
+            if column_size > MAX_ARROW_ARRAY_SIZE {
+                // to prevent decoding large arrays into memory, process one row at a time
+                return 1;
+            }
+        }
+    }
+
+    // default batch size
+    RECORD_BATCH_SIZE as _
+}
+
 pub(crate) fn parquet_writer_from_uri(
     uri: &Url,
     arrow_schema: SchemaRef,
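
The reader-side counterpart replaces the old fixed DEFAULT_ROW_GROUP_SIZE batch size with a per-file heuristic: if any column chunk in any row group could exceed the ~2 GiB arrow array limit, the stream decodes one row per batch, otherwise it uses the 1024-row default. A standalone sketch of the same heuristic follows, with per-column sizes passed in as plain i64 values instead of a real ParquetMetaData (illustrative only).

const RECORD_BATCH_SIZE: i64 = 1024;
const MAX_ARROW_ARRAY_SIZE: i64 = i32::MAX as i64;

fn calculate_reader_batch_size(row_group_column_sizes: &[Vec<i64>]) -> usize {
    for row_group in row_group_column_sizes {
        for &column_size in row_group {
            if column_size > MAX_ARROW_ARRAY_SIZE {
                // a single column chunk could blow past the arrow array limit:
                // decode one row per batch to bound memory
                return 1;
            }
        }
    }

    // nothing oversized: use the default batch size
    RECORD_BATCH_SIZE as usize
}

fn main() {
    // modest columns in both row groups -> default 1024-row batches
    let small: Vec<Vec<i64>> = vec![vec![10 * 1024 * 1024, 4096], vec![8 * 1024 * 1024, 4096]];
    assert_eq!(calculate_reader_batch_size(&small), 1024);

    // one ~3 GiB column chunk -> fall back to one row per batch
    let huge: Vec<Vec<i64>> = vec![vec![3 * 1024 * 1024 * 1024]];
    assert_eq!(calculate_reader_batch_size(&huge), 1);
}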

src/parquet_copy_hook/copy_to_dest_receiver.rs

Lines changed: 19 additions & 18 deletions

@@ -9,7 +9,10 @@ use pg_sys::{
 };
 use pgrx::{prelude::*, FromDatum, PgList, PgMemoryContexts, PgTupleDesc};
 
-use crate::arrow_parquet::{parquet_writer::ParquetWriterContext, uri_utils::parse_uri};
+use crate::arrow_parquet::{
+    parquet_writer::ParquetWriterContext,
+    uri_utils::{parse_uri, RECORD_BATCH_SIZE},
+};
 
 use super::copy_to_split_dest_receiver::CopyToParquetOptions;
 
@@ -22,6 +25,7 @@ pub(crate) struct CopyToParquetDestReceiver {
     collected_tuple_count: i64,
     collected_tuple_size: i64,
     collected_tuple_column_sizes: *mut i64,
+    target_batch_size: i64,
     uri: *const c_char,
     copy_options: CopyToParquetOptions,
     per_copy_context: MemoryContext,
@@ -64,14 +68,6 @@ impl CopyToParquetDestReceiver {
         };
     }
 
-    fn collected_tuples_exceeds_row_group_size(&self) -> bool {
-        self.collected_tuple_count >= self.copy_options.row_group_size
-    }
-
-    fn collected_tuples_exceeds_row_group_size_bytes(&self) -> bool {
-        self.collected_tuple_size >= self.copy_options.row_group_size_bytes
-    }
-
     fn collected_tuples_exceeds_max_col_size(&self, tuple_column_sizes: &[i32]) -> bool {
         const MAX_ARROW_ARRAY_SIZE: i64 = i32::MAX as _;
 
@@ -118,7 +114,7 @@ impl CopyToParquetDestReceiver {
                 .as_mut()
                 .expect("parquet writer context is not found")
         };
-        current_parquet_writer_context.write_new_row_group(tuples);
+        current_parquet_writer_context.write_tuples(tuples);
 
         self.reset_collected_tuples();
     }
@@ -171,19 +167,22 @@ pub(crate) extern "C" fn copy_startup(
     };
     parquet_dest.natts = tupledesc.len();
 
+    parquet_dest.target_batch_size = if parquet_dest.copy_options.row_group_size < RECORD_BATCH_SIZE
+    {
+        parquet_dest.copy_options.row_group_size
+    } else {
+        RECORD_BATCH_SIZE
+    };
+
     let uri = unsafe { CStr::from_ptr(parquet_dest.uri) }
         .to_str()
         .expect("uri is not a valid C string");
 
     let uri = parse_uri(uri);
 
-    let compression = parquet_dest.copy_options.compression;
-
-    let compression_level = parquet_dest.copy_options.compression_level;
-
     // leak the parquet writer context since it will be used during the COPY operation
     let parquet_writer_context =
-        ParquetWriterContext::new(uri, compression, compression_level, &tupledesc);
+        ParquetWriterContext::new(uri, parquet_dest.copy_options, &tupledesc);
     parquet_dest.parquet_writer_context = Box::into_raw(Box::new(parquet_writer_context));
 }
 
@@ -218,6 +217,9 @@ pub(crate) extern "C" fn copy_receive(slot: *mut TupleTableSlot, dest: *mut Dest
 
         let column_sizes = tuple_column_sizes(&datums, &tupledesc);
 
+        // we use arrow arrays as intermediate format when writing to parquet.
+        // To not hit into arrow array size limit, write the tuples before
+        // collecting new one into the batch.
         if parquet_dest.collected_tuples_exceeds_max_col_size(&column_sizes) {
             parquet_dest.write_tuples_to_parquet();
         }
@@ -227,9 +229,7 @@ pub(crate) extern "C" fn copy_receive(slot: *mut TupleTableSlot, dest: *mut Dest
 
         parquet_dest.collect_tuple(heap_tuple, column_sizes);
 
-        if parquet_dest.collected_tuples_exceeds_row_group_size()
-            || parquet_dest.collected_tuples_exceeds_row_group_size_bytes()
-        {
+        if parquet_dest.collected_tuple_count == parquet_dest.target_batch_size {
             parquet_dest.write_tuples_to_parquet();
         }
     });
@@ -327,6 +327,7 @@ pub(crate) fn create_copy_to_parquet_dest_receiver(
     parquet_dest.collected_tuple_count = 0;
     parquet_dest.collected_tuples = std::ptr::null_mut();
     parquet_dest.collected_tuple_column_sizes = std::ptr::null_mut();
+    parquet_dest.target_batch_size = 0;
    parquet_dest.copy_options = options;
    parquet_dest.per_copy_context = per_copy_context;
 
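
On the COPY TO side, the receiver no longer accumulates a whole row group in memory; it hands tuples to the writer every target_batch_size rows, where target_batch_size is the smaller of row_group_size and RECORD_BATCH_SIZE. A simplified sketch of that batching follows; the Receiver type is a stand-in for the real dest receiver state, not pg_parquet's API.

const RECORD_BATCH_SIZE: i64 = 1024;

struct Receiver {
    target_batch_size: i64,
    collected: Vec<u64>, // stand-in for the collected heap tuples
}

impl Receiver {
    fn new(row_group_size: i64) -> Self {
        // same choice as copy_startup: never batch more tuples than RECORD_BATCH_SIZE,
        // and never more than a whole row group
        let target_batch_size = row_group_size.min(RECORD_BATCH_SIZE);
        Receiver {
            target_batch_size,
            collected: Vec::new(),
        }
    }

    // called once per tuple, like copy_receive; returns true when a batch was handed off
    fn receive(&mut self, tuple: u64) -> bool {
        self.collected.push(tuple);

        if self.collected.len() as i64 == self.target_batch_size {
            // here the real receiver calls write_tuples() on the parquet writer context
            self.collected.clear();
            return true;
        }

        false
    }
}

fn main() {
    let mut receiver = Receiver::new(100_000); // row_group_size larger than the batch size
    assert_eq!(receiver.target_batch_size, 1024);

    let mut batches_written = 0;
    for tuple in 0..10_240u64 {
        if receiver.receive(tuple) {
            batches_written += 1;
        }
    }

    assert_eq!(batches_written, 10);
    println!("handed off {batches_written} batches of {} tuples", receiver.target_batch_size);
}

The writer (previous file) then decides when enough batches have accumulated to close a row group, so the receiver's memory footprint is bounded by the batch size rather than by the row group size.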

src/pgrx_tests/copy_options.rs

Lines changed: 3 additions & 3 deletions

@@ -331,7 +331,7 @@ mod tests {
             results
         });
 
-        assert_eq!(result_metadata, vec![2]);
+        assert_eq!(result_metadata, vec![1]);
     }
 
     #[pg_test]
@@ -381,7 +381,7 @@ mod tests {
 
         let id_bytes = 4;
         let name_bytes = 1;
-        let total_rows_size_bytes = (id_bytes + name_bytes) * 1_000_000;
+        let total_rows_size_bytes = (id_bytes + name_bytes) * 1024 * 1024;
 
         let row_group_size_bytes = total_rows_size_bytes / 10;
 
@@ -409,7 +409,7 @@ mod tests {
             results
         });
 
-        assert_eq!(result_metadata, vec![10]);
+        assert_eq!(result_metadata, vec![9]);
     }
 
     #[pg_test]
