Skip to content

Commit 6428599

Browse files
committed
Support file_size_bytes option
COPY TO parquet now supports a new option, called `file_size_bytes`, which lets you generate Parquet files with a target size of `file_size_bytes`. When the current Parquet file exceeds the target size, it is flushed and a new Parquet file is started under a parent directory (the parent directory is the destination path without the `.parquet` extension). e.g. ```sql COPY (select 'hellooooo' || i from generate_series(1, 1000000) i) to '/tmp/test.parquet' with (file_size_bytes 1048576); ``` ```bash > ls -alh /tmp/test/ 1.4M data_0.parquet 1.4M data_1.parquet 1.4M data_2.parquet 1.4M data_3.parquet 114K data_4.parquet ```
1 parent 3ff46d5 commit 6428599

File tree

10 files changed

+499
-105
lines changed

10 files changed

+499
-105
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ Supported authorization methods' priority order is shown below:
242242
## Copy Options
243243
`pg_parquet` supports the following options in the `COPY TO` command:
244244
- `format parquet`: you need to specify this option to read or write Parquet files that do not end with the `.parquet[.<compression>]` extension,
245+
- `file_size_bytes <int>`: the target byte size per Parquet file. When set, multiple Parquet files of approximately the target size are created under a parent directory (named after the destination file name without its extension). By default, when not specified, a single file is generated without creating a parent folder.
245246
- `row_group_size <int>`: the number of rows in each row group while writing Parquet files. The default row group size is `122880`,
246247
- `row_group_size_bytes <int>`: the total byte size of rows in each row group while writing Parquet files. The default row group size bytes is `row_group_size * 1024`,
247248
- `compression <string>`: the compression format to use while writing Parquet files. The supported compression formats are `uncompressed`, `snappy`, `gzip`, `brotli`, `lz4`, `lz4raw` and `zstd`. The default compression format is `snappy`. If not specified, the compression format is determined by the file extension,

src/arrow_parquet/parquet_writer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ impl ParquetWriterContext {
119119
.unwrap_or_else(|e| panic!("failed to flush record batch: {}", e));
120120
}
121121

