
Commit 9b8dd11
Support parquet v2 format
Supports reading and writing Parquet files in format v2. By default, we write files in v1 format to improve interoperability with common query engines, but you can specify `COPY table TO '..' with (parquet_version 'v2')`. Closes #104.
1 parent a1193c9 commit 9b8dd11
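
For illustration, a minimal sketch of the new option in use via pgrx's `Spi` helper, mirroring the tests further down; the `/tmp` paths are made up for the example:

use pgrx::Spi;

// Hypothetical paths; any writable location works.
fn parquet_version_examples() {
    // Explicitly request the v2 writer format.
    Spi::run("COPY (SELECT 1 AS id) TO '/tmp/t_v2.parquet' WITH (parquet_version 'v2');").unwrap();

    // Omitting the option keeps the v1 default for interoperability.
    Spi::run("COPY (SELECT 1 AS id) TO '/tmp/t_v1.parquet';").unwrap();
}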

File tree: 8 files changed (+156, -2 lines)


README.md

Lines changed: 2 additions & 1 deletion
@@ -310,7 +310,8 @@ Supported Google Cloud Storage uri formats are shown below:
  - `row_group_size <int64>`: the number of rows in each row group while writing Parquet files. The default row group size is `122880`,
  - `row_group_size_bytes <int64>`: the total byte size of rows in each row group while writing Parquet files. The default row group size bytes is `row_group_size * 1024`,
  - `compression <string>`: the compression format to use while writing Parquet files. The supported compression formats are `uncompressed`, `snappy`, `gzip`, `brotli`, `lz4`, `lz4raw` and `zstd`. The default compression format is `snappy`. If not specified, the compression format is determined by the file extension,
- - `compression_level <int>`: the compression level to use while writing Parquet files. Compression levels are only supported for the `gzip`, `zstd` and `brotli` compression formats. The default compression level is `6` for `gzip (0-10)`, `1` for `zstd (1-22)` and `1` for `brotli (0-11)`.
+ - `compression_level <int>`: the compression level to use while writing Parquet files. Compression levels are only supported for the `gzip`, `zstd` and `brotli` compression formats. The default compression level is `6` for `gzip (0-10)`, `1` for `zstd (1-22)` and `1` for `brotli (0-11)`,
+ - `parquet_version <string>`: writer version of the Parquet file. By default, it is set to `v1` for better interoperability with common query engines (some cannot read v2 files). You can set it to `v2` to unlock some of the new encodings.

  `pg_parquet` supports the following options in the `COPY FROM` command:
  - `format parquet`: you need to specify this option to read or write Parquet files that do not end with the `.parquet[.<compression>]` extension,

src/arrow_parquet.rs

Lines changed: 1 addition & 0 deletions
@@ -4,6 +4,7 @@ pub(crate) mod compression;
  pub(crate) mod field_ids;
  pub(crate) mod match_by;
  pub(crate) mod parquet_reader;
+ pub(crate) mod parquet_version;
  pub(crate) mod parquet_writer;
  pub(crate) mod pg_to_arrow;
  pub(crate) mod schema_parser;
src/arrow_parquet/parquet_version.rs

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+ use std::str::FromStr;
+
+ use parquet::file::properties::WriterVersion;
+
+ #[repr(C)]
+ #[derive(Debug, Clone, Copy, Default, PartialEq)]
+ pub(crate) enum ParquetVersion {
+     #[default]
+     V1,
+     V2,
+ }
+
+ impl FromStr for ParquetVersion {
+     type Err = String;
+
+     fn from_str(s: &str) -> Result<Self, Self::Err> {
+         match s.to_lowercase().as_str() {
+             "v1" => Ok(ParquetVersion::V1),
+             "v2" => Ok(ParquetVersion::V2),
+             _ => Err(format!(
+                 "unrecognized parquet version: {s}. v1 or v2 is supported.",
+             )),
+         }
+     }
+ }
+
+ impl From<ParquetVersion> for WriterVersion {
+     fn from(value: ParquetVersion) -> Self {
+         match value {
+             ParquetVersion::V1 => WriterVersion::PARQUET_1_0,
+             ParquetVersion::V2 => WriterVersion::PARQUET_2_0,
+         }
+     }
+ }
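
As a reading aid, a minimal sketch of the behavior this module pins down, written as an in-crate unit test since `ParquetVersion` is crate-private: parsing is case-insensitive, anything other than v1/v2 is rejected, and the default stays `V1`:

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use parquet::file::properties::WriterVersion;

    use super::ParquetVersion;

    #[test]
    fn parquet_version_basics() {
        // Input is lowercased before matching, so "V2" parses too.
        assert_eq!(ParquetVersion::from_str("V2"), Ok(ParquetVersion::V2));
        // The derived Default keeps v1 as the writer format.
        assert_eq!(ParquetVersion::default(), ParquetVersion::V1);
        // Anything else is rejected with the error message tested later.
        assert!(ParquetVersion::from_str("vv2").is_err());
        // Conversion into the parquet crate's writer version.
        let v: WriterVersion = ParquetVersion::V2.into();
        assert_eq!(v, WriterVersion::PARQUET_2_0);
    }
}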

src/arrow_parquet/parquet_writer.rs

Lines changed: 1 addition & 0 deletions
@@ -94,6 +94,7 @@ impl ParquetWriterContext {
      .set_statistics_enabled(EnabledStatistics::Page)
      .set_compression(compression.into())
      .set_max_row_group_size(options.row_group_size as usize)
+     .set_writer_version(options.parquet_version.into())
      .set_created_by("pg_parquet".to_string());

  let geometry_columns_metadata_value = geoparquet_metadata_json_from_tupledesc(tupledesc);
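
Outside the extension, the same switch is a single call on the parquet crate's properties builder. A standalone sketch with illustrative values (placeholders, not necessarily the extension's exact defaults):

use parquet::basic::Compression;
use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};

// Build writer properties for a chosen Parquet format version.
fn writer_props(version: WriterVersion) -> WriterProperties {
    WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_compression(Compression::SNAPPY)
        .set_max_row_group_size(122_880) // the README's default row_group_size
        .set_writer_version(version) // WriterVersion::PARQUET_1_0 or PARQUET_2_0
        .set_created_by("pg_parquet".to_string())
        .build()
}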

src/parquet_copy_hook/copy_to_split_dest_receiver.rs

Lines changed: 10 additions & 0 deletions
@@ -9,6 +9,7 @@ use pgrx::prelude::*;
  use crate::arrow_parquet::{
      compression::{PgParquetCompression, INVALID_COMPRESSION_LEVEL},
      field_ids::FieldIds,
+     parquet_version::ParquetVersion,
      parquet_writer::{DEFAULT_ROW_GROUP_SIZE, DEFAULT_ROW_GROUP_SIZE_BYTES},
  };

@@ -39,6 +40,7 @@ pub(crate) struct CopyToParquetOptions {
      pub(crate) row_group_size_bytes: i64,
      pub(crate) compression: PgParquetCompression,
      pub(crate) compression_level: i32,
+     pub(crate) parquet_version: ParquetVersion,
  }

  impl CopyToParquetSplitDestReceiver {
@@ -212,6 +214,7 @@ pub extern "C-unwind" fn create_copy_to_parquet_split_dest_receiver(
      row_group_size_bytes: *const i64,
      compression: *const PgParquetCompression,
      compression_level: *const i32,
+     parquet_version: *const ParquetVersion,
  ) -> *mut DestReceiver {
      let file_size_bytes = if file_size_bytes.is_null() {
          INVALID_FILE_SIZE_BYTES
@@ -251,13 +254,20 @@
          unsafe { *compression_level }
      };

+     let parquet_version = if parquet_version.is_null() {
+         ParquetVersion::default()
+     } else {
+         unsafe { *parquet_version }
+     };
+
      let options = CopyToParquetOptions {
          file_size_bytes,
          field_ids,
          row_group_size,
          row_group_size_bytes,
          compression,
          compression_level,
+         parquet_version,
      };

      let mut split_dest =
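
Every optional parameter crosses the C boundary the same way: a null pointer means "use the default". A hypothetical generic helper capturing the pattern (the receiver spells the check out per field; `read_or_default` is not in the source):

// Hypothetical helper. Safety: `ptr` must be null or point to a valid,
// initialized `T` (e.g. the `ParquetVersion` value passed from the hook).
unsafe fn read_or_default<T: Copy + Default>(ptr: *const T) -> T {
    if ptr.is_null() {
        T::default()
    } else {
        *ptr
    }
}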

src/parquet_copy_hook/copy_utils.rs

Lines changed: 33 additions & 0 deletions
@@ -20,6 +20,7 @@ use crate::{
      compression::{all_supported_compressions, PgParquetCompression},
      field_ids,
      match_by::MatchBy,
+     parquet_version::ParquetVersion,
      parquet_writer::{DEFAULT_ROW_GROUP_SIZE, DEFAULT_ROW_GROUP_SIZE_BYTES},
      uri_utils::ParsedUriInfo,
  },
@@ -44,6 +45,7 @@ pub(crate) fn validate_copy_to_options(p_stmt: &PgBox<PlannedStmt>, uri_info: &P
          "row_group_size_bytes",
          "compression",
          "compression_level",
+         "parquet_version",
          "freeze",
      ],
  );
@@ -156,6 +158,20 @@ pub(crate) fn validate_copy_to_options(p_stmt: &PgBox<PlannedStmt>, uri_info: &P
          compression.ensure_compression_level(compression_level);
      }
+
+     let parquet_version_option = copy_stmt_get_option(p_stmt, "parquet_version");
+
+     if !parquet_version_option.is_null() {
+         let parquet_version = unsafe { defGetString(parquet_version_option.as_ptr()) };
+
+         let parquet_version = unsafe {
+             CStr::from_ptr(parquet_version)
+                 .to_str()
+                 .expect("parquet_version option is not a valid CString")
+         };
+
+         ParquetVersion::from_str(parquet_version).unwrap_or_else(|e| panic!("{}", e));
+     }
  }

  pub(crate) fn validate_copy_from_options(p_stmt: &PgBox<PlannedStmt>) {
@@ -313,6 +329,23 @@ pub(crate) fn copy_to_stmt_compression_level(
      }
  }

+ pub(crate) fn copy_to_stmt_parquet_version(p_stmt: &PgBox<PlannedStmt>) -> ParquetVersion {
+     let parquet_version_option = copy_stmt_get_option(p_stmt, "parquet_version");
+
+     if parquet_version_option.is_null() {
+         ParquetVersion::default()
+     } else {
+         let parquet_version = unsafe { defGetString(parquet_version_option.as_ptr()) };
+         let parquet_version = unsafe {
+             CStr::from_ptr(parquet_version)
+                 .to_str()
+                 .expect("parquet_version option is not a valid CString")
+         };
+
+         ParquetVersion::from_str(parquet_version).unwrap_or_else(|e| panic!("{}", e))
+     }
+ }
+
  pub(crate) fn copy_from_stmt_create_option_list(p_stmt: &PgBox<PlannedStmt>) -> PgList<DefElem> {
      let mut new_copy_options = PgList::<DefElem>::new();

src/parquet_copy_hook/hook.rs

Lines changed: 3 additions & 1 deletion
@@ -24,7 +24,7 @@ use super::{
      copy_to_split_dest_receiver::free_copy_to_parquet_split_dest_receiver,
      copy_utils::{
          copy_to_stmt_compression, copy_to_stmt_field_ids, copy_to_stmt_file_size_bytes,
-         validate_copy_from_options, validate_copy_to_options,
+         copy_to_stmt_parquet_version, validate_copy_from_options, validate_copy_to_options,
      },
  };

@@ -66,6 +66,7 @@ fn process_copy_to_parquet(
      let row_group_size_bytes = copy_to_stmt_row_group_size_bytes(p_stmt);
      let compression = copy_to_stmt_compression(p_stmt, &uri_info);
      let compression_level = copy_to_stmt_compression_level(p_stmt, &uri_info);
+     let parquet_version = copy_to_stmt_parquet_version(p_stmt);

      let parquet_split_dest = create_copy_to_parquet_split_dest_receiver(
          uri_as_string(&uri).as_pg_cstr(),
@@ -76,6 +77,7 @@
          &row_group_size_bytes,
          &compression,
          &compression_level.unwrap_or(INVALID_COMPRESSION_LEVEL),
+         &parquet_version,
      );

      let parquet_split_dest = unsafe { PgBox::from_pg(parquet_split_dest) };

src/pgrx_tests/copy_options.rs

Lines changed: 72 additions & 0 deletions
@@ -298,6 +298,22 @@ mod tests {
      test_table.assert_expected_and_result_rows();
  }

+ #[pg_test]
+ #[should_panic(expected = "unrecognized parquet version: vv2. v1 or v2 is supported.")]
+ fn test_invalid_parquet_version() {
+     let _file_cleanup = FileCleanup::new(LOCAL_TEST_FILE_PATH);
+
+     let mut copy_options = HashMap::new();
+     copy_options.insert(
+         "parquet_version".to_string(),
+         CopyOptionValue::StringOption("vv2".into()),
+     );
+
+     let test_table = TestTable::<i32>::new("int4".into()).with_copy_to_options(copy_options);
+     test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);");
+     test_table.assert_expected_and_result_rows();
+ }
+
  #[pg_test]
  fn test_large_arrow_array_limit() {
      // disable row group size bytes limit
@@ -412,6 +428,62 @@
      assert_eq!(result_metadata, vec![12]);
  }

+ #[pg_test]
+ fn test_parquet_version() {
+     let fetch_parquet_version = || {
+         Spi::connect(|client| {
+             let parquet_file_metadata_command = format!(
+                 "select * from parquet.file_metadata('{}');",
+                 LOCAL_TEST_FILE_PATH
+             );
+
+             let mut results = Vec::new();
+             let tup_table = client
+                 .select(&parquet_file_metadata_command, None, &[])
+                 .unwrap();
+
+             for row in tup_table {
+                 let parquet_version = row["format_version"].value::<String>().unwrap().unwrap();
+                 results.push(parquet_version);
+             }
+
+             results
+         })
+     };
+
+     // try writing/reading from parquet v2 file
+     let mut copy_to_options = HashMap::new();
+     copy_to_options.insert(
+         "parquet_version".to_string(),
+         CopyOptionValue::StringOption("v2".to_string()),
+     );
+
+     let test_table = TestTable::<i32>::new("int4".into()).with_copy_to_options(copy_to_options);
+     test_table.insert("INSERT INTO test_expected (a) select i from generate_series(1,10) i;");
+     test_table.assert_expected_and_result_rows();
+
+     // v2
+     let copy_to_parquet = format!(
+         "copy (select 1 as id) to '{LOCAL_TEST_FILE_PATH}' with (parquet_version 'v2');",
+     );
+     Spi::run(&copy_to_parquet).unwrap();
+     assert_eq!(fetch_parquet_version(), vec!["2".to_string()]);
+
+     // v1
+     let copy_to_parquet = format!(
+         "copy (select 1 as id) to '{LOCAL_TEST_FILE_PATH}' with (parquet_version 'v1');",
+     );
+     Spi::run(&copy_to_parquet).unwrap();
+     assert_eq!(fetch_parquet_version(), vec!["1".to_string()]);
+
+     // V1
+     let copy_to_parquet = format!(
+         "copy (select 1 as id) to '{LOCAL_TEST_FILE_PATH}' with (parquet_version 'V1');",
+     );
+     Spi::run(&copy_to_parquet).unwrap();
+     assert_eq!(fetch_parquet_version(), vec!["1".to_string()]);
+ }
+
  #[pg_test]
  fn test_file_size_bytes() {
      let uris = [
