Commit 8c2a38d
Supports field_ids during COPY TO
Adds the `field_ids` option, which lets you specify how field ids are assigned during COPY TO. Supported values:

- `none` (default) => no field ids are written into the parquet metadata.
- `auto` => pg_parquet generates field ids starting from 0.
- `<json string>` => pg_parquet will use the given field ids, e.g.

```sql
create table test_table(a int, b text[]);
copy test_table to '/tmp/test.parquet' with (field_ids '{"a": 1, "b": {"__root_field_id": 2, "element": 3}}');
```

Closes #106.
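To see what actually lands in the file, a standalone sketch like the following (hypothetical, not part of this commit; it assumes the `parquet` crate as a dependency and the `/tmp/test.parquet` path from the example above) reads the field ids back from the file's schema:

```rust
use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open the file written by COPY TO above.
    let file = File::open("/tmp/test.parquet")?;
    let reader = SerializedFileReader::new(file)?;

    // The root schema is a group type; walk its immediate children.
    // (Nested ids, e.g. the list element of `b`, would need recursion.)
    let root = reader.metadata().file_metadata().schema_descr().root_schema();
    for field in root.get_fields() {
        let info = field.get_basic_info();
        if info.has_id() {
            println!("{} => field id {}", info.name(), info.id());
        }
    }
    Ok(())
}
```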
1 parent: 5b5725d

File tree

13 files changed: +897 −173 lines


README.md

Lines changed: 2 additions & 1 deletion
@@ -242,7 +242,8 @@ Supported authorization methods' priority order is shown below:
 ## Copy Options
 `pg_parquet` supports the following options in the `COPY TO` command:
 - `format parquet`: you need to specify this option to read or write Parquet files that do not end with the `.parquet[.<compression>]` extension,
-- `file_size_bytes <int>`: the total byte size per Parquet file. When set, the parquet files with the target size are created under a parent directory (named the same as the file name without the file extension). By default, when not specified, a single file is generated without creating a parent folder.
+- `file_size_bytes <int>`: the total byte size per Parquet file. When set, the parquet files with the target size are created under a parent directory (named the same as the file name without the file extension). By default, when not specified, a single file is generated without creating a parent folder,
+- `field_ids <string>`: field ids that are assigned to the fields in the Parquet file schema. By default, no field ids are assigned. Pass `auto` to let pg_parquet generate field ids, or pass a JSON string to assign the field ids explicitly,
 - `row_group_size <int>`: the number of rows in each row group while writing Parquet files. The default row group size is `122880`,
 - `row_group_size_bytes <int>`: the total byte size of rows in each row group while writing Parquet files. The default row group size bytes is `row_group_size * 1024`,
 - `compression <string>`: the compression format to use while writing Parquet files. The supported compression formats are `uncompressed`, `snappy`, `gzip`, `brotli`, `lz4`, `lz4raw` and `zstd`. The default compression format is `snappy`. If not specified, the compression format is determined by the file extension,

src/arrow_parquet.rs

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 pub(crate) mod arrow_to_pg;
 pub(crate) mod arrow_utils;
 pub(crate) mod compression;
+pub(crate) mod field_ids;
 pub(crate) mod match_by;
 pub(crate) mod parquet_reader;
 pub(crate) mod parquet_writer;

src/arrow_parquet/field_ids.rs

Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
+use std::{collections::HashMap, fmt::Display, str::FromStr};
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Default)]
+pub(crate) enum FieldIds {
+    #[default]
+    None,
+    Auto,
+    Explicit(FieldIdMapping),
+}
+
+impl FromStr for FieldIds {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "none" => Ok(FieldIds::None),
+            "auto" => Ok(FieldIds::Auto),
+            field_ids => Ok(FieldIds::Explicit(field_id_mapping_from_json_string(
+                field_ids,
+            )?)),
+        }
+    }
+}
+
+impl Display for FieldIds {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            FieldIds::None => write!(f, "none"),
+            FieldIds::Auto => write!(f, "auto"),
+            FieldIds::Explicit(field_id_mapping) => {
+                write!(f, "{}", field_id_mapping_to_json_string(field_id_mapping))
+            }
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(untagged)]
+enum FieldIdMappingItem {
+    FieldId(i32),
+    FieldIdMapping(FieldIdMapping),
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub(crate) struct FieldIdMapping {
+    #[serde(flatten)]
+    fields: HashMap<String, FieldIdMappingItem>,
+}
+
+impl FieldIdMapping {
+    pub(crate) fn field_id(&self, field_path: &[String]) -> Option<i32> {
+        if field_path.is_empty() {
+            panic!("Field path is empty");
+        }
+
+        let field_name = &field_path[0];
+
+        match self.fields.get(field_name) {
+            Some(FieldIdMappingItem::FieldId(field_id)) => Some(*field_id),
+            Some(FieldIdMappingItem::FieldIdMapping(field_id_mapping)) => {
+                field_id_mapping.field_id(&field_path[1..])
+            }
+            None => None,
+        }
+    }
+}
+
+pub(crate) fn field_id_mapping_from_json_string(
+    json_string: &str,
+) -> Result<FieldIdMapping, String> {
+    serde_json::from_str(json_string).map_err(|_| "invalid JSON string for field_ids".into())
+}
+
+fn field_id_mapping_to_json_string(field_id_mapping: &FieldIdMapping) -> String {
+    serde_json::to_string(field_id_mapping).unwrap()
+}
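The commit adds no tests for this module. A unit test along the following lines (a sketch, not part of the diff; it would have to live inside the crate since the types are `pub(crate)`, and the test name is mine) would exercise the parsing and nested lookup, mirroring the commit message's example:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_and_resolves_field_ids() {
        // "none" and "auto" map to the dedicated variants.
        assert!(matches!("none".parse::<FieldIds>(), Ok(FieldIds::None)));
        assert!(matches!("auto".parse::<FieldIds>(), Ok(FieldIds::Auto)));

        // Any other string is parsed as a JSON mapping; this one mirrors
        // the example for `create table test_table(a int, b text[])`.
        let field_ids: FieldIds = r#"{"a": 1, "b": {"__root_field_id": 2, "element": 3}}"#
            .parse()
            .expect("valid field_ids JSON");

        let FieldIds::Explicit(mapping) = field_ids else {
            panic!("expected an explicit mapping");
        };

        // A top-level field resolves directly to its id.
        assert_eq!(mapping.field_id(&["a".into()]), Some(1));
        // Nested paths recurse; `__root_field_id` is itself a plain key.
        assert_eq!(mapping.field_id(&["b".into(), "__root_field_id".into()]), Some(2));
        assert_eq!(mapping.field_id(&["b".into(), "element".into()]), Some(3));
        // Unknown fields yield None.
        assert_eq!(mapping.field_id(&["c".into()]), None);
    }
}
```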

src/arrow_parquet/parquet_reader.rs

Lines changed: 3 additions & 1 deletion
@@ -17,6 +17,7 @@ use pgrx::{
 use crate::{
     arrow_parquet::{
         arrow_to_pg::{context::collect_arrow_to_pg_attribute_contexts, to_pg_datum},
+        field_ids::FieldIds,
         schema_parser::{
             error_if_copy_from_match_by_position_with_generated_columns,
             parquet_schema_string_from_attributes,
@@ -68,7 +69,8 @@ impl ParquetReaderContext {
             parquet_schema_string_from_attributes(&attributes)
         );

-        let tupledesc_schema = parse_arrow_schema_from_attributes(&attributes);
+        let tupledesc_schema =
+            parse_arrow_schema_from_attributes(&attributes, FieldIds::None);

         let tupledesc_schema = Arc::new(tupledesc_schema);

src/arrow_parquet/parquet_writer.rs

Lines changed: 3 additions & 1 deletion
@@ -28,6 +28,7 @@ use crate::{
 };

 use super::{
+    field_ids::FieldIds,
     pg_to_arrow::{context::PgToArrowAttributeContext, to_arrow_array},
     uri_utils::ParsedUriInfo,
 };
@@ -46,6 +47,7 @@ impl ParquetWriterContext {
     pub(crate) fn new(
         uri_info: ParsedUriInfo,
         options: CopyToParquetOptions,
+        field_ids: FieldIds,
         tupledesc: &PgTupleDesc,
     ) -> ParquetWriterContext {
         // Postgis and Map contexts are used throughout writing the parquet file.
@@ -60,7 +62,7 @@ impl ParquetWriterContext {
             parquet_schema_string_from_attributes(&attributes)
        );

-        let schema = parse_arrow_schema_from_attributes(&attributes);
+        let schema = parse_arrow_schema_from_attributes(&attributes, field_ids);
         let schema = Arc::new(schema);

         let writer_props = Self::writer_props(tupledesc, options);
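The diff above does not show the COPY option plumbing itself, only that `ParquetWriterContext::new` now takes a `FieldIds` while the reader path always passes `FieldIds::None`. As a hedged sketch of how the option string would be turned into a `FieldIds` before constructing the writer context (the function name and `raw_option` are assumptions, not pg_parquet's actual code):

```rust
use std::str::FromStr;

use crate::arrow_parquet::field_ids::FieldIds;

// Hypothetical helper: maps an absent COPY option to the default
// (no field ids written) and parses any provided value via FromStr,
// which accepts "none", "auto", or a JSON mapping string.
fn field_ids_from_copy_option(raw_option: Option<&str>) -> Result<FieldIds, String> {
    match raw_option {
        None => Ok(FieldIds::default()),
        Some(s) => FieldIds::from_str(s),
    }
}
```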
