1 change: 1 addition & 0 deletions README.md
@@ -274,6 +274,7 @@ Supported authorization methods' priority order is shown below:
`pg_parquet` supports the following options in the `COPY TO` command:
- `format parquet`: you need to specify this option to read or write Parquet files that do not end with a `.parquet[.<compression>]` extension,
- `file_size_bytes <string>`: the total file size per Parquet file. When set, Parquet files of the target size are created under a parent directory (named the same as the file name). By default, when not specified, a single file is generated without creating a parent folder. You can specify the total bytes without a unit, like `file_size_bytes 2000000`, or with a unit (KB, MB, or GB), like `file_size_bytes '1MB'`,
- `field_ids <string>`: field ids to assign to the fields in the Parquet file schema. By default, no field ids are assigned. Pass `auto` to let pg_parquet generate field ids, or pass a JSON string to assign them explicitly, as shown in the example below this list,
- `row_group_size <int64>`: the number of rows in each row group while writing Parquet files. The default row group size is `122880`,
- `row_group_size_bytes <int64>`: the total byte size of rows in each row group while writing Parquet files. The default is `row_group_size * 1024`,
- `compression <string>`: the compression format to use while writing Parquet files. The supported compression formats are `uncompressed`, `snappy`, `gzip`, `brotli`, `lz4`, `lz4raw` and `zstd`. The default compression format is `snappy`. If not specified, the compression format is determined by the file extension,
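
A minimal sketch of how the new `field_ids` option could be used. The table and column names are hypothetical and only illustrate the JSON shape described above (column name mapped to a field id), plus the `auto` form:

```sql
-- Hypothetical table "product"; the JSON maps each column name to a field id.
COPY product TO '/tmp/product.parquet' WITH (
    format parquet,
    field_ids '{"product_id": 1, "product_name": 2, "price": 3}'
);

-- Or let pg_parquet assign the field ids automatically.
COPY product TO '/tmp/product_auto.parquet' WITH (format parquet, field_ids 'auto');
```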
1 change: 1 addition & 0 deletions src/arrow_parquet.rs
@@ -1,6 +1,7 @@
pub(crate) mod arrow_to_pg;
pub(crate) mod arrow_utils;
pub(crate) mod compression;
pub(crate) mod field_ids;
pub(crate) mod match_by;
pub(crate) mod parquet_reader;
pub(crate) mod parquet_writer;
181 changes: 181 additions & 0 deletions src/arrow_parquet/field_ids.rs
@@ -0,0 +1,181 @@
use std::{collections::HashMap, fmt::Display, str::FromStr};

use arrow_schema::{DataType, Schema};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Default)]
pub(crate) enum FieldIds {
    #[default]
    None,
    Auto,
    Explicit(FieldIdMapping),
}

/// Implements parsing for the field_ids option in COPY .. TO statements
impl FromStr for FieldIds {
Collaborator comment: generally, it would be useful to add more comments to things as the code base is growing:

    // implements parsing for the field_ids option in COPY .. TO statements

    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "none" => Ok(FieldIds::None),
            "auto" => Ok(FieldIds::Auto),
            field_ids => Ok(FieldIds::Explicit(field_id_mapping_from_json_string(
                field_ids,
            )?)),
        }
    }
}

impl Display for FieldIds {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            FieldIds::None => write!(f, "none"),
            FieldIds::Auto => write!(f, "auto"),
            FieldIds::Explicit(field_id_mapping) => {
                write!(f, "{}", field_id_mapping_to_json_string(field_id_mapping))
            }
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
enum FieldIdMappingItem {
    FieldId(i32),
    FieldIdMapping(FieldIdMapping),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct FieldIdMapping {
    #[serde(flatten)]
    fields: HashMap<String, FieldIdMappingItem>,
}

impl FieldIdMapping {
    /// Returns the field ID, if any, from `FieldIdMapping` for the given field path.
    pub(crate) fn field_id(&self, field_path: &[String]) -> Option<i32> {
        if field_path.is_empty() {
            panic!("Field path is empty");
        }

        let field_name = &field_path[0];

        match self.fields.get(field_name) {
            Some(FieldIdMappingItem::FieldId(field_id)) => Some(*field_id),
            Some(FieldIdMappingItem::FieldIdMapping(field_id_mapping)) => {
                field_id_mapping.field_id(&field_path[1..])
            }
            None => None,
        }
    }

    /// Validates that every field name in the `FieldIdMapping` exists in the provided Arrow schema
    fn validate_against_schema(&self, arrow_schema: &Schema) -> Result<(), String> {
        // Build a map from field name to &Field for quick lookups
        let mut arrow_field_map = HashMap::new();
        for field in arrow_schema.fields() {
            arrow_field_map.insert(field.name().clone(), field);
        }

        // Check every field name in the JSON mapping
        for (field_name, mapping_item) in &self.fields {
            if field_name == "__root_field_id" {
                // Skip the root field, as it doesn't exist in the Arrow schema
                continue;
            }

            // Ensure the field exists in the Arrow schema
            let arrow_field = match arrow_field_map.get(field_name) {
                Some(f) => f,
                None => {
                    return Err(format!(
                        "Field '{}' in the mapping does not exist in the Arrow schema.\nAvailable fields: {:?}",
                        field_name,
                        arrow_schema
                            .fields()
                            .iter()
                            .map(|f| f.name())
                            .collect::<Vec<_>>()
                    ));
                }
            };

            match mapping_item {
                // If the JSON item is an integer field ID, we're done
                FieldIdMappingItem::FieldId(_id) => {}

                // If the JSON item is a nested mapping, we need to validate it
                FieldIdMappingItem::FieldIdMapping(mapping) => match arrow_field.data_type() {
                    DataType::Struct(subfields) => {
                        // We expect the JSON keys to include something like:
                        // "__root_field_id": <int>,
                        // "field_name": <int or nested mapping>

                        let subschema = Schema::new(subfields.clone());
                        mapping.validate_against_schema(&subschema)?;
                    }
                    DataType::List(element_field) => {
                        // We expect the JSON keys to include something like:
                        // "__root_field_id": <int>,
                        // "element": <int or nested mapping>

                        let element_schema = Schema::new(vec![element_field.clone()]);
                        mapping.validate_against_schema(&element_schema)?;
                    }
                    DataType::Map(entry_field, _) => {
                        // We expect the JSON keys to include something like:
                        // "__root_field_id": <int>,
                        // "key": <int or nested mapping>
                        // "val": <int or nested mapping>

                        match entry_field.data_type() {
                            DataType::Struct(entry_fields) => {
                                let entry_schema = Schema::new(entry_fields.clone());
                                mapping.validate_against_schema(&entry_schema)?;
                            }
                            other_type => {
                                panic!(
                                    "Map entry field should be a struct, but got '{:?}' for field '{}'",
                                    other_type, field_name
                                );
                            }
                        };

                        return Ok(());
                    }
                    other_type => {
                        panic!(
                            "Unexpected data type '{:?}' for field '{}'",
                            other_type, field_name
                        );
                    }
                },
            }
        }

        Ok(())
    }
}

pub(crate) fn field_id_mapping_from_json_string(
    json_string: &str,
) -> Result<FieldIdMapping, String> {
    serde_json::from_str(json_string).map_err(|_| "invalid JSON string for field_ids".into())
}

fn field_id_mapping_to_json_string(field_id_mapping: &FieldIdMapping) -> String {
    serde_json::to_string(field_id_mapping).unwrap()
}

/// Validate that every field name in the `FieldIdMapping` exists in the provided Arrow schema
/// when the `FieldIds` are explicitly specified.
pub(crate) fn validate_field_ids(field_ids: FieldIds, arrow_schema: &Schema) -> Result<(), String> {
    match field_ids {
        FieldIds::None => Ok(()),
        FieldIds::Auto => Ok(()),
        FieldIds::Explicit(field_id_mapping) => {
            field_id_mapping.validate_against_schema(arrow_schema)
        }
    }
}
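
The comments in `validate_against_schema` above imply a nesting convention for explicit mappings: struct, list, and map columns carry a `__root_field_id` entry plus entries for their children (`element` for list items, `key` and `val` for map entries). A hedged sketch of a `COPY` command following that convention, with a hypothetical table and hypothetical field ids:

```sql
-- Hypothetical table "events" with a struct column "details", a list column "tags",
-- and a map column "attrs"; key names follow the comments in validate_against_schema.
COPY events TO '/tmp/events.parquet' WITH (
    format parquet,
    field_ids '{
        "id": 1,
        "details": {"__root_field_id": 2, "name": 3, "amount": 4},
        "tags":    {"__root_field_id": 5, "element": 6},
        "attrs":   {"__root_field_id": 7, "key": 8, "val": 9}
    }'
);
```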
5 changes: 3 additions & 2 deletions src/arrow_parquet/parquet_reader.rs
@@ -17,6 +17,7 @@ use pgrx::{
use crate::{
    arrow_parquet::{
        arrow_to_pg::{context::collect_arrow_to_pg_attribute_contexts, to_pg_datum},
        field_ids::FieldIds,
        schema_parser::{
            error_if_copy_from_match_by_position_with_generated_columns,
            parquet_schema_string_from_attributes,
@@ -65,10 +66,10 @@ impl ParquetReaderContext {

        pgrx::debug2!(
            "schema for tuples: {}",
            parquet_schema_string_from_attributes(&attributes)
            parquet_schema_string_from_attributes(&attributes, FieldIds::None)
        );

        let tupledesc_schema = parse_arrow_schema_from_attributes(&attributes);
        let tupledesc_schema = parse_arrow_schema_from_attributes(&attributes, FieldIds::None);

        let tupledesc_schema = Arc::new(tupledesc_schema);

10 changes: 8 additions & 2 deletions src/arrow_parquet/parquet_writer.rs
@@ -12,6 +12,7 @@ use pgrx::{heap_tuple::PgHeapTuple, AllocatedByRust, PgTupleDesc};
use crate::{
    arrow_parquet::{
        compression::PgParquetCompressionWithLevel,
        field_ids::validate_field_ids,
        pg_to_arrow::context::collect_pg_to_arrow_attribute_contexts,
        schema_parser::{
            parquet_schema_string_from_attributes, parse_arrow_schema_from_attributes,
@@ -28,6 +29,7 @@ use crate::{
};

use super::{
    field_ids::FieldIds,
    pg_to_arrow::{context::PgToArrowAttributeContext, to_arrow_array},
    uri_utils::ParsedUriInfo,
};
@@ -46,6 +48,7 @@ impl ParquetWriterContext {
    pub(crate) fn new(
        uri_info: ParsedUriInfo,
        options: CopyToParquetOptions,
        field_ids: FieldIds,
        tupledesc: &PgTupleDesc,
    ) -> ParquetWriterContext {
        // Postgis and Map contexts are used throughout writing the parquet file.
@@ -57,10 +60,13 @@

        pgrx::debug2!(
            "schema for tuples: {}",
            parquet_schema_string_from_attributes(&attributes)
            parquet_schema_string_from_attributes(&attributes, field_ids.clone())
        );

        let schema = parse_arrow_schema_from_attributes(&attributes);
        let schema = parse_arrow_schema_from_attributes(&attributes, field_ids.clone());

        validate_field_ids(field_ids, &schema).unwrap_or_else(|e| panic!("{e}"));

        let schema = Arc::new(schema);

        let writer_props = Self::writer_props(tupledesc, options);