Skip to content

Commit 4241cae

Browse files
committed
Match fields by name via option
We add an option for `COPY FROM` called `match_by_name` which matches Parquet file fields to PostgreSQL table columns `by their names` rather than `by their order` in the schema. By default, the option is `false`. The option is useful when field order differs between the Parquet file and the table, but their names match. **!!IMPORTANT!!**: This is a breaking change. Before this PR, we always matched by name. This is a bit strict and not a common way to match schemas. (e.g. `COPY FROM` csv in PostgreSQL or `COPY FROM` in DuckDB matches by field position by default) This is why we now match by position by default and provide a `COPY FROM` option `match_by_name` that can be set to `true` to restore the old behaviour. Closes #39.
1 parent fbaeadb commit 4241cae

File tree

10 files changed

+163
-35
lines changed

10 files changed

+163
-35
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ Alternatively, you can use the following environment variables when starting pos
193193

194194
`pg_parquet` supports the following options in the `COPY FROM` command:
195195
- `format parquet`: you need to specify this option to read or write Parquet files that do not end with the `.parquet[.<compression>]` extension,
196+
- `match_by_name <bool>`: matches Parquet file fields to PostgreSQL table columns by their names rather than by their positions in the schema (the default). By default, the option is `false`. The option is useful when the field order differs between the Parquet file and the table, but the names match.
196197

197198
## Configuration
198199
There is currently only one GUC parameter to enable/disable the `pg_parquet`:

src/arrow_parquet/arrow_to_pg.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ impl ArrowToPgAttributeContext {
163163
};
164164

165165
let attributes =
166-
collect_attributes_for(CollectAttributesFor::Struct, attribute_tupledesc);
166+
collect_attributes_for(CollectAttributesFor::Other, attribute_tupledesc);
167167

168168
// we only cast the top-level attributes, which already covers the nested attributes
169169
let cast_to_types = None;

src/arrow_parquet/parquet_reader.rs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@ use url::Url;
1616

