Commit f8c3d62

Support file_size_bytes option (#100)
COPY TO parquet now supports a new option, `file_size_bytes`, which sets a target size for the generated Parquet files. When the file being written exceeds the target size, it is flushed and a new Parquet file is started. The files are written under a parent directory named after the destination path without the `.parquet` extension, e.g.

```sql
COPY (select 'hellooooo' || i from generate_series(1, 1000000) i)
to '/tmp/test.parquet' with (file_size_bytes '1MB');
```

```bash
> ls -alh /tmp/test.parquet/
1.4M data_0.parquet
1.4M data_1.parquet
1.4M data_2.parquet
1.4M data_3.parquet
114K data_4.parquet
```

Closes #107.

1 parent b626eb4 commit f8c3d62

12 files changed, with 620 additions and 151 deletions.

README.md

Lines changed: 3 additions & 2 deletions
```diff
@@ -273,8 +273,9 @@ Supported authorization methods' priority order is shown below:
 ## Copy Options
 
 `pg_parquet` supports the following options in the `COPY TO` command:
 - `format parquet`: you need to specify this option to read or write Parquet files which does not end with `.parquet[.<compression>]` extension,
-- `row_group_size <int>`: the number of rows in each row group while writing Parquet files. The default row group size is `122880`,
-- `row_group_size_bytes <int>`: the total byte size of rows in each row group while writing Parquet files. The default row group size bytes is `row_group_size * 1024`,
+- `file_size_bytes <string>`: the total file size per Parquet file. When set, the parquet files, with target size, are created under parent directory (named the same as file name). By default, when not specified, a single file is generated without creating a parent folder. You can specify total bytes without unit like `file_size_bytes 2000000` or with unit (KB, MB, or GB) like `file_size_bytes '1MB'`,
+- `row_group_size <int64>`: the number of rows in each row group while writing Parquet files. The default row group size is `122880`,
+- `row_group_size_bytes <int64>`: the total byte size of rows in each row group while writing Parquet files. The default row group size bytes is `row_group_size * 1024`,
 - `compression <string>`: the compression format to use while writing Parquet files. The supported compression formats are `uncompressed`, `snappy`, `gzip`, `brotli`, `lz4`, `lz4raw` and `zstd`. The default compression format is `snappy`. If not specified, the compression format is determined by the file extension,
 - `compression_level <int>`: the compression level to use while writing Parquet files. The supported compression levels are only supported for `gzip`, `zstd` and `brotli` compression formats. The default compression level is `6` for `gzip (0-10)`, `1` for `zstd (1-22)` and `1` for `brotli (0-11)`.
```
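
The README entry above says `file_size_bytes` accepts either a plain byte count or a value with a `KB`, `MB`, or `GB` suffix. The option parsing itself is not part of the hunks shown on this page; the snippet below is only a minimal sketch of how such a value could be normalized to bytes, assuming binary (1024-based) units. The function name and the unit multipliers are illustrative assumptions, not the extension's actual code.

```rust
// A sketch only: normalize a `file_size_bytes` value such as "2000000" or "1MB"
// into a byte count. The helper name and the 1024-based unit multipliers are
// assumptions for illustration; the extension's real option parsing is not shown here.
fn parse_file_size_bytes(value: &str) -> Result<i64, String> {
    let value = value.trim();

    // split the trailing unit (KB, MB, GB), if any, from the numeric part
    let (digits, unit) = match value.find(|c: char| !c.is_ascii_digit()) {
        Some(idx) => (&value[..idx], value[idx..].trim()),
        None => (value, ""),
    };

    let number: i64 = digits
        .parse()
        .map_err(|e| format!("invalid file_size_bytes value '{value}': {e}"))?;

    let multiplier: i64 = match unit.to_ascii_uppercase().as_str() {
        "" => 1,
        "KB" => 1024,
        "MB" => 1024 * 1024,
        "GB" => 1024 * 1024 * 1024,
        other => return Err(format!("unsupported file_size_bytes unit '{other}'")),
    };

    Ok(number * multiplier)
}

fn main() {
    assert_eq!(parse_file_size_bytes("2000000"), Ok(2_000_000));
    assert_eq!(parse_file_size_bytes("1MB"), Ok(1024 * 1024));
}
```

Under these assumptions, `file_size_bytes '1MB'` would translate to `1048576` bytes, i.e. the target used in the commit message example.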

src/arrow_parquet/parquet_writer.rs

Lines changed: 31 additions & 22 deletions
```diff
@@ -11,13 +11,14 @@ use pgrx::{heap_tuple::PgHeapTuple, AllocatedByRust, PgTupleDesc};
 
 use crate::{
     arrow_parquet::{
-        compression::{PgParquetCompression, PgParquetCompressionWithLevel},
+        compression::PgParquetCompressionWithLevel,
         pg_to_arrow::context::collect_pg_to_arrow_attribute_contexts,
         schema_parser::{
             parquet_schema_string_from_attributes, parse_arrow_schema_from_attributes,
         },
         uri_utils::parquet_writer_from_uri,
     },
+    parquet_copy_hook::copy_to_split_dest_receiver::CopyToParquetOptions,
     pgrx_utils::{collect_attributes_for, CollectAttributesFor},
     type_compat::{
         geometry::{geoparquet_metadata_json_from_tupledesc, reset_postgis_context},
@@ -38,13 +39,13 @@ pub(crate) struct ParquetWriterContext {
     parquet_writer: AsyncArrowWriter<ParquetObjectWriter>,
     schema: SchemaRef,
     attribute_contexts: Vec<PgToArrowAttributeContext>,
+    options: CopyToParquetOptions,
 }
 
 impl ParquetWriterContext {
     pub(crate) fn new(
         uri_info: ParsedUriInfo,
-        compression: PgParquetCompression,
-        compression_level: i32,
+        options: CopyToParquetOptions,
         tupledesc: &PgTupleDesc,
     ) -> ParquetWriterContext {
         // Postgis and Map contexts are used throughout writing the parquet file.
@@ -62,7 +63,7 @@ impl ParquetWriterContext {
         let schema = parse_arrow_schema_from_attributes(&attributes);
         let schema = Arc::new(schema);
 
-        let writer_props = Self::writer_props(tupledesc, compression, compression_level);
+        let writer_props = Self::writer_props(tupledesc, options);
 
         let parquet_writer = parquet_writer_from_uri(uri_info, schema.clone(), writer_props);
 
@@ -73,22 +74,20 @@ impl ParquetWriterContext {
             parquet_writer,
             schema,
             attribute_contexts,
+            options,
         }
     }
 
-    fn writer_props(
-        tupledesc: &PgTupleDesc,
-        compression: PgParquetCompression,
-        compression_level: i32,
-    ) -> WriterProperties {
+    fn writer_props(tupledesc: &PgTupleDesc, options: CopyToParquetOptions) -> WriterProperties {
         let compression = PgParquetCompressionWithLevel {
-            compression,
-            compression_level,
+            compression: options.compression,
+            compression_level: options.compression_level,
         };
 
         let mut writer_props_builder = WriterProperties::builder()
             .set_statistics_enabled(EnabledStatistics::Page)
             .set_compression(compression.into())
+            .set_max_row_group_size(options.row_group_size as usize)
             .set_created_by("pg_parquet".to_string());
 
         let geometry_columns_metadata_value = geoparquet_metadata_json_from_tupledesc(tupledesc);
@@ -103,10 +102,9 @@ impl ParquetWriterContext {
         writer_props_builder.build()
     }
 
-    pub(crate) fn write_new_row_group(
-        &mut self,
-        tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>,
-    ) {
+    // write_tuples writes the tuples to the parquet file. It flushes the in progress rows to a new row group
+    // if the row group size is reached.
+    pub(crate) fn write_tuples(&mut self, tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>) {
         let record_batch =
             Self::pg_tuples_to_record_batch(tuples, &self.attribute_contexts, self.schema.clone());
 
@@ -116,9 +114,24 @@ impl ParquetWriterContext {
             .block_on(parquet_writer.write(&record_batch))
             .unwrap_or_else(|e| panic!("failed to write record batch: {}", e));
 
+        if parquet_writer.in_progress_rows() >= self.options.row_group_size as _
+            || parquet_writer.in_progress_size() >= self.options.row_group_size_bytes as _
+        {
+            PG_BACKEND_TOKIO_RUNTIME
+                .block_on(parquet_writer.flush())
+                .unwrap_or_else(|e| panic!("failed to flush record batch: {}", e));
+        }
+    }
+
+    // finalize flushes the in progress rows to a new row group and finally writes metadata to the file.
+    fn finalize(&mut self) {
         PG_BACKEND_TOKIO_RUNTIME
-            .block_on(parquet_writer.flush())
-            .unwrap_or_else(|e| panic!("failed to flush record batch: {}", e));
+            .block_on(self.parquet_writer.finish())
+            .unwrap_or_else(|e| panic!("failed to finish parquet writer: {}", e));
+    }
+
+    pub(crate) fn bytes_written(&self) -> usize {
+        self.parquet_writer.bytes_written()
     }
 
     fn pg_tuples_to_record_batch(
@@ -140,10 +153,6 @@ impl ParquetWriterContext {
 
 impl Drop for ParquetWriterContext {
     fn drop(&mut self) {
-        PG_BACKEND_TOKIO_RUNTIME
-            .block_on(self.parquet_writer.finish())
-            .unwrap_or_else(|e| {
-                panic!("failed to close parquet writer: {}", e);
-            });
+        self.finalize();
     }
 }
```
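
The hunks above give `ParquetWriterContext` the surface a caller needs to enforce a per-file size limit: `write_tuples` appends a batch, `bytes_written` reports how large the current file already is, and dropping the context finalizes the file. The rotation itself lives in the new `copy_to_split_dest_receiver.rs`, which is not shown on this page. The snippet below is only a self-contained sketch of that idea: `FakeWriter`, `copy_with_file_size_limit`, and the byte counts are stand-ins rather than the extension's API, while the `data_<n>.parquet` naming and the parent directory derived from the destination path follow the behaviour described in the commit message.

```rust
use std::path::PathBuf;

// Stand-in for ParquetWriterContext: just enough surface for the sketch.
// (The real writer is created per output file and finalized on drop.)
struct FakeWriter {
    bytes: usize,
}

impl FakeWriter {
    fn new(_path: &PathBuf) -> Self {
        FakeWriter { bytes: 0 }
    }

    fn write_tuples(&mut self, batch_bytes: usize) {
        self.bytes += batch_bytes;
    }

    fn bytes_written(&self) -> usize {
        self.bytes
    }
}

// Hypothetical rotation loop: once the current file reaches the target size,
// start "data_<n>.parquet" under the parent directory, i.e. the destination
// path without its ".parquet" extension.
fn copy_with_file_size_limit(
    dest: &str,
    target_file_size_bytes: usize,
    batches: &[usize],
) -> Vec<PathBuf> {
    let parent = PathBuf::from(dest.trim_end_matches(".parquet"));

    let mut file_index = 0;
    let mut files = vec![parent.join(format!("data_{file_index}.parquet"))];
    let mut writer = FakeWriter::new(files.last().unwrap());

    for &batch_bytes in batches {
        writer.write_tuples(batch_bytes);

        // rotate after the current file exceeds the target size
        if writer.bytes_written() >= target_file_size_bytes {
            file_index += 1;
            files.push(parent.join(format!("data_{file_index}.parquet")));
            // in the real code, dropping the previous ParquetWriterContext finalizes its file
            writer = FakeWriter::new(files.last().unwrap());
        }
    }

    files
}

fn main() {
    // a 1 MB target and three ~600 KB batches produce two output files
    let files =
        copy_with_file_size_limit("/tmp/test.parquet", 1024 * 1024, &[600_000, 600_000, 600_000]);
    println!("{files:?}");
}
```

Running the sketch with a 1 MB target and three ~600 KB batches yields two output paths, mirroring how the commit message example splits a single logical destination into several `data_*.parquet` files.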

src/arrow_parquet/uri_utils.rs

Lines changed: 27 additions & 2 deletions
```diff
@@ -18,7 +18,6 @@ use pgrx::{
 use url::Url;
 
 use crate::{
-    arrow_parquet::parquet_writer::DEFAULT_ROW_GROUP_SIZE,
     object_store::{
         aws::parse_s3_bucket, azure::parse_azure_blob_container, http::parse_http_base_uri,
         object_store_cache::get_or_create_object_store,
@@ -143,6 +142,9 @@ pub(crate) fn parquet_metadata_from_uri(uri_info: ParsedUriInfo) -> Arc<ParquetM
     })
 }
 
+// default # of records per batch during arrow-parquet conversions (RecordBatch api)
+pub(crate) const RECORD_BATCH_SIZE: i64 = 1024;
+
 pub(crate) fn parquet_reader_from_uri(
     uri_info: ParsedUriInfo,
 ) -> ParquetRecordBatchStream<ParquetObjectReader> {
@@ -169,13 +171,36 @@ pub(crate) fn parquet_reader_from_uri(
 
         pgrx::debug2!("Converted arrow schema is: {}", builder.schema());
 
+        let batch_size = calculate_reader_batch_size(builder.metadata());
+
         builder
-            .with_batch_size(DEFAULT_ROW_GROUP_SIZE as usize)
+            .with_batch_size(batch_size)
             .build()
             .unwrap_or_else(|e| panic!("{}", e))
     })
 }
 
+fn calculate_reader_batch_size(metadata: &Arc<ParquetMetaData>) -> usize {
+    const MAX_ARROW_ARRAY_SIZE: i64 = i32::MAX as _;
+
+    for row_group in metadata.row_groups() {
+        for column in row_group.columns() {
+            // try our best to get the size of the column
+            let column_size = column
+                .unencoded_byte_array_data_bytes()
+                .unwrap_or(column.uncompressed_size());
+
+            if column_size > MAX_ARROW_ARRAY_SIZE {
+                // to prevent decoding large arrays into memory, process one row at a time
+                return 1;
+            }
+        }
+    }
+
+    // default batch size
+    RECORD_BATCH_SIZE as _
+}
+
 pub(crate) fn parquet_writer_from_uri(
     uri_info: ParsedUriInfo,
     arrow_schema: SchemaRef,
```

src/lib.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -18,7 +18,7 @@ mod type_compat;
 #[allow(unused_imports)]
 pub use crate::arrow_parquet::compression::PgParquetCompression;
 #[allow(unused_imports)]
-pub use crate::parquet_copy_hook::copy_to_dest_receiver::create_copy_to_parquet_dest_receiver;
+pub use crate::parquet_copy_hook::copy_to_split_dest_receiver::create_copy_to_parquet_split_dest_receiver;
 
 pgrx::pg_module_magic!();
```

src/object_store/local_file.rs

Lines changed: 7 additions & 0 deletions
```diff
@@ -13,6 +13,13 @@ pub(crate) fn create_local_file_object_store(
     let path = uri_as_string(uri);
 
     if !copy_from {
+        // create parent folder if it doesn't exist
+        let parent = std::path::Path::new(&path)
+            .parent()
+            .unwrap_or_else(|| panic!("invalid parent for path: {}", path));
+
+        std::fs::create_dir_all(parent).unwrap_or_else(|e| panic!("{}", e));
+
         // create or overwrite the local file
         std::fs::OpenOptions::new()
             .write(true)
```

src/parquet_copy_hook.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -1,6 +1,7 @@
 pub(crate) mod copy_from;
 pub(crate) mod copy_to;
 pub(crate) mod copy_to_dest_receiver;
+pub(crate) mod copy_to_split_dest_receiver;
 pub(crate) mod copy_utils;
 pub(crate) mod hook;
 pub(crate) mod pg_compat;
```