122+
pub(crate) fn bytes_written(&self) -> usize {
123+
self.parquet_writer.bytes_written()
124+
}
125+
122126
fn pg_tuples_to_record_batch(
123127
tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>,
124128
attribute_contexts: &[PgToArrowAttributeContext],

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ mod type_compat;
1818
#[allow(unused_imports)]
1919
pub use crate::arrow_parquet::compression::PgParquetCompression;
2020
#[allow(unused_imports)]
21-
pub use crate::parquet_copy_hook::copy_to_dest_receiver::create_copy_to_parquet_dest_receiver;
21+
pub use crate::parquet_copy_hook::copy_to_split_dest_receiver::create_copy_to_parquet_split_dest_receiver;
2222

2323
pgrx::pg_module_magic!();
2424

src/object_store/local_file.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ pub(crate) fn create_local_file_object_store(
1313
let path = uri_as_string(uri);
1414

1515
if !copy_from {
16+
// create parent folder if it doesn't exist
17+
let parent = std::path::Path::new(&path).parent().unwrap();
18+
std::fs::create_dir_all(parent).unwrap_or_else(|e| panic!("{}", e));
19+
1620
// create or overwrite the local file
1721
std::fs::OpenOptions::new()
1822
.write(true)

src/parquet_copy_hook.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub(crate) mod copy_from;
22
pub(crate) mod copy_to;
33
pub(crate) mod copy_to_dest_receiver;
4+
pub(crate) mod copy_to_split_dest_receiver;
45
pub(crate) mod copy_utils;
56
pub(crate) mod hook;
67
pub(crate) mod pg_compat;

src/parquet_copy_hook/copy_to_dest_receiver.rs

Lines changed: 60 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,12 @@ use pg_sys::{
99
};
1010
use pgrx::{prelude::*, FromDatum, PgList, PgMemoryContexts, PgTupleDesc};
1111

12-
use crate::arrow_parquet::{
13-
compression::{PgParquetCompression, INVALID_COMPRESSION_LEVEL},
14-
parquet_writer::{ParquetWriterContext, DEFAULT_ROW_GROUP_SIZE, DEFAULT_ROW_GROUP_SIZE_BYTES},
15-
uri_utils::parse_uri,
16-
};
12+
use crate::arrow_parquet::{parquet_writer::ParquetWriterContext, uri_utils::parse_uri};
1713

18-
#[repr(C)]
19-
struct CopyToParquetOptions {
20-
pub row_group_size: i64,
21-
pub row_group_size_bytes: i64,
22-
pub compression: PgParquetCompression,
23-
pub compression_level: i32,
24-
}
14+
use super::copy_to_split_dest_receiver::CopyToParquetOptions;
2515

2616
#[repr(C)]
27-
struct CopyToParquetDestReceiver {
17+
pub(crate) struct CopyToParquetDestReceiver {
2818
dest: DestReceiver,
2919
natts: usize,
3020
tupledesc: TupleDesc,
@@ -95,6 +85,16 @@ impl CopyToParquetDestReceiver {
9585
.any(|size| size > MAX_ARROW_ARRAY_SIZE)
9686
}
9787

88+
pub(crate) fn collected_bytes(&self) -> usize {
89+
let current_parquet_writer_context = unsafe {
90+
self.parquet_writer_context
91+
.as_ref()
92+
.expect("parquet writer context is not found")
93+
};
94+
95+
current_parquet_writer_context.bytes_written()
96+
}
97+
9898
fn write_tuples_to_parquet(&mut self) {
9999
debug_assert!(!self.tupledesc.is_null());
100100

@@ -143,7 +143,11 @@ impl CopyToParquetDestReceiver {
143143
}
144144

145145
#[pg_guard]
146-
extern "C" fn copy_startup(dest: *mut DestReceiver, _operation: i32, tupledesc: TupleDesc) {
146+
pub(crate) extern "C" fn copy_startup(
147+
dest: *mut DestReceiver,
148+
_operation: i32,
149+
tupledesc: TupleDesc,
150+
) {
147151
let parquet_dest = unsafe {
148152
(dest as *mut CopyToParquetDestReceiver)
149153
.as_mut()
@@ -184,7 +188,7 @@ extern "C" fn copy_startup(dest: *mut DestReceiver, _operation: i32, tupledesc:
184188
}
185189

186190
#[pg_guard]
187-
extern "C" fn copy_receive(slot: *mut TupleTableSlot, dest: *mut DestReceiver) -> bool {
191+
pub(crate) extern "C" fn copy_receive(slot: *mut TupleTableSlot, dest: *mut DestReceiver) -> bool {
188192
let parquet_dest = unsafe {
189193
(dest as *mut CopyToParquetDestReceiver)
190194
.as_mut()
@@ -235,7 +239,7 @@ extern "C" fn copy_receive(slot: *mut TupleTableSlot, dest: *mut DestReceiver) -
235239
}
236240

237241
#[pg_guard]
238-
extern "C" fn copy_shutdown(dest: *mut DestReceiver) {
242+
pub(crate) extern "C" fn copy_shutdown(dest: *mut DestReceiver) {
239243
let parquet_dest = unsafe {
240244
(dest as *mut CopyToParquetDestReceiver)
241245
.as_mut()
@@ -250,79 +254,7 @@ extern "C" fn copy_shutdown(dest: *mut DestReceiver) {
250254
}
251255

252256
#[pg_guard]
253-
extern "C" fn copy_destroy(_dest: *mut DestReceiver) {}
254-
255-
// create_copy_to_parquet_dest_receiver creates a new CopyToParquetDestReceiver that can be
256-
// used as a destination receiver for COPY TO command. All arguments, except "uri", are optional
257-
// and have default values if not provided.
258-
#[pg_guard]
259-
#[no_mangle]
260-
pub extern "C" fn create_copy_to_parquet_dest_receiver(
261-
uri: *const c_char,
262-
row_group_size: *const i64,
263-
row_group_size_bytes: *const i64,
264-
compression: *const PgParquetCompression,
265-
compression_level: *const i32,
266-
) -> *mut DestReceiver {
267-
let per_copy_context = unsafe {
268-
AllocSetContextCreateExtended(
269-
CurrentMemoryContext as _,
270-
"ParquetCopyDestReceiver".as_pg_cstr(),
271-
ALLOCSET_DEFAULT_MINSIZE as _,
272-
ALLOCSET_DEFAULT_INITSIZE as _,
273-
ALLOCSET_DEFAULT_MAXSIZE as _,
274-
)
275-
};
276-
277-
let row_group_size = if row_group_size.is_null() {
278-
DEFAULT_ROW_GROUP_SIZE
279-
} else {
280-
unsafe { *row_group_size }
281-
};
282-
283-
let row_group_size_bytes = if row_group_size_bytes.is_null() {
284-
DEFAULT_ROW_GROUP_SIZE_BYTES
285-
} else {
286-
unsafe { *row_group_size_bytes }
287-
};
288-
289-
let compression = if compression.is_null() {
290-
PgParquetCompression::default()
291-
} else {
292-
unsafe { *compression }
293-
};
294-
295-
let compression_level = if compression_level.is_null() {
296-
compression
297-
.default_compression_level()
298-
.unwrap_or(INVALID_COMPRESSION_LEVEL)
299-
} else {
300-
unsafe { *compression_level }
301-
};
302-
303-
let mut parquet_dest =
304-
unsafe { PgBox::<CopyToParquetDestReceiver, AllocatedByPostgres>::alloc0() };
305-
306-
parquet_dest.dest.receiveSlot = Some(copy_receive);
307-
parquet_dest.dest.rStartup = Some(copy_startup);
308-
parquet_dest.dest.rShutdown = Some(copy_shutdown);
309-
parquet_dest.dest.rDestroy = Some(copy_destroy);
310-
parquet_dest.dest.mydest = CommandDest::DestCopyOut;
311-
parquet_dest.uri = uri;
312-
parquet_dest.tupledesc = std::ptr::null_mut();
313-
parquet_dest.parquet_writer_context = std::ptr::null_mut();
314-
parquet_dest.natts = 0;
315-
parquet_dest.collected_tuple_count = 0;
316-
parquet_dest.collected_tuples = std::ptr::null_mut();
317-
parquet_dest.collected_tuple_column_sizes = std::ptr::null_mut();
318-
parquet_dest.copy_options.row_group_size = row_group_size;
319-
parquet_dest.copy_options.row_group_size_bytes = row_group_size_bytes;
320-
parquet_dest.copy_options.compression = compression;
321-
parquet_dest.copy_options.compression_level = compression_level;
322-
parquet_dest.per_copy_context = per_copy_context;
323-
324-
unsafe { std::mem::transmute(parquet_dest) }
325-
}
257+
pub(crate) extern "C" fn copy_destroy(_dest: *mut DestReceiver) {}
326258

327259
fn tuple_column_sizes(tuple_datums: &[Option<Datum>], tupledesc: &PgTupleDesc) -> Vec<i32> {
328260
let mut column_sizes = vec![];
@@ -361,3 +293,42 @@ fn tuple_column_sizes(tuple_datums: &[Option<Datum>], tupledesc: &PgTupleDesc) -
361293

362294
column_sizes
363295
}
296+
297+
// create_copy_to_parquet_dest_receiver creates a new CopyToParquetDestReceiver that can be
298+
// used as a destination receiver for COPY TO command. All arguments, except "uri", are optional
299+
// and have default values if not provided.
300+
#[pg_guard]
301+
pub(crate) fn create_copy_to_parquet_dest_receiver(
302+
uri: *const c_char,
303+
options: CopyToParquetOptions,
304+
) -> *mut CopyToParquetDestReceiver {
305+
let per_copy_context = unsafe {
306+
AllocSetContextCreateExtended(
307+
CurrentMemoryContext as _,
308+
"ParquetCopyDestReceiver".as_pg_cstr(),
309+
ALLOCSET_DEFAULT_MINSIZE as _,
310+
ALLOCSET_DEFAULT_INITSIZE as _,
311+
ALLOCSET_DEFAULT_MAXSIZE as _,
312+
)
313+
};
314+
315+
let mut parquet_dest =
316+
unsafe { PgBox::<CopyToParquetDestReceiver, AllocatedByPostgres>::alloc0() };
317+
318+
parquet_dest.dest.receiveSlot = Some(copy_receive);
319+
parquet_dest.dest.rStartup = Some(copy_startup);
320+
parquet_dest.dest.rShutdown = Some(copy_shutdown);
321+
parquet_dest.dest.rDestroy = Some(copy_destroy);
322+
parquet_dest.dest.mydest = CommandDest::DestCopyOut;
323+
parquet_dest.uri = uri;
324+
parquet_dest.tupledesc = std::ptr::null_mut();
325+
parquet_dest.parquet_writer_context = std::ptr::null_mut();
326+
parquet_dest.natts = 0;
327+
parquet_dest.collected_tuple_count = 0;
328+
parquet_dest.collected_tuples = std::ptr::null_mut();
329+
parquet_dest.collected_tuple_column_sizes = std::ptr::null_mut();
330+
parquet_dest.copy_options = options;
331+
parquet_dest.per_copy_context = per_copy_context;
332+
333+
parquet_dest.into_pg()
334+
}

0 commit comments

Comments
 (0)