
Commit cfa4dec

Supports field_ids during COPY TO
Adds the `field_ids` option, which lets you specify how to assign field ids during COPY TO. Supported values:

- `none` (default) => no field ids are written into the Parquet metadata.
- `auto` => pg_parquet generates field ids starting from 0.
- `<json string>` => pg_parquet will use the given field ids, e.g.

```sql
create table test_table(a int, b text[]);
copy test_table to '/tmp/test.parquet' with (field_ids '{"a": 1, "b": {"__root_field_id": 2, "element": 3}}');
```

Closes #106.
1 parent f8c3d62 commit cfa4dec

File tree

13 files changed: +1192 −189 lines
README.md

Lines changed: 1 addition & 0 deletions

@@ -274,6 +274,7 @@ Supported authorization methods' priority order is shown below:
 `pg_parquet` supports the following options in the `COPY TO` command:
 - `format parquet`: you need to specify this option to read or write Parquet files that do not end with a `.parquet[.<compression>]` extension,
 - `file_size_bytes <string>`: the total file size per Parquet file. When set, Parquet files of the target size are created under a parent directory (named the same as the file name). By default, when not specified, a single file is generated without creating a parent folder. You can specify the total bytes without a unit, like `file_size_bytes 2000000`, or with a unit (KB, MB, or GB), like `file_size_bytes '1MB'`,
+- `field_ids <string>`: field ids to assign to the fields in the Parquet file schema. By default, no field ids are assigned. Pass `auto` to let pg_parquet generate field ids, or pass a JSON string to specify them explicitly,
 - `row_group_size <int64>`: the number of rows in each row group while writing Parquet files. The default row group size is `122880`,
 - `row_group_size_bytes <int64>`: the total byte size of rows in each row group while writing Parquet files. The default is `row_group_size * 1024`,
 - `compression <string>`: the compression format to use while writing Parquet files. The supported compression formats are `uncompressed`, `snappy`, `gzip`, `brotli`, `lz4`, `lz4raw` and `zstd`. The default compression format is `snappy`. If not specified, the compression format is determined by the file extension,

src/arrow_parquet.rs

Lines changed: 1 addition & 0 deletions

@@ -1,6 +1,7 @@
 pub(crate) mod arrow_to_pg;
 pub(crate) mod arrow_utils;
 pub(crate) mod compression;
+pub(crate) mod field_ids;
 pub(crate) mod match_by;
 pub(crate) mod parquet_reader;
 pub(crate) mod parquet_writer;

src/arrow_parquet/field_ids.rs

Lines changed: 181 additions & 0 deletions (new file)

@@ -0,0 +1,181 @@
use std::{collections::HashMap, fmt::Display, str::FromStr};

use arrow_schema::{DataType, Schema};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Default)]
pub(crate) enum FieldIds {
    #[default]
    None,
    Auto,
    Explicit(FieldIdMapping),
}

/// Implements parsing for the field_ids option in COPY .. TO statements
impl FromStr for FieldIds {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "none" => Ok(FieldIds::None),
            "auto" => Ok(FieldIds::Auto),
            field_ids => Ok(FieldIds::Explicit(field_id_mapping_from_json_string(
                field_ids,
            )?)),
        }
    }
}

impl Display for FieldIds {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            FieldIds::None => write!(f, "none"),
            FieldIds::Auto => write!(f, "auto"),
            FieldIds::Explicit(field_id_mapping) => {
                write!(f, "{}", field_id_mapping_to_json_string(field_id_mapping))
            }
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
enum FieldIdMappingItem {
    FieldId(i32),
    FieldIdMapping(FieldIdMapping),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct FieldIdMapping {
    #[serde(flatten)]
    fields: HashMap<String, FieldIdMappingItem>,
}

impl FieldIdMapping {
    /// Returns the field ID, if any, from `FieldIdMapping` for the given field path.
    pub(crate) fn field_id(&self, field_path: &[String]) -> Option<i32> {
        if field_path.is_empty() {
            panic!("Field path is empty");
        }

        let field_name = &field_path[0];

        match self.fields.get(field_name) {
            Some(FieldIdMappingItem::FieldId(field_id)) => Some(*field_id),
            Some(FieldIdMappingItem::FieldIdMapping(field_id_mapping)) => {
                field_id_mapping.field_id(&field_path[1..])
            }
            None => None,
        }
    }

    /// Validates that every field name in the `FieldIdMapping` exists in the provided Arrow schema
    fn validate_against_schema(&self, arrow_schema: &Schema) -> Result<(), String> {
        // Build a map from field name to &Field for quick lookups
        let mut arrow_field_map = HashMap::new();
        for field in arrow_schema.fields() {
            arrow_field_map.insert(field.name().clone(), field);
        }

        // Check every field name in the JSON mapping
        for (field_name, mapping_item) in &self.fields {
            if field_name == "__root_field_id" {
                // Skip the root field, as it doesn't exist in the Arrow schema
                continue;
            }

            // Ensure the field exists in the Arrow schema
            let arrow_field = match arrow_field_map.get(field_name) {
                Some(f) => f,
                None => {
                    return Err(format!(
                        "Field '{}' in the mapping does not exist in the Arrow schema.\nAvailable fields: {:?}",
                        field_name,
                        arrow_schema
                            .fields()
                            .iter()
                            .map(|f| f.name())
                            .collect::<Vec<_>>()
                    ));
                }
            };

            match mapping_item {
                // If the JSON item is an integer field ID, we're done
                FieldIdMappingItem::FieldId(_id) => {}

                // If the JSON item is a nested mapping, we need to validate it
                FieldIdMappingItem::FieldIdMapping(mapping) => match arrow_field.data_type() {
                    DataType::Struct(subfields) => {
                        // We expect the JSON keys to include something like:
                        // "__root_field_id": <int>,
                        // "field_name": <int or nested mapping>

                        let subschema = Schema::new(subfields.clone());
                        mapping.validate_against_schema(&subschema)?;
                    }
                    DataType::List(element_field) => {
                        // We expect the JSON keys to include something like:
                        // "__root_field_id": <int>,
                        // "element": <int or nested mapping>

                        let element_schema = Schema::new(vec![element_field.clone()]);
                        mapping.validate_against_schema(&element_schema)?;
                    }
                    DataType::Map(entry_field, _) => {
                        // We expect the JSON keys to include something like:
                        // "__root_field_id": <int>,
                        // "key": <int or nested mapping>,
                        // "val": <int or nested mapping>

                        match entry_field.data_type() {
                            DataType::Struct(entry_fields) => {
                                let entry_schema = Schema::new(entry_fields.clone());
                                mapping.validate_against_schema(&entry_schema)?;
                            }
                            other_type => {
                                panic!(
                                    "Map entry field should be a struct, but got '{:?}' for field '{}'",
                                    other_type, field_name
                                );
                            }
                        };

                        return Ok(());
                    }
                    other_type => {
                        panic!(
                            "Unexpected data type '{:?}' for field '{}'",
                            other_type, field_name
                        );
                    }
                },
            }
        }

        Ok(())
    }
}

pub(crate) fn field_id_mapping_from_json_string(
    json_string: &str,
) -> Result<FieldIdMapping, String> {
    serde_json::from_str(json_string).map_err(|_| "invalid JSON string for field_ids".into())
}

fn field_id_mapping_to_json_string(field_id_mapping: &FieldIdMapping) -> String {
    serde_json::to_string(field_id_mapping).unwrap()
}

/// Validates that every field name in the `FieldIdMapping` exists in the provided Arrow schema
/// when the `FieldIds` are explicitly specified.
pub(crate) fn validate_field_ids(field_ids: FieldIds, arrow_schema: &Schema) -> Result<(), String> {
    match field_ids {
        FieldIds::None => Ok(()),
        FieldIds::Auto => Ok(()),
        FieldIds::Explicit(field_id_mapping) => {
            field_id_mapping.validate_against_schema(arrow_schema)
        }
    }
}
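
To make the JSON shape and the path lookup concrete, here is a minimal standalone sketch (not part of the commit) that mirrors the deserialization and lookup behavior of the module above. The `Mapping` and `Item` types re-declare the crate-private `FieldIdMapping`/`FieldIdMappingItem` purely for illustration; only `serde` and `serde_json` are assumed as dependencies.

```rust
use std::collections::HashMap;

use serde::Deserialize;

// A JSON value is either a plain integer field id or a nested mapping,
// mirroring FieldIdMappingItem above.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum Item {
    FieldId(i32),
    Nested(Mapping),
}

// Mirrors FieldIdMapping: arbitrary field names map to items.
#[derive(Debug, Deserialize)]
struct Mapping {
    #[serde(flatten)]
    fields: HashMap<String, Item>,
}

impl Mapping {
    // Walk the path one segment at a time, like FieldIdMapping::field_id.
    fn field_id(&self, path: &[&str]) -> Option<i32> {
        match self.fields.get(*path.first()?)? {
            Item::FieldId(id) => Some(*id),
            Item::Nested(nested) => nested.field_id(&path[1..]),
        }
    }
}

fn main() {
    // Same JSON as the commit message example: `b` is a text[] column, so its
    // list element gets its own id under the "element" key.
    let json = r#"{"a": 1, "b": {"__root_field_id": 2, "element": 3}}"#;
    let mapping: Mapping = serde_json::from_str(json).expect("valid field_ids JSON");

    assert_eq!(mapping.field_id(&["a"]), Some(1));
    assert_eq!(mapping.field_id(&["b", "__root_field_id"]), Some(2));
    assert_eq!(mapping.field_id(&["b", "element"]), Some(3));
    assert_eq!(mapping.field_id(&["missing"]), None);
}
```

The untagged enum is what lets a JSON value be either a plain integer id or a nested object, and `#[serde(flatten)]` is what allows arbitrary column names as keys alongside `__root_field_id`.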

src/arrow_parquet/parquet_reader.rs

Lines changed: 3 additions & 2 deletions

@@ -17,6 +17,7 @@ use pgrx::{
 use crate::{
     arrow_parquet::{
         arrow_to_pg::{context::collect_arrow_to_pg_attribute_contexts, to_pg_datum},
+        field_ids::FieldIds,
         schema_parser::{
             error_if_copy_from_match_by_position_with_generated_columns,
             parquet_schema_string_from_attributes,
@@ -65,10 +66,10 @@ impl ParquetReaderContext {
 
         pgrx::debug2!(
             "schema for tuples: {}",
-            parquet_schema_string_from_attributes(&attributes)
+            parquet_schema_string_from_attributes(&attributes, FieldIds::None)
         );
 
-        let tupledesc_schema = parse_arrow_schema_from_attributes(&attributes);
+        let tupledesc_schema = parse_arrow_schema_from_attributes(&attributes, FieldIds::None);
 
         let tupledesc_schema = Arc::new(tupledesc_schema);
src/arrow_parquet/parquet_writer.rs

Lines changed: 8 additions & 2 deletions

@@ -12,6 +12,7 @@ use pgrx::{heap_tuple::PgHeapTuple, AllocatedByRust, PgTupleDesc};
 use crate::{
     arrow_parquet::{
         compression::PgParquetCompressionWithLevel,
+        field_ids::validate_field_ids,
         pg_to_arrow::context::collect_pg_to_arrow_attribute_contexts,
         schema_parser::{
             parquet_schema_string_from_attributes, parse_arrow_schema_from_attributes,
@@ -28,6 +29,7 @@ use crate::{
 };
 
 use super::{
+    field_ids::FieldIds,
     pg_to_arrow::{context::PgToArrowAttributeContext, to_arrow_array},
     uri_utils::ParsedUriInfo,
 };
@@ -46,6 +48,7 @@ impl ParquetWriterContext {
     pub(crate) fn new(
         uri_info: ParsedUriInfo,
         options: CopyToParquetOptions,
+        field_ids: FieldIds,
         tupledesc: &PgTupleDesc,
     ) -> ParquetWriterContext {
         // Postgis and Map contexts are used throughout writing the parquet file.
@@ -57,10 +60,13 @@ impl ParquetWriterContext {
 
         pgrx::debug2!(
             "schema for tuples: {}",
-            parquet_schema_string_from_attributes(&attributes)
+            parquet_schema_string_from_attributes(&attributes, field_ids.clone())
         );
 
-        let schema = parse_arrow_schema_from_attributes(&attributes);
+        let schema = parse_arrow_schema_from_attributes(&attributes, field_ids.clone());
+
+        validate_field_ids(field_ids, &schema).unwrap_or_else(|e| panic!("{e}"));
+
         let schema = Arc::new(schema);
 
         let writer_props = Self::writer_props(tupledesc, options);
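
For context on where these ids ultimately go, the sketch below (not part of the commit) shows one common way field ids are carried on an Arrow schema so that the arrow-rs Parquet writer emits them: as `PARQUET:field_id` metadata on each `Field`. The commit's `schema_parser` changes are not included in the excerpts above, so treat this mechanism as an assumption about the plumbing rather than pg_parquet's exact code; only the `arrow-schema` and `parquet` crates are assumed as dependencies, and `with_field_id` is a hypothetical helper.

```rust
use std::collections::HashMap;

use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY; // the "PARQUET:field_id" metadata key

// Hypothetical helper: attach a Parquet field id to an Arrow field via metadata.
fn with_field_id(field: Field, id: i32) -> Field {
    field.with_metadata(HashMap::from([(
        PARQUET_FIELD_ID_META_KEY.to_string(),
        id.to_string(),
    )]))
}

fn main() {
    // Mirrors `create table test_table(a int, b text[])` from the commit message,
    // with ids taken from '{"a": 1, "b": {"__root_field_id": 2, "element": 3}}'.
    let element = with_field_id(Field::new("element", DataType::Utf8, true), 3);
    let schema = Schema::new(vec![
        with_field_id(Field::new("a", DataType::Int32, true), 1),
        with_field_id(Field::new("b", DataType::List(element.into()), true), 2),
    ]);

    // A Parquet writer built from this schema can pick the ids up from the
    // per-field metadata.
    for field in schema.fields() {
        println!(
            "{} => field id {:?}",
            field.name(),
            field.metadata().get(PARQUET_FIELD_ID_META_KEY)
        );
    }
}
```

In `auto` mode the same mechanism would apply, with the ids generated sequentially from 0 rather than read from the JSON mapping.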
