Commit 5b5725d

do arrow-parquet conversions per batch size
1 parent a5182bf commit 5b5725d

File tree

4 files changed: +76 -45 lines

src/arrow_parquet/parquet_writer.rs

Lines changed: 27 additions & 22 deletions
@@ -11,13 +11,14 @@ use pgrx::{heap_tuple::PgHeapTuple, AllocatedByRust, PgTupleDesc};
 
 use crate::{
     arrow_parquet::{
-        compression::{PgParquetCompression, PgParquetCompressionWithLevel},
+        compression::PgParquetCompressionWithLevel,
         pg_to_arrow::context::collect_pg_to_arrow_attribute_contexts,
         schema_parser::{
             parquet_schema_string_from_attributes, parse_arrow_schema_from_attributes,
         },
         uri_utils::parquet_writer_from_uri,
     },
+    parquet_copy_hook::copy_to_split_dest_receiver::CopyToParquetOptions,
     pgrx_utils::{collect_attributes_for, CollectAttributesFor},
     type_compat::{
         geometry::{geoparquet_metadata_json_from_tupledesc, reset_postgis_context},
@@ -38,13 +39,13 @@ pub(crate) struct ParquetWriterContext {
     parquet_writer: AsyncArrowWriter<ParquetObjectWriter>,
     schema: SchemaRef,
     attribute_contexts: Vec<PgToArrowAttributeContext>,
+    options: CopyToParquetOptions,
 }
 
 impl ParquetWriterContext {
     pub(crate) fn new(
         uri_info: ParsedUriInfo,
-        compression: PgParquetCompression,
-        compression_level: i32,
+        options: CopyToParquetOptions,
         tupledesc: &PgTupleDesc,
     ) -> ParquetWriterContext {
         // Postgis and Map contexts are used throughout writing the parquet file.
@@ -62,7 +63,7 @@ impl ParquetWriterContext {
         let schema = parse_arrow_schema_from_attributes(&attributes);
         let schema = Arc::new(schema);
 
-        let writer_props = Self::writer_props(tupledesc, compression, compression_level);
+        let writer_props = Self::writer_props(tupledesc, options);
 
         let parquet_writer = parquet_writer_from_uri(uri_info, schema.clone(), writer_props);
 
@@ -73,22 +74,20 @@ impl ParquetWriterContext {
             parquet_writer,
             schema,
             attribute_contexts,
+            options,
         }
     }
 
-    fn writer_props(
-        tupledesc: &PgTupleDesc,
-        compression: PgParquetCompression,
-        compression_level: i32,
-    ) -> WriterProperties {
+    fn writer_props(tupledesc: &PgTupleDesc, options: CopyToParquetOptions) -> WriterProperties {
         let compression = PgParquetCompressionWithLevel {
-            compression,
-            compression_level,
+            compression: options.compression,
+            compression_level: options.compression_level,
         };
 
         let mut writer_props_builder = WriterProperties::builder()
             .set_statistics_enabled(EnabledStatistics::Page)
             .set_compression(compression.into())
+            .set_max_row_group_size(options.row_group_size as usize)
             .set_created_by("pg_parquet".to_string());
 
         let geometry_columns_metadata_value = geoparquet_metadata_json_from_tupledesc(tupledesc);
@@ -103,10 +102,9 @@ impl ParquetWriterContext {
         writer_props_builder.build()
     }
 
-    pub(crate) fn write_new_row_group(
-        &mut self,
-        tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>,
-    ) {
+    // write_tuples writes the tuples to the parquet file. It flushes the in progress rows to a new row group
+    // if the row group size is reached.
+    pub(crate) fn write_tuples(&mut self, tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>) {
         let record_batch =
             Self::pg_tuples_to_record_batch(tuples, &self.attribute_contexts, self.schema.clone());
 
@@ -116,9 +114,20 @@ impl ParquetWriterContext {
             .block_on(parquet_writer.write(&record_batch))
             .unwrap_or_else(|e| panic!("failed to write record batch: {}", e));
 
+        if parquet_writer.in_progress_rows() >= self.options.row_group_size as _
+            || parquet_writer.in_progress_size() >= self.options.row_group_size_bytes as _
+        {
+            PG_BACKEND_TOKIO_RUNTIME
+                .block_on(parquet_writer.flush())
+                .unwrap_or_else(|e| panic!("failed to flush record batch: {}", e));
+        }
+    }
+
+    // finalize flushes the in progress rows to a new row group and finally writes metadata to the file.
+    fn finalize(&mut self) {
         PG_BACKEND_TOKIO_RUNTIME
-            .block_on(parquet_writer.flush())
-            .unwrap_or_else(|e| panic!("failed to flush record batch: {}", e));
+            .block_on(self.parquet_writer.finish())
+            .unwrap_or_else(|e| panic!("failed to finish parquet writer: {}", e));
     }
 
     pub(crate) fn bytes_written(&self) -> usize {
@@ -144,10 +153,6 @@ impl ParquetWriterContext {
 
 impl Drop for ParquetWriterContext {
     fn drop(&mut self) {
-        PG_BACKEND_TOKIO_RUNTIME
-            .block_on(self.parquet_writer.finish())
-            .unwrap_or_else(|e| {
-                panic!("failed to close parquet writer: {}", e);
-            });
+        self.finalize();
     }
 }
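
In short, each call to write_tuples now converts one batch of tuples into an Arrow record batch, writes it, and only cuts a new row group once the writer's in-progress row count or byte size reaches the configured limits. Below is a minimal standalone sketch of that flush decision; RowGroupLimits is a hypothetical stand-in for CopyToParquetOptions, and in the real code the counters come from AsyncArrowWriter (in_progress_rows / in_progress_size).

// Sketch of the per-batch flush check performed after writing a record batch.
struct RowGroupLimits {
    row_group_size: i64,       // max rows per row group
    row_group_size_bytes: i64, // max buffered bytes per row group
}

fn should_flush_row_group(
    in_progress_rows: usize,
    in_progress_size: usize,
    limits: &RowGroupLimits,
) -> bool {
    in_progress_rows >= limits.row_group_size as usize
        || in_progress_size >= limits.row_group_size_bytes as usize
}

fn main() {
    // illustrative limits, not pg_parquet defaults
    let limits = RowGroupLimits {
        row_group_size: 100_000,
        row_group_size_bytes: 128 * 1024 * 1024,
    };

    // a freshly written 1024-row batch usually does not cut a row group yet...
    assert!(!should_flush_row_group(1024, 5 * 1024, &limits));
    // ...but crossing either the row or the byte threshold triggers a flush
    assert!(should_flush_row_group(100_000, 5 * 1024, &limits));
    assert!(should_flush_row_group(1024, 200 * 1024 * 1024, &limits));
    println!("flush checks behave as expected");
}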

src/arrow_parquet/uri_utils.rs

Lines changed: 27 additions & 2 deletions
@@ -19,7 +19,6 @@ use pgrx::{
 use url::Url;
 
 use crate::{
-    arrow_parquet::parquet_writer::DEFAULT_ROW_GROUP_SIZE,
     object_store::{
         aws::parse_s3_bucket, azure::parse_azure_blob_container,
         object_store_cache::get_or_create_object_store,
@@ -139,6 +138,9 @@ pub(crate) fn parquet_metadata_from_uri(uri_info: ParsedUriInfo) -> Arc<ParquetM
     })
 }
 
+// default # of records per batch during arrow-parquet conversions (RecordBatch api)
+pub(crate) const RECORD_BATCH_SIZE: i64 = 1024;
+
 pub(crate) fn parquet_reader_from_uri(
     uri_info: ParsedUriInfo,
 ) -> ParquetRecordBatchStream<ParquetObjectReader> {
@@ -165,13 +167,36 @@ pub(crate) fn parquet_reader_from_uri(
 
         pgrx::debug2!("Converted arrow schema is: {}", builder.schema());
 
+        let batch_size = calculate_reader_batch_size(builder.metadata());
+
         builder
-            .with_batch_size(DEFAULT_ROW_GROUP_SIZE as usize)
+            .with_batch_size(batch_size)
             .build()
             .unwrap_or_else(|e| panic!("{}", e))
     })
 }
 
+fn calculate_reader_batch_size(metadata: &Arc<ParquetMetaData>) -> usize {
+    const MAX_ARROW_ARRAY_SIZE: i64 = i32::MAX as _;
+
+    for row_group in metadata.row_groups() {
+        for column in row_group.columns() {
+            // try our best to get the size of the column
+            let column_size = column
+                .unencoded_byte_array_data_bytes()
+                .unwrap_or(column.uncompressed_size());
+
+            if column_size > MAX_ARROW_ARRAY_SIZE {
+                // to prevent decoding large arrays into memory, process one row at a time
+                return 1;
+            }
+        }
+    }
+
+    // default batch size
+    RECORD_BATCH_SIZE as _
+}
+
 pub(crate) fn parquet_writer_from_uri(
     uri_info: ParsedUriInfo,
     arrow_schema: SchemaRef,
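
On the read side, the batch size is no longer tied to DEFAULT_ROW_GROUP_SIZE: the reader inspects the column chunk metadata and falls back to one row per batch whenever a single column chunk might not fit into one Arrow array, otherwise it uses RECORD_BATCH_SIZE. A standalone sketch of that selection follows, using plain per-column byte counts in place of the parquet ColumnChunkMetaData the real code reads.

// Sketch of the reader batch-size choice over hypothetical per-column chunk sizes.
const RECORD_BATCH_SIZE: usize = 1024;

fn reader_batch_size(row_group_column_sizes: &[Vec<i64>]) -> usize {
    // a single Arrow array cannot address more than i32::MAX bytes
    const MAX_ARROW_ARRAY_SIZE: i64 = i32::MAX as i64;

    for columns in row_group_column_sizes {
        if columns.iter().any(|&size| size > MAX_ARROW_ARRAY_SIZE) {
            // an oversized column chunk: decode one row at a time
            return 1;
        }
    }

    RECORD_BATCH_SIZE
}

fn main() {
    // two small row groups -> default batch size
    assert_eq!(reader_batch_size(&[vec![4096, 8192], vec![1024, 2048]]), 1024);

    // one column chunk larger than 2 GiB -> read row by row
    assert_eq!(reader_batch_size(&[vec![4096, 3 * 1024 * 1024 * 1024]]), 1);
    println!("batch size selection matches the new reader logic");
}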

src/parquet_copy_hook/copy_to_dest_receiver.rs

Lines changed: 19 additions & 18 deletions
@@ -9,7 +9,10 @@ use pg_sys::{
 };
 use pgrx::{prelude::*, FromDatum, PgList, PgMemoryContexts, PgTupleDesc};
 
-use crate::arrow_parquet::{parquet_writer::ParquetWriterContext, uri_utils::parse_uri};
+use crate::arrow_parquet::{
+    parquet_writer::ParquetWriterContext,
+    uri_utils::{ParsedUriInfo, RECORD_BATCH_SIZE},
+};
 
 use super::copy_to_split_dest_receiver::CopyToParquetOptions;
 
@@ -22,6 +25,7 @@ pub(crate) struct CopyToParquetDestReceiver {
     collected_tuple_count: i64,
     collected_tuple_size: i64,
     collected_tuple_column_sizes: *mut i64,
+    target_batch_size: i64,
     uri: *const c_char,
     copy_options: CopyToParquetOptions,
     per_copy_context: MemoryContext,
@@ -64,14 +68,6 @@ impl CopyToParquetDestReceiver {
         };
     }
 
-    fn collected_tuples_exceeds_row_group_size(&self) -> bool {
-        self.collected_tuple_count >= self.copy_options.row_group_size
-    }
-
-    fn collected_tuples_exceeds_row_group_size_bytes(&self) -> bool {
-        self.collected_tuple_size >= self.copy_options.row_group_size_bytes
-    }
-
     fn collected_tuples_exceeds_max_col_size(&self, tuple_column_sizes: &[i32]) -> bool {
         const MAX_ARROW_ARRAY_SIZE: i64 = i32::MAX as _;
 
@@ -118,7 +114,7 @@ impl CopyToParquetDestReceiver {
             .as_mut()
             .expect("parquet writer context is not found")
         };
-        current_parquet_writer_context.write_new_row_group(tuples);
+        current_parquet_writer_context.write_tuples(tuples);
 
         self.reset_collected_tuples();
     }
@@ -171,6 +167,13 @@ pub(crate) extern "C" fn copy_startup(
     };
     parquet_dest.natts = tupledesc.len();
 
+    parquet_dest.target_batch_size = if parquet_dest.copy_options.row_group_size < RECORD_BATCH_SIZE
+    {
+        parquet_dest.copy_options.row_group_size
+    } else {
+        RECORD_BATCH_SIZE
+    };
+
     let uri = unsafe { CStr::from_ptr(parquet_dest.uri) }
         .to_str()
         .expect("uri is not a valid C string");
@@ -179,13 +182,9 @@
         panic!("{}", e.to_string());
     });
 
-    let compression = parquet_dest.copy_options.compression;
-
-    let compression_level = parquet_dest.copy_options.compression_level;
-
     // leak the parquet writer context since it will be used during the COPY operation
     let parquet_writer_context =
-        ParquetWriterContext::new(uri_info, compression, compression_level, &tupledesc);
+        ParquetWriterContext::new(uri_info, parquet_dest.copy_options, &tupledesc);
     parquet_dest.parquet_writer_context = Box::into_raw(Box::new(parquet_writer_context));
 }
 
@@ -220,6 +219,9 @@ pub(crate) extern "C" fn copy_receive(slot: *mut TupleTableSlot, dest: *mut Dest
 
         let column_sizes = tuple_column_sizes(&datums, &tupledesc);
 
+        // we use arrow arrays as intermediate format when writing to parquet.
+        // To not hit into arrow array size limit, write the tuples before
+        // collecting new one into the batch.
         if parquet_dest.collected_tuples_exceeds_max_col_size(&column_sizes) {
             parquet_dest.write_tuples_to_parquet();
         }
@@ -229,9 +231,7 @@
 
         parquet_dest.collect_tuple(heap_tuple, column_sizes);
 
-        if parquet_dest.collected_tuples_exceeds_row_group_size()
-            || parquet_dest.collected_tuples_exceeds_row_group_size_bytes()
-        {
+        if parquet_dest.collected_tuple_count == parquet_dest.target_batch_size {
            parquet_dest.write_tuples_to_parquet();
         }
     });
@@ -329,6 +329,7 @@ pub(crate) fn create_copy_to_parquet_dest_receiver(
     parquet_dest.collected_tuple_count = 0;
     parquet_dest.collected_tuples = std::ptr::null_mut();
     parquet_dest.collected_tuple_column_sizes = std::ptr::null_mut();
+    parquet_dest.target_batch_size = 0;
     parquet_dest.copy_options = options;
     parquet_dest.per_copy_context = per_copy_context;
 
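
The dest receiver now batches tuples by the smaller of row_group_size and RECORD_BATCH_SIZE and hands each batch to the writer, which decides on its own when to cut a row group. A standalone sketch of that batching loop follows; the values and the write_batch counter are illustrative, not pg_parquet defaults.

// Sketch of target_batch_size selection and the per-batch write in copy_receive.
const RECORD_BATCH_SIZE: i64 = 1024;

fn target_batch_size(row_group_size: i64) -> i64 {
    if row_group_size < RECORD_BATCH_SIZE {
        row_group_size
    } else {
        RECORD_BATCH_SIZE
    }
}

fn main() {
    assert_eq!(target_batch_size(100), 100);      // small row groups: batch per row group
    assert_eq!(target_batch_size(122_880), 1024); // large row groups: batch per 1024 rows

    // copy_receive-style loop: hand off a batch whenever the collected count hits the target
    let batch = target_batch_size(122_880);
    let mut collected = 0_i64;
    let mut batches_written = 0;
    for _tuple in 0..10_000 {
        collected += 1;
        if collected == batch {
            batches_written += 1; // write_tuples_to_parquet() in the real receiver
            collected = 0;
        }
    }
    assert_eq!(batches_written, 10_000 / 1024);
    println!("wrote {batches_written} batches, {collected} tuples still buffered");
}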

src/pgrx_tests/copy_options.rs

Lines changed: 3 additions & 3 deletions
@@ -331,7 +331,7 @@ mod tests {
             results
         });
 
-        assert_eq!(result_metadata, vec![2]);
+        assert_eq!(result_metadata, vec![1]);
     }
 
     #[pg_test]
@@ -381,7 +381,7 @@ mod tests {
 
         let id_bytes = 4;
         let name_bytes = 1;
-        let total_rows_size_bytes = (id_bytes + name_bytes) * 1_000_000;
+        let total_rows_size_bytes = (id_bytes + name_bytes) * 1024 * 1024;
 
         let row_group_size_bytes = total_rows_size_bytes / 10;
 
@@ -409,7 +409,7 @@ mod tests {
             results
         });
 
-        assert_eq!(result_metadata, vec![10]);
+        assert_eq!(result_metadata, vec![9]);
     }
 
     #[pg_test]