1717
use crate::{
1818
arrow_parquet::{
19-
arrow_to_pg::to_pg_datum, schema_parser::parquet_schema_string_from_attributes,
19+
arrow_to_pg::to_pg_datum,
20+
schema_parser::{
21+
error_if_copy_from_match_by_position_with_generated_columns,
22+
parquet_schema_string_from_attributes,
23+
},
2024
},
2125
pgrx_utils::{collect_attributes_for, CollectAttributesFor},
2226
type_compat::{geometry::reset_postgis_context, map::reset_map_context},
@@ -38,15 +42,18 @@ pub(crate) struct ParquetReaderContext {
3842
parquet_reader: ParquetRecordBatchStream<ParquetObjectReader>,
3943
attribute_contexts: Vec<ArrowToPgAttributeContext>,
4044
binary_out_funcs: Vec<PgBox<FmgrInfo>>,
45+
match_by_name: bool,
4146
}
4247

4348
impl ParquetReaderContext {
44-
pub(crate) fn new(uri: Url, tupledesc: &PgTupleDesc) -> Self {
49+
pub(crate) fn new(uri: Url, match_by_name: bool, tupledesc: &PgTupleDesc) -> Self {
4550
// Postgis and Map contexts are used throughout reading the parquet file.
4651
// We need to reset them to avoid reading the stale data. (e.g. extension could be dropped)
4752
reset_postgis_context();
4853
reset_map_context();
4954

55+
error_if_copy_from_match_by_position_with_generated_columns(tupledesc, match_by_name);
56+
5057
let parquet_reader = parquet_reader_from_uri(&uri);
5158

5259
let parquet_file_schema = parquet_reader.schema();
@@ -69,6 +76,7 @@ impl ParquetReaderContext {
6976
parquet_file_schema.clone(),
7077
tupledesc_schema.clone(),
7178
&attributes,
79+
match_by_name,
7280
);
7381

7482
let attribute_contexts = collect_arrow_to_pg_attribute_contexts(
@@ -85,6 +93,7 @@ impl ParquetReaderContext {
8593
attribute_contexts,
8694
parquet_reader,
8795
binary_out_funcs,
96+
match_by_name,
8897
started: false,
8998
finished: false,
9099
}
@@ -116,15 +125,23 @@ impl ParquetReaderContext {
116125
fn record_batch_to_tuple_datums(
117126
record_batch: RecordBatch,
118127
attribute_contexts: &[ArrowToPgAttributeContext],
128+
match_by_name: bool,
119129
) -> Vec<Option<Datum>> {
120130
let mut datums = vec![];
121131

122-
for attribute_context in attribute_contexts {
132+
for (attribute_idx, attribute_context) in attribute_contexts.iter().enumerate() {
123133
let name = attribute_context.name();
124134

125-
let column_array = record_batch
126-
.column_by_name(name)
127-
.unwrap_or_else(|| panic!("column {} not found", name));
135+
let column_array = if match_by_name {
136+
record_batch
137+
.column_by_name(name)
138+
.unwrap_or_else(|| panic!("column {} not found", name))
139+
} else {
140+
record_batch
141+
.columns()
142+
.get(attribute_idx)
143+
.unwrap_or_else(|| panic!("column {} not found", name))
144+
};
128145

129146
let datum = if attribute_context.needs_cast() {
130147
// should fail instead of returning None if the cast fails at runtime
@@ -181,8 +198,11 @@ impl ParquetReaderContext {
181198
self.buffer.extend_from_slice(&attnum_len_bytes);
182199

183200
// convert the columnar arrays in record batch to tuple datums
184-
let tuple_datums =
185-
Self::record_batch_to_tuple_datums(record_batch, &self.attribute_contexts);
201+
let tuple_datums = Self::record_batch_to_tuple_datums(
202+
record_batch,
203+
&self.attribute_contexts,
204+
self.match_by_name,
205+
);
186206

187207
// write the tuple datums to the ParquetReader's internal buffer in PG copy format
188208
for (datum, out_func) in tuple_datums.into_iter().zip(self.binary_out_funcs.iter())

src/arrow_parquet/pg_to_arrow.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ impl PgToArrowAttributeContext {
148148
};
149149

150150
let attributes =
151-
collect_attributes_for(CollectAttributesFor::Struct, &attribute_tupledesc);
151+
collect_attributes_for(CollectAttributesFor::Other, &attribute_tupledesc);
152152

153153
collect_pg_to_arrow_attribute_contexts(&attributes, &fields)
154154
});

src/arrow_parquet/schema_parser.rs

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use pgrx::{check_for_interrupts, prelude::*, PgTupleDesc};
1616
use crate::{
1717
pgrx_utils::{
1818
array_element_typoid, collect_attributes_for, domain_array_base_elem_typoid, is_array_type,
19-
is_composite_type, tuple_desc, CollectAttributesFor,
19+
is_composite_type, is_generated_attribute, tuple_desc, CollectAttributesFor,
2020
},
2121
type_compat::{
2222
geometry::is_postgis_geometry_type,
@@ -95,7 +95,7 @@ fn parse_struct_schema(tupledesc: PgTupleDesc, elem_name: &str, field_id: &mut i
9595

9696
let mut child_fields: Vec<Arc<Field>> = vec![];
9797

98-
let attributes = collect_attributes_for(CollectAttributesFor::Struct, &tupledesc);
98+
let attributes = collect_attributes_for(CollectAttributesFor::Other, &tupledesc);
9999

100100
for attribute in attributes {
101101
if attribute.is_dropped() {
@@ -342,28 +342,69 @@ fn adjust_map_entries_field(field: FieldRef) -> FieldRef {
342342
Arc::new(entries_field)
343343
}
344344

345+
pub(crate) fn error_if_copy_from_match_by_position_with_generated_columns(
346+
tupledesc: &PgTupleDesc,
347+
match_by_name: bool,
348+
) {
349+
// match_by_name can handle generated columns
350+
if match_by_name {
351+
return;
352+
}
353+
354+
let attributes = collect_attributes_for(CollectAttributesFor::Other, tupledesc);
355+
356+
for attribute in attributes {
357+
if is_generated_attribute(&attribute) {
358+
ereport!(
359+
PgLogLevel::ERROR,
360+
PgSqlErrorCode::ERRCODE_FEATURE_NOT_SUPPORTED,
361+
"COPY FROM parquet with generated columns is not supported",
362+
"Try COPY FROM parquet WITH (match_by_name true). \"
363+
It works only if the column names match with parquet file's.",
364+
);
365+
}
366+
}
367+
}
368+
345369
// ensure_file_schema_match_tupledesc_schema throws an error if the file's schema does not match the table schema.
346370
// If the file's arrow schema is castable to the table's arrow schema, it returns a vector of Option<DataType>
347371
// to cast to for each field.
348372
pub(crate) fn ensure_file_schema_match_tupledesc_schema(
349373
file_schema: Arc<Schema>,
350374
tupledesc_schema: Arc<Schema>,
351375
attributes: &[FormData_pg_attribute],
376+
match_by_name: bool,
352377
) -> Vec<Option<DataType>> {
353378
let mut cast_to_types = Vec::new();
354379

380+
if !match_by_name && tupledesc_schema.fields().len() != file_schema.fields().len() {
381+
panic!(
382+
"column count mismatch between table and parquet file. \
383+
parquet file has {} columns, but table has {} columns",
384+
file_schema.fields().len(),
385+
tupledesc_schema.fields().len()
386+
);
387+
}
388+
355389
for (tupledesc_schema_field, attribute) in
356390
tupledesc_schema.fields().iter().zip(attributes.iter())
357391
{
358392
let field_name = tupledesc_schema_field.name();
359393

360-
let file_schema_field = file_schema.column_with_name(field_name);
394+
let file_schema_field = if match_by_name {
395+
let file_schema_field = file_schema.column_with_name(field_name);
361396

362-
if file_schema_field.is_none() {
363-
panic!("column \"{}\" is not found in parquet file", field_name);
364-
}
397+
if file_schema_field.is_none() {
398+
panic!("column \"{}\" is not found in parquet file", field_name);
399+
}
400+
401+
let (_, file_schema_field) = file_schema_field.unwrap();
402+
403+
file_schema_field
404+
} else {
405+
file_schema.field(attribute.attnum as usize - 1)
406+
};
365407

366-
let (_, file_schema_field) = file_schema_field.unwrap();
367408
let file_schema_field = Arc::new(file_schema_field.clone());
368409

369410
let from_type = file_schema_field.data_type();
@@ -378,7 +419,7 @@ pub(crate) fn ensure_file_schema_match_tupledesc_schema(
378419
if !is_coercible(from_type, to_type, attribute.atttypid, attribute.atttypmod) {
379420
panic!(
380421
"type mismatch for column \"{}\" between table and parquet file.\n\n\
381-
table has \"{}\"\n\nparquet file has \"{}\"",
422+
table has \"{}\"\n\nparquet file has \"{}\"",
382423
field_name, to_type, from_type
383424
);
384425
}
@@ -413,7 +454,7 @@ fn is_coercible(from_type: &DataType, to_type: &DataType, to_typoid: Oid, to_typ
413454

414455
let tupledesc = tuple_desc(to_typoid, to_typmod);
415456

416-
let attributes = collect_attributes_for(CollectAttributesFor::Struct, &tupledesc);
457+
let attributes = collect_attributes_for(CollectAttributesFor::Other, &tupledesc);
417458

418459
for (from_field, (to_field, to_attribute)) in from_fields
419460
.iter()

src/parquet_copy_hook/copy_from.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use crate::{
2020
};
2121

2222
use super::copy_utils::{
23-
copy_stmt_attribute_list, copy_stmt_create_namespace_item, copy_stmt_create_parse_state,
24-
create_filtered_tupledesc_for_relation,
23+
copy_from_stmt_match_by_name, copy_stmt_attribute_list, copy_stmt_create_namespace_item,
24+
copy_stmt_create_parse_state, create_filtered_tupledesc_for_relation,
2525
};
2626

2727
// stack to store parquet reader contexts for COPY FROM.
@@ -131,9 +131,11 @@ pub(crate) fn execute_copy_from(
131131

132132
let tupledesc = create_filtered_tupledesc_for_relation(p_stmt, &relation);
133133

134+
let match_by_name = copy_from_stmt_match_by_name(p_stmt);
135+
134136
unsafe {
135137
// parquet reader context is used throughout the COPY FROM operation.
136-
let parquet_reader_context = ParquetReaderContext::new(uri, &tupledesc);
138+
let parquet_reader_context = ParquetReaderContext::new(uri, match_by_name, &tupledesc);
137139
push_parquet_reader_context(parquet_reader_context);
138140

139141
// makes sure to set binary format

src/parquet_copy_hook/copy_utils.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ use std::{ffi::CStr, str::FromStr};
33
use pgrx::{
44
is_a,
55
pg_sys::{
6-
addRangeTableEntryForRelation, defGetInt32, defGetInt64, defGetString, get_namespace_name,
7-
get_rel_namespace, makeDefElem, makeString, make_parsestate, quote_qualified_identifier,
8-
AccessShareLock, AsPgCStr, CopyStmt, CreateTemplateTupleDesc, DefElem, List, NoLock, Node,
9-
NodeTag::T_CopyStmt, Oid, ParseNamespaceItem, ParseState, PlannedStmt, QueryEnvironment,
10-
RangeVar, RangeVarGetRelidExtended, RowExclusiveLock, TupleDescInitEntry,
6+
addRangeTableEntryForRelation, defGetBoolean, defGetInt32, defGetInt64, defGetString,
7+
get_namespace_name, get_rel_namespace, makeDefElem, makeString, make_parsestate,
8+
quote_qualified_identifier, AccessShareLock, AsPgCStr, CopyStmt, CreateTemplateTupleDesc,
9+
DefElem, List, NoLock, Node, NodeTag::T_CopyStmt, Oid, ParseNamespaceItem, ParseState,
10+
PlannedStmt, QueryEnvironment, RangeVar, RangeVarGetRelidExtended, RowExclusiveLock,
11+
TupleDescInitEntry,
1112
},
1213
PgBox, PgList, PgRelation, PgTupleDesc,
1314
};
@@ -109,7 +110,7 @@ pub(crate) fn validate_copy_to_options(p_stmt: &PgBox<PlannedStmt>, uri: &Url) {
109110
}
110111

111112
pub(crate) fn validate_copy_from_options(p_stmt: &PgBox<PlannedStmt>) {
112-
validate_copy_option_names(p_stmt, &["format", "freeze"]);
113+
validate_copy_option_names(p_stmt, &["format", "match_by_name", "freeze"]);
113114

114115
let format_option = copy_stmt_get_option(p_stmt, "format");
115116

@@ -253,6 +254,16 @@ pub(crate) fn copy_from_stmt_create_option_list(p_stmt: &PgBox<PlannedStmt>) ->
253254
new_copy_options
254255
}
255256

257+
pub(crate) fn copy_from_stmt_match_by_name(p_stmt: &PgBox<PlannedStmt>) -> bool {
258+
let match_by_name_option = copy_stmt_get_option(p_stmt, "match_by_name");
259+
260+
if match_by_name_option.is_null() {
261+
false
262+
} else {
263+
unsafe { defGetBoolean(match_by_name_option.as_ptr()) }
264+
}
265+
}
266+
256267
pub(crate) fn copy_stmt_get_option(
257268
p_stmt: &PgBox<PlannedStmt>,
258269
option_name: &str,

src/pgrx_tests/copy_from_coerce.rs

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -966,7 +966,7 @@ mod tests {
966966
}
967967

968968
#[pg_test]
969-
fn test_table_with_different_field_position() {
969+
fn test_table_with_different_position_match_by_name() {
970970
let copy_to = format!(
971971
"COPY (SELECT 1 as x, 'hello' as y) TO '{}'",
972972
LOCAL_TEST_FILE_PATH
@@ -976,13 +976,44 @@ mod tests {
976976
let create_table = "CREATE TABLE test_table (y text, x int)";
977977
Spi::run(create_table).unwrap();
978978

979-
let copy_from = format!("COPY test_table FROM '{}'", LOCAL_TEST_FILE_PATH);
979+
let copy_from = format!(
980+
"COPY test_table FROM '{}' WITH (match_by_name true)",
981+
LOCAL_TEST_FILE_PATH
982+
);
980983
Spi::run(&copy_from).unwrap();
981984

982985
let result = Spi::get_two::<&str, i32>("SELECT y, x FROM test_table LIMIT 1").unwrap();
983986
assert_eq!(result, (Some("hello"), Some(1)));
984987
}
985988

989+
#[pg_test]
990+
fn test_table_with_different_name_match_by_position() {
991+
let copy_to = "COPY (SELECT 1 as a, 'hello' as b) TO '/tmp/test.parquet'";
992+
Spi::run(copy_to).unwrap();
993+
994+
let create_table = "CREATE TABLE test_table (x bigint, y varchar)";
995+
Spi::run(create_table).unwrap();
996+
997+
let copy_from = "COPY test_table FROM '/tmp/test.parquet'";
998+
Spi::run(copy_from).unwrap();
999+
1000+
let result = Spi::get_two::<i64, &str>("SELECT x, y FROM test_table LIMIT 1").unwrap();
1001+
assert_eq!(result, (Some(1), Some("hello")));
1002+
}
1003+
1004+
#[pg_test]
1005+
#[should_panic(expected = "column count mismatch between table and parquet file")]
1006+
fn test_table_with_different_name_match_by_position_fail() {
1007+
let copy_to = "COPY (SELECT 1 as a, 'hello' as b) TO '/tmp/test.parquet'";
1008+
Spi::run(copy_to).unwrap();
1009+
1010+
let create_table = "CREATE TABLE test_table (x bigint, y varchar, z int)";
1011+
Spi::run(create_table).unwrap();
1012+
1013+
let copy_from = "COPY test_table FROM '/tmp/test.parquet'";
1014+
Spi::run(copy_from).unwrap();
1015+
}
1016+
9861017
#[pg_test]
9871018
#[should_panic(expected = "column \"name\" is not found in parquet file")]
9881019
fn test_missing_column_in_parquet() {
@@ -992,7 +1023,10 @@ mod tests {
9921023
let copy_to_parquet = format!("copy (select 100 as id) to '{}';", LOCAL_TEST_FILE_PATH);
9931024
Spi::run(&copy_to_parquet).unwrap();
9941025

995-
let copy_from = format!("COPY test_table FROM '{}'", LOCAL_TEST_FILE_PATH);
1026+
let copy_from = format!(
1027+
"COPY test_table FROM '{}' with (match_by_name true)",
1028+
LOCAL_TEST_FILE_PATH
1029+
);
9961030
Spi::run(&copy_from).unwrap();
9971031
}
9981032

src/pgrx_tests/copy_pg_rules.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,21 @@ mod tests {
101101
Spi::run(&copy_from).unwrap();
102102
}
103103

104+
#[pg_test]
105+
#[should_panic(expected = "COPY FROM parquet with generated columns is not supported")]
106+
fn test_copy_from_by_position_with_generated_columns_not_supported() {
107+
Spi::run("DROP TABLE IF EXISTS test_table;").unwrap();
108+
109+
Spi::run("CREATE TABLE test_table (a int, b int generated always as (10) stored, c text);")
110+
.unwrap();
111+
112+
let copy_from_query = format!(
113+
"COPY test_table FROM '{}' WITH (format parquet);",
114+
LOCAL_TEST_FILE_PATH
115+
);
116+
Spi::run(copy_from_query.as_str()).unwrap();
117+
}
118+
104119
#[pg_test]
105120
fn test_with_generated_and_dropped_columns() {
106121
Spi::run("DROP TABLE IF EXISTS test_table;").unwrap();
@@ -123,7 +138,7 @@ mod tests {
123138
Spi::run("TRUNCATE test_table;").unwrap();
124139

125140
let copy_from_query = format!(
126-
"COPY test_table FROM '{}' WITH (format parquet);",
141+
"COPY test_table FROM '{}' WITH (format parquet, match_by_name true);",
127142
LOCAL_TEST_FILE_PATH
128143
);
129144
Spi::run(copy_from_query.as_str()).unwrap();

0 commit comments

Comments
 (0)