
Commit 08c012a

Support file_size_bytes option
`COPY TO` parquet now supports a new option, `file_size_bytes`, which lets you generate Parquet files with a target size of `file_size_bytes`. When a Parquet file exceeds the target size, it is flushed and a new Parquet file is started under a parent directory (the parent directory is the path without the `.parquet` extension), e.g.

```sql
COPY (select 'hellooooo' || i from generate_series(1, 1000000) i) to '/tmp/test.parquet' with (file_size_bytes 1048576);
```

```bash
> ls -alh /tmp/test/
1.4M data_0.parquet
1.4M data_1.parquet
1.4M data_2.parquet
1.4M data_3.parquet
114K data_4.parquet
```
1 parent: 6014e11

13 files changed (+580, -150 lines)

README.md (1 addition, 0 deletions)

```diff
@@ -242,6 +242,7 @@ Supported authorization methods' priority order is shown below:
 ## Copy Options
 `pg_parquet` supports the following options in the `COPY TO` command:
 - `format parquet`: you need to specify this option to read or write Parquet files which does not end with `.parquet[.<compression>]` extension,
+- `file_size_bytes <int>`: the target total byte size per Parquet file. When set, Parquet files of the target size are created under a parent directory (named the same as the file name without the file extension). By default, when not specified, a single file is generated without creating a parent folder,
 - `row_group_size <int>`: the number of rows in each row group while writing Parquet files. The default row group size is `122880`,
 - `row_group_size_bytes <int>`: the total byte size of rows in each row group while writing Parquet files. The default row group size bytes is `row_group_size * 1024`,
 - `compression <string>`: the compression format to use while writing Parquet files. The supported compression formats are `uncompressed`, `snappy`, `gzip`, `brotli`, `lz4`, `lz4raw` and `zstd`. The default compression format is `snappy`. If not specified, the compression format is determined by the file extension,
```

src/arrow_parquet/parquet_writer.rs (31 additions, 22 deletions)

```diff
@@ -11,13 +11,14 @@ use pgrx::{heap_tuple::PgHeapTuple, AllocatedByRust, PgTupleDesc};
 
 use crate::{
     arrow_parquet::{
-        compression::{PgParquetCompression, PgParquetCompressionWithLevel},
+        compression::PgParquetCompressionWithLevel,
         pg_to_arrow::context::collect_pg_to_arrow_attribute_contexts,
         schema_parser::{
             parquet_schema_string_from_attributes, parse_arrow_schema_from_attributes,
         },
         uri_utils::parquet_writer_from_uri,
     },
+    parquet_copy_hook::copy_to_split_dest_receiver::CopyToParquetOptions,
     pgrx_utils::{collect_attributes_for, CollectAttributesFor},
     type_compat::{
         geometry::{geoparquet_metadata_json_from_tupledesc, reset_postgis_context},
@@ -38,13 +39,13 @@ pub(crate) struct ParquetWriterContext {
     parquet_writer: AsyncArrowWriter<ParquetObjectWriter>,
     schema: SchemaRef,
     attribute_contexts: Vec<PgToArrowAttributeContext>,
+    options: CopyToParquetOptions,
 }
 
 impl ParquetWriterContext {
     pub(crate) fn new(
         uri_info: ParsedUriInfo,
-        compression: PgParquetCompression,
-        compression_level: i32,
+        options: CopyToParquetOptions,
         tupledesc: &PgTupleDesc,
     ) -> ParquetWriterContext {
         // Postgis and Map contexts are used throughout writing the parquet file.
@@ -62,7 +63,7 @@ impl ParquetWriterContext {
         let schema = parse_arrow_schema_from_attributes(&attributes);
         let schema = Arc::new(schema);
 
-        let writer_props = Self::writer_props(tupledesc, compression, compression_level);
+        let writer_props = Self::writer_props(tupledesc, options);
 
         let parquet_writer = parquet_writer_from_uri(uri_info, schema.clone(), writer_props);
 
@@ -73,22 +74,20 @@ impl ParquetWriterContext {
             parquet_writer,
             schema,
             attribute_contexts,
+            options,
         }
     }
 
-    fn writer_props(
-        tupledesc: &PgTupleDesc,
-        compression: PgParquetCompression,
-        compression_level: i32,
-    ) -> WriterProperties {
+    fn writer_props(tupledesc: &PgTupleDesc, options: CopyToParquetOptions) -> WriterProperties {
         let compression = PgParquetCompressionWithLevel {
-            compression,
-            compression_level,
+            compression: options.compression,
+            compression_level: options.compression_level,
         };
 
         let mut writer_props_builder = WriterProperties::builder()
             .set_statistics_enabled(EnabledStatistics::Page)
             .set_compression(compression.into())
+            .set_max_row_group_size(options.row_group_size as usize)
             .set_created_by("pg_parquet".to_string());
 
         let geometry_columns_metadata_value = geoparquet_metadata_json_from_tupledesc(tupledesc);
@@ -103,10 +102,9 @@ impl ParquetWriterContext {
         writer_props_builder.build()
     }
 
-    pub(crate) fn write_new_row_group(
-        &mut self,
-        tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>,
-    ) {
+    // write_tuples writes the tuples to the parquet file. It flushes the in progress rows to a new row group
+    // if the row group size is reached.
+    pub(crate) fn write_tuples(&mut self, tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>) {
         let record_batch =
             Self::pg_tuples_to_record_batch(tuples, &self.attribute_contexts, self.schema.clone());
 
@@ -116,9 +114,24 @@
             .block_on(parquet_writer.write(&record_batch))
             .unwrap_or_else(|e| panic!("failed to write record batch: {}", e));
 
+        if parquet_writer.in_progress_rows() >= self.options.row_group_size as _
+            || parquet_writer.in_progress_size() >= self.options.row_group_size_bytes as _
+        {
+            PG_BACKEND_TOKIO_RUNTIME
+                .block_on(parquet_writer.flush())
+                .unwrap_or_else(|e| panic!("failed to flush record batch: {}", e));
+        }
+    }
+
+    // finalize flushes the in progress rows to a new row group and finally writes metadata to the file.
+    fn finalize(&mut self) {
         PG_BACKEND_TOKIO_RUNTIME
-            .block_on(parquet_writer.flush())
-            .unwrap_or_else(|e| panic!("failed to flush record batch: {}", e));
+            .block_on(self.parquet_writer.finish())
+            .unwrap_or_else(|e| panic!("failed to finish parquet writer: {}", e));
+    }
+
+    pub(crate) fn bytes_written(&self) -> usize {
+        self.parquet_writer.bytes_written()
     }
 
     fn pg_tuples_to_record_batch(
@@ -140,10 +153,6 @@ impl ParquetWriterContext {
 
 impl Drop for ParquetWriterContext {
     fn drop(&mut self) {
-        PG_BACKEND_TOKIO_RUNTIME
-            .block_on(self.parquet_writer.finish())
-            .unwrap_or_else(|e| {
-                panic!("failed to close parquet writer: {}", e);
-            });
+        self.finalize();
     }
 }
```
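
The new `copy_to_split_dest_receiver` module that consumes this API is not shown in this diff. Roughly, it is expected to call `write_tuples` per batch, compare `bytes_written()` against the `file_size_bytes` option, and rotate to a fresh writer under the parent directory once the target is reached. Below is a minimal standalone sketch of that rotation idea; every type and name in it is a simplified stand-in for illustration, not the extension's actual API.

```rust
use std::path::{Path, PathBuf};

// Simplified stand-in for ParquetWriterContext: it only tracks how many
// bytes have been written to the current file.
struct WriterStub {
    path: PathBuf,
    bytes_written: usize,
}

impl WriterStub {
    fn new(path: PathBuf) -> Self {
        Self { path, bytes_written: 0 }
    }

    fn write_tuples(&mut self, batch_bytes: usize) {
        self.bytes_written += batch_bytes;
    }

    fn bytes_written(&self) -> usize {
        self.bytes_written
    }
}

// Hypothetical splitter: rotates to data_<n>.parquet under the parent
// directory (the target path minus its extension) whenever the current
// file reaches the file_size_bytes target.
struct SplitWriter {
    parent_dir: PathBuf,
    file_size_bytes: usize,
    file_index: usize,
    current: WriterStub,
}

impl SplitWriter {
    fn new(target: &Path, file_size_bytes: usize) -> Self {
        let parent_dir = target.with_extension(""); // /tmp/test.parquet -> /tmp/test
        let current = WriterStub::new(parent_dir.join("data_0.parquet"));
        Self { parent_dir, file_size_bytes, file_index: 0, current }
    }

    fn write(&mut self, batch_bytes: usize) {
        self.current.write_tuples(batch_bytes);

        // once the target size is reached, close this file and start the next one
        if self.current.bytes_written() >= self.file_size_bytes {
            self.file_index += 1;
            let next = self.parent_dir.join(format!("data_{}.parquet", self.file_index));
            self.current = WriterStub::new(next);
        }
    }
}

fn main() {
    let mut splitter = SplitWriter::new(Path::new("/tmp/test.parquet"), 1_048_576);

    // 36 batches of 128 KiB: data_0..data_3 fill up to ~1 MiB each,
    // the remaining rows land in a smaller data_4.parquet.
    for _ in 0..36 {
        splitter.write(128 * 1024);
    }

    println!("last file: {}", splitter.current.path.display()); // .../data_4.parquet
}
```

With a 1 MiB target and roughly 128 KiB batches, this reproduces the `data_0.parquet` through `data_4.parquet` layout shown in the commit message.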

src/arrow_parquet/uri_utils.rs (27 additions, 2 deletions)

```diff
@@ -18,7 +18,6 @@ use pgrx::{
 use url::Url;
 
 use crate::{
-    arrow_parquet::parquet_writer::DEFAULT_ROW_GROUP_SIZE,
     object_store::{
         aws::parse_s3_bucket, azure::parse_azure_blob_container,
         object_store_cache::get_or_create_object_store,
@@ -140,6 +139,9 @@ pub(crate) fn parquet_metadata_from_uri(uri_info: ParsedUriInfo) -> Arc<ParquetM
     })
 }
 
+// default # of records per batch during arrow-parquet conversions (RecordBatch api)
+pub(crate) const RECORD_BATCH_SIZE: i64 = 1024;
+
 pub(crate) fn parquet_reader_from_uri(
     uri_info: ParsedUriInfo,
 ) -> ParquetRecordBatchStream<ParquetObjectReader> {
@@ -166,13 +168,36 @@
 
         pgrx::debug2!("Converted arrow schema is: {}", builder.schema());
 
+        let batch_size = calculate_reader_batch_size(builder.metadata());
+
         builder
-            .with_batch_size(DEFAULT_ROW_GROUP_SIZE as usize)
+            .with_batch_size(batch_size)
             .build()
             .unwrap_or_else(|e| panic!("{}", e))
     })
 }
 
+fn calculate_reader_batch_size(metadata: &Arc<ParquetMetaData>) -> usize {
+    const MAX_ARROW_ARRAY_SIZE: i64 = i32::MAX as _;
+
+    for row_group in metadata.row_groups() {
+        for column in row_group.columns() {
+            // try our best to get the size of the column
+            let column_size = column
+                .unencoded_byte_array_data_bytes()
+                .unwrap_or(column.uncompressed_size());
+
+            if column_size > MAX_ARROW_ARRAY_SIZE {
+                // to prevent decoding large arrays into memory, process one row at a time
+                return 1;
+            }
+        }
+    }
+
+    // default batch size
+    RECORD_BATCH_SIZE as _
+}
+
 pub(crate) fn parquet_writer_from_uri(
     uri_info: ParsedUriInfo,
     arrow_schema: SchemaRef,
```
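
The in-code comment above cites memory use for the `i32::MAX` guard; a related constraint is that Arrow's standard variable-length arrays (e.g. string and binary arrays) index their data buffers with 32-bit offsets, so a column chunk larger than roughly 2 GiB cannot be materialized as a single array. Below is a small self-contained sketch of the same decision rule, using illustrative names rather than the crate's API:

```rust
// Illustrative, standalone version of the batch-size heuristic above:
// read one row at a time when any column chunk is too large to be
// materialized as a single in-memory Arrow array.
const MAX_ARROW_ARRAY_SIZE: i64 = i32::MAX as i64;
const RECORD_BATCH_SIZE: usize = 1024;

fn reader_batch_size(column_chunk_sizes_in_bytes: &[i64]) -> usize {
    if column_chunk_sizes_in_bytes
        .iter()
        .any(|&size| size > MAX_ARROW_ARRAY_SIZE)
    {
        // a single chunk would exceed ~2 GiB: process one row per batch
        1
    } else {
        // otherwise use the default record batch size
        RECORD_BATCH_SIZE
    }
}

fn main() {
    // a ~3 GiB string column chunk forces row-at-a-time reads
    assert_eq!(reader_batch_size(&[4_096, 3 * 1024 * 1024 * 1024]), 1);
    // small chunks keep the default batch size of 1024 rows
    assert_eq!(reader_batch_size(&[4_096, 65_536]), 1024);
}
```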

src/lib.rs (1 addition, 1 deletion)

```diff
@@ -18,7 +18,7 @@ mod type_compat;
 #[allow(unused_imports)]
 pub use crate::arrow_parquet::compression::PgParquetCompression;
 #[allow(unused_imports)]
-pub use crate::parquet_copy_hook::copy_to_dest_receiver::create_copy_to_parquet_dest_receiver;
+pub use crate::parquet_copy_hook::copy_to_split_dest_receiver::create_copy_to_parquet_split_dest_receiver;
 
 pgrx::pg_module_magic!();
 
```
src/object_store/local_file.rs (7 additions, 0 deletions)

```diff
@@ -13,6 +13,13 @@ pub(crate) fn create_local_file_object_store(
     let path = uri_as_string(uri);
 
     if !copy_from {
+        // create parent folder if it doesn't exist
+        let parent = std::path::Path::new(&path)
+            .parent()
+            .unwrap_or_else(|| panic!("invalid parent for path: {}", path));
+
+        std::fs::create_dir_all(parent).unwrap_or_else(|e| panic!("{}", e));
+
         // create or overwrite the local file
         std::fs::OpenOptions::new()
             .write(true)
```

src/parquet_copy_hook.rs (1 addition, 0 deletions)

```diff
@@ -1,6 +1,7 @@
 pub(crate) mod copy_from;
 pub(crate) mod copy_to;
 pub(crate) mod copy_to_dest_receiver;
+pub(crate) mod copy_to_split_dest_receiver;
 pub(crate) mod copy_utils;
 pub(crate) mod hook;
 pub(crate) mod pg_compat;
```
