Commit 8cec92c

Support parquet v2 format (#127)
Supports reading and writing Parquet files in format v2. By default, we write files in v1 format to improve interoperability with common query engines, but you can opt in via COPY table TO '..' WITH (parquet_version 'v2'). Closes #104.
1 parent cc442d0 commit 8cec92c
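
To check which format a produced file actually carries, the footer metadata can be read back with the same parquet crate this extension builds on. A minimal sketch, assuming a file was written to a hypothetical /tmp/table.parquet:

use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical output of: COPY table TO '/tmp/table.parquet' WITH (parquet_version 'v2');
    let file = File::open("/tmp/table.parquet")?;
    let reader = SerializedFileReader::new(file)?;

    // The footer records the format version: expect 2 here, or 1 for the default.
    let version = reader.metadata().file_metadata().version();
    println!("parquet format version: {version}");

    Ok(())
}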

File tree: 11 files changed (+220, −121 lines)


.github/workflows/ci.yml

Lines changed: 19 additions & 35 deletions

@@ -23,9 +23,9 @@ jobs:
         runs_on: [ 'ubuntu-22.04', 'ubuntu-22.04-arm' ]
         include:
           - runs_on: ubuntu-22.04
-            arch: x64
+            arch: x86_64
           - runs_on: ubuntu-22.04-arm
-            arch: arm64
+            arch: aarch64

     runs-on: ${{ matrix.runs_on }}

@@ -36,44 +36,28 @@ jobs:
     steps:
       - uses: actions/checkout@v4

-      - name: Set up sccache for x86_64
-        if: ${{ matrix.arch == 'x64' }}
+      - name: Set up sccache
         run: |
-          wget https://github.com/mozilla/sccache/releases/download/v$SCCACHE_VERSION/sccache-v$SCCACHE_VERSION-x86_64-unknown-linux-musl.tar.gz
-          tar -xzf sccache-v$SCCACHE_VERSION-x86_64-unknown-linux-musl.tar.gz
-          sudo mv sccache-v$SCCACHE_VERSION-x86_64-unknown-linux-musl/sccache /usr/local/bin
+          wget https://github.com/mozilla/sccache/releases/download/v$SCCACHE_VERSION/sccache-v$SCCACHE_VERSION-${{ matrix.arch }}-unknown-linux-musl.tar.gz
+          tar -xzf sccache-v$SCCACHE_VERSION-${{ matrix.arch }}-unknown-linux-musl.tar.gz
+          sudo mv sccache-v$SCCACHE_VERSION-${{ matrix.arch }}-unknown-linux-musl/sccache /usr/local/bin
           chmod +x /usr/local/bin/sccache
+          if [ "${{ matrix.arch }}" == "x86_64" ]; then
+            SCCACHE_SHA256=$SCCACHE_x86_64_SHA256
+          else
+            SCCACHE_SHA256=$SCCACHE_AARCH64_SHA256
+          fi
           echo "$SCCACHE_SHA256 /usr/local/bin/sccache" | sha256sum --check
         env:
           SCCACHE_VERSION: 0.8.1
-          SCCACHE_SHA256: "7203a4dcb3a67f3a0272366d50ede22e5faa3e2a798deaa4d1ea377b51c0ab0c"
+          SCCACHE_x86_64_SHA256: "7203a4dcb3a67f3a0272366d50ede22e5faa3e2a798deaa4d1ea377b51c0ab0c"
+          SCCACHE_AARCH64_SHA256: "36b2fd1c6c3a104ec1d526edb0533a3827c266054bf4552fb97f524beff6a612"

-      - name: Set up sccache for arm64
-        if: ${{ matrix.arch == 'arm64' }}
-        run: |
-          wget https://github.com/mozilla/sccache/releases/download/v$SCCACHE_VERSION/sccache-v$SCCACHE_VERSION-aarch64-unknown-linux-musl.tar.gz
-          tar -xzf sccache-v$SCCACHE_VERSION-aarch64-unknown-linux-musl.tar.gz
-          sudo mv sccache-v$SCCACHE_VERSION-aarch64-unknown-linux-musl/sccache /usr/local/bin
-          chmod +x /usr/local/bin/sccache
-          echo "$SCCACHE_SHA256 /usr/local/bin/sccache" | sha256sum --check
-        env:
-          SCCACHE_VERSION: 0.8.1
-          SCCACHE_SHA256: "36b2fd1c6c3a104ec1d526edb0533a3827c266054bf4552fb97f524beff6a612"
-
-      - name: Set up Rust for x86_64
-        if: ${{ matrix.arch == 'x64' }}
-        uses: dtolnay/rust-toolchain@stable
-        with:
-          toolchain: 1.88.0
-          target: x86_64-unknown-linux-gnu
-          components: rustfmt, clippy, llvm-tools-preview
-
-      - name: Set up Rust for arm64
-        if: ${{ matrix.arch == 'arm64' }}
+      - name: Set up Rust
         uses: dtolnay/rust-toolchain@stable
         with:
-          toolchain: 1.88.0
-          target: aarch64-unknown-linux-gnu
+          toolchain: 1.89.0
+          target: ${{ matrix.arch }}-unknown-linux-gnu
           components: rustfmt, clippy, llvm-tools-preview

       - name: Cache cargo registry
@@ -150,17 +134,17 @@ jobs:
           make check-lint

       - name: Run tests without coverage
-        if: ${{ env.PG_MAJOR != '17' || matrix.arch != 'x64' }}
+        if: ${{ env.PG_MAJOR != '17' || matrix.arch != 'x86_64' }}
         run: |
           make check

       - name: Run tests with coverage
-        if: ${{ env.PG_MAJOR == '17' && matrix.arch == 'x64' }}
+        if: ${{ env.PG_MAJOR == '17' && matrix.arch == 'x86_64' }}
         run: |
           make check-with-coverage

       - name: Upload coverage report to Codecov
-        if: ${{ env.PG_MAJOR == '17' && matrix.arch == 'x64' }}
+        if: ${{ env.PG_MAJOR == '17' && matrix.arch == 'x86_64' }}
         uses: codecov/codecov-action@v4
         with:
           fail_ci_if_error: true

README.md

Lines changed: 2 additions & 1 deletion

@@ -310,7 +310,8 @@ Supported Google Cloud Storage uri formats are shown below:
 - `row_group_size <int64>`: the number of rows in each row group while writing Parquet files. The default row group size is `122880`,
 - `row_group_size_bytes <int64>`: the total byte size of rows in each row group while writing Parquet files. The default row group size bytes is `row_group_size * 1024`,
 - `compression <string>`: the compression format to use while writing Parquet files. The supported compression formats are `uncompressed`, `snappy`, `gzip`, `brotli`, `lz4`, `lz4raw` and `zstd`. The default compression format is `snappy`. If not specified, the compression format is determined by the file extension,
-- `compression_level <int>`: the compression level to use while writing Parquet files. The supported compression levels are only supported for `gzip`, `zstd` and `brotli` compression formats. The default compression level is `6` for `gzip (0-10)`, `1` for `zstd (1-22)` and `1` for `brotli (0-11)`.
+- `compression_level <int>`: the compression level to use while writing Parquet files. The supported compression levels are only supported for `gzip`, `zstd` and `brotli` compression formats. The default compression level is `6` for `gzip (0-10)`, `1` for `zstd (1-22)` and `1` for `brotli (0-11)`,
+- `parquet_version <string>`: writer version of the Parquet file. By default, it is set to `v1` to be more interoperable with common query engines. (some are not able to read v2 files) You can set it to `v2` to unlock some of the new encodings.

 `pg_parquet` supports the following options in the `COPY FROM` command:
 - `format parquet`: you need to specify this option to read or write Parquet files which does not end with `.parquet[.<compression>]` extension,

src/arrow_parquet.rs

Lines changed: 1 addition & 0 deletions

@@ -4,6 +4,7 @@ pub(crate) mod compression;
 pub(crate) mod field_ids;
 pub(crate) mod match_by;
 pub(crate) mod parquet_reader;
+pub(crate) mod parquet_version;
 pub(crate) mod parquet_writer;
 pub(crate) mod pg_to_arrow;
 pub(crate) mod schema_parser;

src/arrow_parquet/parquet_version.rs

Lines changed: 34 additions & 0 deletions

@@ -0,0 +1,34 @@
+use std::str::FromStr;
+
+use parquet::file::properties::WriterVersion;
+
+#[repr(C)]
+#[derive(Debug, Clone, Copy, Default, PartialEq)]
+pub(crate) enum ParquetVersion {
+    #[default]
+    V1,
+    V2,
+}
+
+impl FromStr for ParquetVersion {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "v1" => Ok(ParquetVersion::V1),
+            "v2" => Ok(ParquetVersion::V2),
+            _ => Err(format!(
+                "unrecognized parquet version: {s}. v1 or v2 is supported.",
+            )),
+        }
+    }
+}
+
+impl From<ParquetVersion> for WriterVersion {
+    fn from(value: ParquetVersion) -> Self {
+        match value {
+            ParquetVersion::V1 => WriterVersion::PARQUET_1_0,
+            ParquetVersion::V2 => WriterVersion::PARQUET_2_0,
+        }
+    }
+}
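
Not part of the commit, but since ParquetVersion is pub(crate), a unit test inside this new module could pin down the parsing and conversion behavior; a sketch:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_and_converts_parquet_versions() {
        // Input is lowercased first, so "V2" and "v2" are equivalent.
        assert_eq!("V2".parse::<ParquetVersion>(), Ok(ParquetVersion::V2));

        // Anything other than v1/v2 yields the error surfaced to COPY users.
        assert!("v3".parse::<ParquetVersion>().is_err());

        // The #[default] attribute keeps v1 as the writer default.
        assert_eq!(ParquetVersion::default(), ParquetVersion::V1);

        // Conversion hands the parquet crate its own WriterVersion variant.
        assert_eq!(
            WriterVersion::from(ParquetVersion::V2),
            WriterVersion::PARQUET_2_0
        );
    }
}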

src/arrow_parquet/parquet_writer.rs

Lines changed: 1 addition & 0 deletions

@@ -94,6 +94,7 @@ impl ParquetWriterContext {
             .set_statistics_enabled(EnabledStatistics::Page)
             .set_compression(compression.into())
             .set_max_row_group_size(options.row_group_size as usize)
+            .set_writer_version(options.parquet_version.into())
             .set_created_by("pg_parquet".to_string());

         let geometry_columns_metadata_value = geoparquet_metadata_json_from_tupledesc(tupledesc);
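
For context, the chain above builds the parquet crate's WriterProperties. A standalone sketch of the same configuration, with literals standing in for pg_parquet's crate-private options struct (the snappy compression value here is an assumption, not the commit's code):

use parquet::basic::Compression;
use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};

// Builds writer properties equivalent to the chain above.
fn writer_props(version: WriterVersion) -> WriterProperties {
    WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_compression(Compression::SNAPPY) // stands in for compression.into()
        .set_max_row_group_size(122880) // pg_parquet's documented default row group size
        .set_writer_version(version) // the line this commit adds
        .set_created_by("pg_parquet".to_string())
        .build()
}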

src/parquet_copy_hook/copy_to_split_dest_receiver.rs

Lines changed: 10 additions & 0 deletions

@@ -9,6 +9,7 @@ use pgrx::prelude::*;
 use crate::arrow_parquet::{
     compression::{PgParquetCompression, INVALID_COMPRESSION_LEVEL},
     field_ids::FieldIds,
+    parquet_version::ParquetVersion,
     parquet_writer::{DEFAULT_ROW_GROUP_SIZE, DEFAULT_ROW_GROUP_SIZE_BYTES},
 };

@@ -39,6 +40,7 @@ pub(crate) struct CopyToParquetOptions {
     pub(crate) row_group_size_bytes: i64,
     pub(crate) compression: PgParquetCompression,
     pub(crate) compression_level: i32,
+    pub(crate) parquet_version: ParquetVersion,
 }

 impl CopyToParquetSplitDestReceiver {
@@ -212,6 +214,7 @@ pub extern "C-unwind" fn create_copy_to_parquet_split_dest_receiver(
     row_group_size_bytes: *const i64,
     compression: *const PgParquetCompression,
     compression_level: *const i32,
+    parquet_version: *const ParquetVersion,
 ) -> *mut DestReceiver {
     let file_size_bytes = if file_size_bytes.is_null() {
         INVALID_FILE_SIZE_BYTES
@@ -251,13 +254,20 @@ pub extern "C-unwind" fn create_copy_to_parquet_split_dest_receiver(
         unsafe { *compression_level }
     };

+    let parquet_version = if parquet_version.is_null() {
+        ParquetVersion::default()
+    } else {
+        unsafe { *parquet_version }
+    };
+
     let options = CopyToParquetOptions {
         file_size_bytes,
         field_ids,
         row_group_size,
         row_group_size_bytes,
         compression,
         compression_level,
+        parquet_version,
     };

     let mut split_dest =
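
Each optional argument crosses the C ABI as a nullable pointer and falls back to a default (ParquetVersion is #[repr(C)] so this is well-defined). A hypothetical generic helper, not in the commit, capturing the repeated pattern:

/// Dereferences `ptr`, or returns `T::default()` when it is null.
///
/// # Safety
/// A non-null `ptr` must point to a valid, aligned `T`.
unsafe fn option_or_default<T: Copy + Default>(ptr: *const T) -> T {
    if ptr.is_null() {
        T::default()
    } else {
        unsafe { *ptr }
    }
}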

src/parquet_copy_hook/copy_utils.rs

Lines changed: 33 additions & 0 deletions

@@ -20,6 +20,7 @@ use crate::{
         compression::{all_supported_compressions, PgParquetCompression},
         field_ids,
         match_by::MatchBy,
+        parquet_version::ParquetVersion,
         parquet_writer::{DEFAULT_ROW_GROUP_SIZE, DEFAULT_ROW_GROUP_SIZE_BYTES},
         uri_utils::ParsedUriInfo,
     },
@@ -44,6 +45,7 @@ pub(crate) fn validate_copy_to_options(p_stmt: &PgBox<PlannedStmt>, uri_info: &P
             "row_group_size_bytes",
             "compression",
             "compression_level",
+            "parquet_version",
             "freeze",
         ],
     );
@@ -153,6 +155,20 @@ pub(crate) fn validate_copy_to_options(p_stmt: &PgBox<PlannedStmt>, uri_info: &P

         compression.ensure_compression_level(compression_level);
     }
+
+    let parquet_version_option = copy_stmt_get_option(p_stmt, "parquet_version");
+
+    if !parquet_version_option.is_null() {
+        let parquet_version = unsafe { defGetString(parquet_version_option.as_ptr()) };
+
+        let parquet_version = unsafe {
+            CStr::from_ptr(parquet_version)
+                .to_str()
+                .expect("parquet_version option is not a valid CString")
+        };
+
+        ParquetVersion::from_str(parquet_version).unwrap_or_else(|e| panic!("{}", e));
+    }
 }

 pub(crate) fn validate_copy_from_options(p_stmt: &PgBox<PlannedStmt>) {
@@ -307,6 +323,23 @@ pub(crate) fn copy_to_stmt_compression_level(
     }
 }

+pub(crate) fn copy_to_stmt_parquet_version(p_stmt: &PgBox<PlannedStmt>) -> ParquetVersion {
+    let parquet_version_option = copy_stmt_get_option(p_stmt, "parquet_version");
+
+    if parquet_version_option.is_null() {
+        ParquetVersion::default()
+    } else {
+        let parquet_version = unsafe { defGetString(parquet_version_option.as_ptr()) };
+        let parquet_version = unsafe {
+            CStr::from_ptr(parquet_version)
+                .to_str()
+                .expect("parquet_version option is not a valid CString")
+        };
+
+        ParquetVersion::from_str(parquet_version).unwrap_or_else(|e| panic!("{}", e))
+    }
+}
+
 pub(crate) fn copy_from_stmt_create_option_list(p_stmt: &PgBox<PlannedStmt>) -> PgList<DefElem> {
     let mut new_copy_options = PgList::<DefElem>::new();

src/parquet_copy_hook/hook.rs

Lines changed: 3 additions & 1 deletion

@@ -24,7 +24,7 @@ use super::{
     copy_to_split_dest_receiver::free_copy_to_parquet_split_dest_receiver,
     copy_utils::{
         copy_to_stmt_compression, copy_to_stmt_field_ids, copy_to_stmt_file_size_bytes,
-        validate_copy_from_options, validate_copy_to_options,
+        copy_to_stmt_parquet_version, validate_copy_from_options, validate_copy_to_options,
     },
 };

@@ -64,6 +64,7 @@ fn process_copy_to_parquet(
     let row_group_size_bytes = copy_to_stmt_row_group_size_bytes(p_stmt);
     let compression = copy_to_stmt_compression(p_stmt, &uri_info);
     let compression_level = copy_to_stmt_compression_level(p_stmt, &uri_info);
+    let parquet_version = copy_to_stmt_parquet_version(p_stmt);

     let parquet_split_dest = create_copy_to_parquet_split_dest_receiver(
         uri_as_string(&uri_info.uri).as_pg_cstr(),
@@ -74,6 +75,7 @@ fn process_copy_to_parquet(
         &row_group_size_bytes,
         &compression,
         &compression_level.unwrap_or(INVALID_COMPRESSION_LEVEL),
+        &parquet_version,
     );

     let parquet_split_dest = unsafe { PgBox::from_pg(parquet_split_dest) };
