
Commit 462c1a0

address
1 parent 59715d6 commit 462c1a0

File tree

6 files changed: +95, -56 lines


README.md

Lines changed: 3 additions & 3 deletions
@@ -248,9 +248,9 @@ Supported authorization methods' priority order is shown below:
 ## Copy Options
 `pg_parquet` supports the following options in the `COPY TO` command:
 - `format parquet`: you need to specify this option to read or write Parquet files which do not end with the `.parquet[.<compression>]` extension,
-- `file_size_bytes <int>`: the total byte size per Parquet file. When set, the parquet files, with target size, are created under parent directory (named the same as file name without file extension). By default, when not specified, a single file is generated without creating a parent folder.
-- `row_group_size <int>`: the number of rows in each row group while writing Parquet files. The default row group size is `122880`,
-- `row_group_size_bytes <int>`: the total byte size of rows in each row group while writing Parquet files. The default row group size bytes is `row_group_size * 1024`,
+- `file_size_bytes <string>`: the target size of each Parquet file. When set, Parquet files of roughly the target size are created under a parent directory named the same as the file name. By default, when not specified, a single file is generated without creating a parent folder. You can specify the size in plain bytes, like `file_size_bytes 2000000`, or with a unit (KB, MB, or GB), like `file_size_bytes '1MB'`,
+- `row_group_size <int64>`: the number of rows in each row group while writing Parquet files. The default row group size is `122880`,
+- `row_group_size_bytes <int64>`: the total byte size of rows in each row group while writing Parquet files. The default row group size bytes is `row_group_size * 1024`,
 - `compression <string>`: the compression format to use while writing Parquet files. The supported compression formats are `uncompressed`, `snappy`, `gzip`, `brotli`, `lz4`, `lz4raw` and `zstd`. The default compression format is `snappy`. If not specified, the compression format is determined by the file extension,
 - `compression_level <int>`: the compression level to use while writing Parquet files. Compression levels are only supported for the `gzip`, `zstd` and `brotli` compression formats. The default compression level is `6` for `gzip (0-10)`, `1` for `zstd (1-22)` and `1` for `brotli (0-11)`.
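A minimal usage sketch of the revised option, mirroring the SQL that the updated test in this commit runs via pgrx's `Spi`; the table name, output path, and surrounding function are made up for illustration and would need to run inside a Postgres backend context (e.g. a `#[pg_test]`).

```rust
// Illustrative sketch only: issues the documented COPY TO with the new string-valued
// file_size_bytes option. "measurements" and the /tmp path are hypothetical names.
use pgrx::Spi;

fn copy_with_file_size_limit() {
    Spi::run(
        "copy measurements to '/tmp/measurements.parquet' \
         with (format parquet, file_size_bytes '1MB')",
    )
    .unwrap();

    // With file_size_bytes set, '/tmp/measurements.parquet' is created as a folder and the
    // data is split into data_0.parquet, data_1.parquet, ... files of roughly 1MB each.
}
```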

src/parquet_copy_hook/copy_to_dest_receiver.rs

Lines changed: 3 additions & 6 deletions
@@ -167,12 +167,9 @@ pub(crate) extern "C" fn copy_startup(
     };
     parquet_dest.natts = tupledesc.len();

-    parquet_dest.target_batch_size = if parquet_dest.copy_options.row_group_size < RECORD_BATCH_SIZE
-    {
-        parquet_dest.copy_options.row_group_size
-    } else {
-        RECORD_BATCH_SIZE
-    };
+    // handle when row group size is set less than RECORD_BATCH_SIZE
+    parquet_dest.target_batch_size =
+        std::cmp::min(parquet_dest.copy_options.row_group_size, RECORD_BATCH_SIZE);

     let uri = unsafe { CStr::from_ptr(parquet_dest.uri) }
         .to_str()
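The removed if/else and the new `std::cmp::min` call compute the same value. A standalone sketch of that clamping behaviour, with `RECORD_BATCH_SIZE` given a placeholder value since its real definition is outside this diff:

```rust
// Standalone sketch; 65_536 is a placeholder, not the extension's actual RECORD_BATCH_SIZE,
// and i64 is an assumed type for the sketch.
const RECORD_BATCH_SIZE: i64 = 65_536;

fn target_batch_size(row_group_size: i64) -> i64 {
    // identical to the removed if/else: never batch more rows than one row group holds
    std::cmp::min(row_group_size, RECORD_BATCH_SIZE)
}

fn main() {
    assert_eq!(target_batch_size(1_000), 1_000);    // small row groups shrink the batch
    assert_eq!(target_batch_size(122_880), 65_536); // otherwise capped at RECORD_BATCH_SIZE
}
```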

src/parquet_copy_hook/copy_to_split_dest_receiver.rs

Lines changed: 2 additions & 17 deletions
@@ -115,26 +115,11 @@ impl CopyToParquetSplitDestReceiver {
             .to_str()
             .expect("invalid uri");

-        let file_name = Path::new(uri)
-            .file_name()
-            .expect("invalid uri")
-            .to_str()
-            .expect("invalid uri");
-
-        let (file_name_prefix, file_extension) = match file_name.find('.') {
-            Some(index) => file_name.split_at(index),
-            None => (file_name, ""),
-        };
-
-        let parent_folder = Path::new(uri)
-            .parent()
-            .expect("invalid uri")
-            .join(file_name_prefix);
+        let parent_folder = Path::new(uri);

-        // append child id to final part of uri
         let file_id = self.current_child_id;

-        let child_uri = parent_folder.join(format!("data_{file_id}{file_extension}"));
+        let child_uri = parent_folder.join(format!("data_{file_id}.parquet"));

         child_uri.to_str().expect("invalid uri").as_pg_cstr()
     }
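A self-contained sketch of the resulting layout under the simplified naming: the COPY target URI itself acts as the parent folder, and every child file is `data_<id>.parquet` regardless of the original extension. The `/tmp/output.parquet` path and the helper name `child_uri` are examples, not code from this commit.

```rust
use std::path::{Path, PathBuf};

// Sketch of the simplified child naming; '/tmp/output.parquet' is an example target URI.
fn child_uri(uri: &str, file_id: u32) -> PathBuf {
    Path::new(uri).join(format!("data_{file_id}.parquet"))
}

fn main() {
    let child = child_uri("/tmp/output.parquet", 0);
    assert_eq!(child.to_str().unwrap(), "/tmp/output.parquet/data_0.parquet");

    let child = child_uri("/tmp/output.parquet", 7);
    assert_eq!(child.to_str().unwrap(), "/tmp/output.parquet/data_7.parquet");
}
```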

src/parquet_copy_hook/copy_utils.rs

Lines changed: 70 additions & 6 deletions
@@ -64,13 +64,16 @@ pub(crate) fn validate_copy_to_options(p_stmt: &PgBox<PlannedStmt>, uri_info: Pa
     let file_size_bytes_option = copy_stmt_get_option(p_stmt, "file_size_bytes");

     if !file_size_bytes_option.is_null() {
-        let file_size_bytes = unsafe { defGetInt64(file_size_bytes_option.as_ptr()) };
+        let file_size_bytes = unsafe { defGetString(file_size_bytes_option.as_ptr()) };

-        const ONE_MB: i64 = 1024 * 1024;
+        let file_size_bytes = unsafe {
+            CStr::from_ptr(file_size_bytes)
+                .to_str()
+                .expect("file_size_bytes option is not a valid CString")
+        };

-        if file_size_bytes < ONE_MB {
-            panic!("file_size_bytes must be at least {ONE_MB} bytes");
-        }
+        parse_file_size(file_size_bytes)
+            .unwrap_or_else(|e| panic!("file_size_bytes option is not valid: {}", e));
     }

     let row_group_size_option = copy_stmt_get_option(p_stmt, "row_group_size");

@@ -204,7 +207,16 @@ pub(crate) fn copy_to_stmt_file_size_bytes(p_stmt: &PgBox<PlannedStmt>) -> i64 {
     if file_size_bytes_option.is_null() {
         INVALID_FILE_SIZE_BYTES
     } else {
-        unsafe { defGetInt64(file_size_bytes_option.as_ptr()) }
+        let file_size_bytes = unsafe { defGetString(file_size_bytes_option.as_ptr()) };
+
+        let file_size_bytes = unsafe {
+            CStr::from_ptr(file_size_bytes)
+                .to_str()
+                .expect("file_size_bytes option is not a valid CString")
+        };
+
+        parse_file_size(file_size_bytes)
+            .unwrap_or_else(|e| panic!("file_size_bytes option is not valid: {}", e)) as i64
     }
 }

@@ -590,3 +602,55 @@ pub(crate) fn create_filtered_tupledesc_for_relation<'a>(

     filtered_tupledesc
 }
+
+/// Parses a size string like "1MB", "512KB", or just "1000000" into a byte count.
+/// Enforces a minimum of 1MB.
+fn parse_file_size(size_str: &str) -> Result<u64, String> {
+    // Normalize casing and trim whitespace
+    let size_str = size_str.trim().to_uppercase();
+
+    // Find the first non-digit character
+    let mut idx = 0;
+    for c in size_str.chars() {
+        if !c.is_ascii_digit() {
+            break;
+        }
+        idx += 1;
+    }
+
+    // If there's no numeric portion, return an error
+    if idx == 0 {
+        return Err(format!("No numeric value found in '{}'", size_str));
+    }
+
+    // Split into numeric part and (optional) unit
+    let num_part = &size_str[..idx];
+    let unit_part = size_str[idx..].trim();
+
+    // Convert the numeric portion
+    let mut bytes = match num_part.parse::<u64>() {
+        Ok(n) => n,
+        Err(_) => return Err(format!("Invalid numeric portion in '{}'", size_str)),
+    };
+
+    // Interpret the suffix, if present
+    match unit_part {
+        "" => { /* no suffix: treat as bytes */ }
+        "KB" => bytes *= 1_024,
+        "MB" => bytes *= 1_024 * 1_024,
+        "GB" => bytes *= 1_024 * 1_024 * 1_024,
+        _ => {
+            return Err(format!(
+                "Unrecognized unit '{}'. Allowed units are KB, MB or GB.",
+                unit_part
+            ))
+        }
+    }
+
+    // Enforce a minimum of 1MB
+    if bytes < 1_024 * 1_024 {
+        return Err(format!("Minimum allowed size is 1MB. Got {} bytes.", bytes));
+    }
+
+    Ok(bytes)
+}
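A hypothetical unit-test sketch for `parse_file_size` (not part of this commit), with expected values derived directly from the rules above: binary units, an optional KB/MB/GB suffix, and the 1MB minimum.

```rust
// Hypothetical test, assuming parse_file_size is in scope; not part of this commit.
#[test]
fn parse_file_size_examples() {
    assert_eq!(parse_file_size("1MB"), Ok(1_048_576));
    assert_eq!(parse_file_size("2097152"), Ok(2_097_152));   // bare byte count, no unit
    assert_eq!(parse_file_size(" 1gb "), Ok(1_073_741_824)); // whitespace/case are normalized
    assert!(parse_file_size("100KB").is_err());              // below the 1MB minimum
    assert!(parse_file_size("1TB").is_err());                // unsupported unit
    assert!(parse_file_size("MB").is_err());                 // no numeric portion
}
```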

src/pgrx_tests/common.rs

Lines changed: 0 additions & 1 deletion
@@ -40,7 +40,6 @@ pub(crate) fn comma_separated_copy_options(options: &HashMap<String, CopyOptionV
 }

 pub(crate) const LOCAL_TEST_FILE_PATH: &str = "/tmp/pg_parquet_test.parquet";
-pub(crate) const LOCAL_TEST_FOLDER_PATH: &str = "/tmp/pg_parquet_test";

 pub(crate) struct TestTable<T: IntoDatum + FromDatum> {
     uri: String,

src/pgrx_tests/copy_options.rs

Lines changed: 17 additions & 23 deletions
@@ -5,9 +5,7 @@ mod tests {
     use pgrx::{pg_test, Spi};

     use crate::{
-        pgrx_tests::common::{
-            CopyOptionValue, TestTable, LOCAL_TEST_FILE_PATH, LOCAL_TEST_FOLDER_PATH,
-        },
+        pgrx_tests::common::{CopyOptionValue, TestTable, LOCAL_TEST_FILE_PATH},
         PgParquetCompression,
     };

@@ -282,15 +280,16 @@
     }

     #[pg_test]
-    #[should_panic(expected = "file_size_bytes must be at least 1048576 bytes")]
+    #[should_panic(expected = "Minimum allowed size is 1MB. Got 102400 bytes.")]
     fn test_invalid_file_size_bytes() {
-        let parent_folder = Path::new(LOCAL_TEST_FOLDER_PATH);
+        let parent_folder = Path::new(LOCAL_TEST_FILE_PATH);
         std::fs::remove_dir_all(parent_folder).ok();
+        std::fs::remove_file(parent_folder).ok();

         let mut copy_options = HashMap::new();
         copy_options.insert(
             "file_size_bytes".to_string(),
-            CopyOptionValue::IntOption(100),
+            CopyOptionValue::StringOption("100KB".into()),
         );

         let test_table = TestTable::<i32>::new("int4".into()).with_copy_to_options(copy_options);

@@ -414,8 +413,6 @@

     #[pg_test]
     fn test_file_size_bytes() {
-        let parent_folder = Path::new(LOCAL_TEST_FOLDER_PATH);
-
         let uris = [
             // with ".parquet" extension
             LOCAL_TEST_FILE_PATH.to_string(),

@@ -432,29 +429,26 @@

         for (uri, expected_file_count) in uris.into_iter().zip(expected_file_counts) {
             // cleanup
+
+            // drop tables
             Spi::run("drop table if exists test_expected, test_result;").unwrap();
-            std::fs::remove_dir_all(parent_folder).ok();

-            const ONE_MB: i32 = 1024 * 1024;
+            let parent_folder = Path::new(&uri);
+
+            // remove if there is a directory
+            std::fs::remove_dir(parent_folder).ok();
+
+            // remove if there is a file
+            std::fs::remove_file(parent_folder).ok();
+
             let setup_commands = format!(
                 "create table test_expected(a text);\n\
                  create table test_result(a text);\n\
                  insert into test_expected select 'hellooooo' || i from generate_series(1, 1000000) i;\n\
-                 copy test_expected to '{uri}' with (format parquet, file_size_bytes {ONE_MB})");
+                 copy test_expected to '{uri}' with (format parquet, file_size_bytes '1MB')");
             Spi::run(&setup_commands).unwrap();

             // assert file count
-            let file_name = Path::new(&uri)
-                .file_name()
-                .expect("invalid uri")
-                .to_str()
-                .expect("invalid uri");
-
-            let file_extension = file_name
-                .find('.')
-                .map(|idx| &file_name[idx..])
-                .unwrap_or("");
-
             let mut file_entries = parent_folder
                 .read_dir()
                 .unwrap()

@@ -473,7 +467,7 @@

             // assert file paths
             for (file_idx, file_entry) in file_entries.iter().enumerate() {
-                let expected_path = parent_folder.join(format!("data_{file_idx}{file_extension}"));
+                let expected_path = parent_folder.join(format!("data_{file_idx}.parquet"));

                 let expected_path = expected_path.to_str().unwrap();
