Skip to content

Commit ef6f07d

Browse files
committed
Coerce types on read
`COPY FROM parquet` is too strict when matching the Postgres tupledesc schema to the schema from the parquet file, e.g. an `INT32` type in the parquet schema cannot be read into a Postgres column of type `int64`. We can avoid this situation by adding an `is_coercible(from_type, to_type)` check while matching the expected schema from the parquet file. With that, we can coerce from parquet source types to Postgres destination types as shown below: - INT16 => {int32, int64} - INT32 => {int64} - UINT16 => {int16, int32, int64} - UINT32 => {int32, int64} - UINT64 => {int64} - FLOAT32 => {double} As we use arrow as the intermediate format, it might be the case that `LargeUtf8` or `LargeBinary` types are used by the external writer instead of `Utf8` and `Binary`. That is why we also need to support the coercions below for arrow source types: - `Utf8 | LargeUtf8` => {text} - `Binary | LargeBinary` => {bytea} Closes #67.
1 parent 518a5ac commit ef6f07d

File tree

12 files changed

+922
-173
lines changed

12 files changed

+922
-173
lines changed

src/arrow_parquet/arrow_to_pg.rs

Lines changed: 312 additions & 163 deletions
Large diffs are not rendered by default.

src/arrow_parquet/arrow_to_pg/bytea.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use arrow::array::{Array, BinaryArray};
1+
use arrow::array::{Array, BinaryArray, LargeBinaryArray};
22

33
use super::{ArrowArrayToPgType, ArrowToPgAttributeContext};
44

@@ -13,6 +13,16 @@ impl ArrowArrayToPgType<Vec<u8>> for BinaryArray {
1313
}
1414
}
1515

16+
impl ArrowArrayToPgType<Vec<u8>> for LargeBinaryArray {
17+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<u8>> {
18+
if self.is_null(0) {
19+
None
20+
} else {
21+
Some(self.value(0).to_vec())
22+
}
23+
}
24+
}
25+
1626
// Bytea[]
1727
impl ArrowArrayToPgType<Vec<Option<Vec<u8>>>> for BinaryArray {
1828
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Vec<u8>>>> {
@@ -28,3 +38,18 @@ impl ArrowArrayToPgType<Vec<Option<Vec<u8>>>> for BinaryArray {
2838
Some(vals)
2939
}
3040
}
41+
42+
impl ArrowArrayToPgType<Vec<Option<Vec<u8>>>> for LargeBinaryArray {
43+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Vec<u8>>>> {
44+
let mut vals = vec![];
45+
for val in self.iter() {
46+
if let Some(val) = val {
47+
vals.push(Some(val.to_vec()));
48+
} else {
49+
vals.push(None);
50+
}
51+
}
52+
53+
Some(vals)
54+
}
55+
}

src/arrow_parquet/arrow_to_pg/char.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use arrow::array::{Array, StringArray};
1+
use arrow::array::{Array, LargeStringArray, StringArray};
22

33
use super::{ArrowArrayToPgType, ArrowToPgAttributeContext};
44

@@ -15,6 +15,18 @@ impl ArrowArrayToPgType<i8> for StringArray {
1515
}
1616
}
1717

18+
impl ArrowArrayToPgType<i8> for LargeStringArray {
19+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i8> {
20+
if self.is_null(0) {
21+
None
22+
} else {
23+
let val = self.value(0);
24+
let val: i8 = val.chars().next().expect("unexpected ascii char") as i8;
25+
Some(val)
26+
}
27+
}
28+
}
29+
1830
// Char[]
1931
impl ArrowArrayToPgType<Vec<Option<i8>>> for StringArray {
2032
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i8>>> {
@@ -29,3 +41,17 @@ impl ArrowArrayToPgType<Vec<Option<i8>>> for StringArray {
2941
Some(vals)
3042
}
3143
}
44+
45+
impl ArrowArrayToPgType<Vec<Option<i8>>> for LargeStringArray {
46+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i8>>> {
47+
let mut vals = vec![];
48+
for val in self.iter() {
49+
let val = val.map(|val| {
50+
let val: i8 = val.chars().next().expect("unexpected ascii char") as i8;
51+
val
52+
});
53+
vals.push(val);
54+
}
55+
Some(vals)
56+
}
57+
}

src/arrow_parquet/arrow_to_pg/fallback_to_text.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use arrow::array::{Array, StringArray};
1+
use arrow::array::{Array, LargeStringArray, StringArray};
22

33
use crate::type_compat::fallback_to_text::FallbackToText;
44

@@ -17,6 +17,18 @@ impl ArrowArrayToPgType<FallbackToText> for StringArray {
1717
}
1818
}
1919

20+
impl ArrowArrayToPgType<FallbackToText> for LargeStringArray {
21+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<FallbackToText> {
22+
if self.is_null(0) {
23+
None
24+
} else {
25+
let text_repr = self.value(0).to_string();
26+
let val = FallbackToText(text_repr);
27+
Some(val)
28+
}
29+
}
30+
}
31+
2032
// Text[] representation of any type
2133
impl ArrowArrayToPgType<Vec<Option<FallbackToText>>> for StringArray {
2234
fn to_pg_type(
@@ -31,3 +43,17 @@ impl ArrowArrayToPgType<Vec<Option<FallbackToText>>> for StringArray {
3143
Some(vals)
3244
}
3345
}
46+
47+
impl ArrowArrayToPgType<Vec<Option<FallbackToText>>> for LargeStringArray {
48+
fn to_pg_type(
49+
self,
50+
_context: &ArrowToPgAttributeContext,
51+
) -> Option<Vec<Option<FallbackToText>>> {
52+
let mut vals = vec![];
53+
for val in self.iter() {
54+
let val = val.map(|val| FallbackToText(val.to_string()));
55+
vals.push(val);
56+
}
57+
Some(vals)
58+
}
59+
}

src/arrow_parquet/arrow_to_pg/float4.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,17 @@ impl ArrowArrayToPgType<f32> for Float32Array {
1414
}
1515
}
1616

17+
impl ArrowArrayToPgType<f64> for Float32Array {
18+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<f64> {
19+
if self.is_null(0) {
20+
None
21+
} else {
22+
let val = self.value(0) as _;
23+
Some(val)
24+
}
25+
}
26+
}
27+
1728
// Float4[]
1829
impl ArrowArrayToPgType<Vec<Option<f32>>> for Float32Array {
1930
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<f32>>> {
@@ -24,3 +35,13 @@ impl ArrowArrayToPgType<Vec<Option<f32>>> for Float32Array {
2435
Some(vals)
2536
}
2637
}
38+
39+
impl ArrowArrayToPgType<Vec<Option<f64>>> for Float32Array {
40+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<f64>>> {
41+
let mut vals = vec![];
42+
for val in self.iter() {
43+
vals.push(val.map(|val| val as _));
44+
}
45+
Some(vals)
46+
}
47+
}

src/arrow_parquet/arrow_to_pg/geometry.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use arrow::array::{Array, BinaryArray};
1+
use arrow::array::{Array, BinaryArray, LargeBinaryArray};
22

33
use crate::type_compat::geometry::Geometry;
44

@@ -15,6 +15,16 @@ impl ArrowArrayToPgType<Geometry> for BinaryArray {
1515
}
1616
}
1717

18+
impl ArrowArrayToPgType<Geometry> for LargeBinaryArray {
19+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Geometry> {
20+
if self.is_null(0) {
21+
None
22+
} else {
23+
Some(self.value(0).to_vec().into())
24+
}
25+
}
26+
}
27+
1828
// Geometry[]
1929
impl ArrowArrayToPgType<Vec<Option<Geometry>>> for BinaryArray {
2030
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Geometry>>> {
@@ -30,3 +40,18 @@ impl ArrowArrayToPgType<Vec<Option<Geometry>>> for BinaryArray {
3040
Some(vals)
3141
}
3242
}
43+
44+
impl ArrowArrayToPgType<Vec<Option<Geometry>>> for LargeBinaryArray {
45+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Geometry>>> {
46+
let mut vals = vec![];
47+
for val in self.iter() {
48+
if let Some(val) = val {
49+
vals.push(Some(val.to_vec().into()));
50+
} else {
51+
vals.push(None);
52+
}
53+
}
54+
55+
Some(vals)
56+
}
57+
}

src/arrow_parquet/arrow_to_pg/int2.rs

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use arrow::array::{Array, Int16Array};
1+
use arrow::array::{Array, Int16Array, UInt16Array};
22

33
use super::{ArrowArrayToPgType, ArrowToPgAttributeContext};
44

@@ -14,6 +14,61 @@ impl ArrowArrayToPgType<i16> for Int16Array {
1414
}
1515
}
1616

17+
impl ArrowArrayToPgType<i32> for Int16Array {
18+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i32> {
19+
if self.is_null(0) {
20+
None
21+
} else {
22+
let val = self.value(0) as _;
23+
Some(val)
24+
}
25+
}
26+
}
27+
28+
impl ArrowArrayToPgType<i64> for Int16Array {
29+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i64> {
30+
if self.is_null(0) {
31+
None
32+
} else {
33+
let val = self.value(0) as _;
34+
Some(val)
35+
}
36+
}
37+
}
38+
39+
impl ArrowArrayToPgType<i16> for UInt16Array {
40+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i16> {
41+
if self.is_null(0) {
42+
None
43+
} else {
44+
let val = self.value(0) as _;
45+
Some(val)
46+
}
47+
}
48+
}
49+
50+
impl ArrowArrayToPgType<i32> for UInt16Array {
51+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i32> {
52+
if self.is_null(0) {
53+
None
54+
} else {
55+
let val = self.value(0) as _;
56+
Some(val)
57+
}
58+
}
59+
}
60+
61+
impl ArrowArrayToPgType<i64> for UInt16Array {
62+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<i64> {
63+
if self.is_null(0) {
64+
None
65+
} else {
66+
let val = self.value(0) as _;
67+
Some(val)
68+
}
69+
}
70+
}
71+
1772
// Int2[]
1873
impl ArrowArrayToPgType<Vec<Option<i16>>> for Int16Array {
1974
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i16>>> {
@@ -24,3 +79,53 @@ impl ArrowArrayToPgType<Vec<Option<i16>>> for Int16Array {
2479
Some(vals)
2580
}
2681
}
82+
83+
impl ArrowArrayToPgType<Vec<Option<i32>>> for Int16Array {
84+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i32>>> {
85+
let mut vals = vec![];
86+
for val in self.iter() {
87+
vals.push(val.map(|val| val as _));
88+
}
89+
Some(vals)
90+
}
91+
}
92+
93+
impl ArrowArrayToPgType<Vec<Option<i64>>> for Int16Array {
94+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i64>>> {
95+
let mut vals = vec![];
96+
for val in self.iter() {
97+
vals.push(val.map(|val| val as _));
98+
}
99+
Some(vals)
100+
}
101+
}
102+
103+
impl ArrowArrayToPgType<Vec<Option<i16>>> for UInt16Array {
104+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i16>>> {
105+
let mut vals = vec![];
106+
for val in self.iter() {
107+
vals.push(val.map(|val| val as _));
108+
}
109+
Some(vals)
110+
}
111+
}
112+
113+
impl ArrowArrayToPgType<Vec<Option<i32>>> for UInt16Array {
114+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i32>>> {
115+
let mut vals = vec![];
116+
for val in self.iter() {
117+
vals.push(val.map(|val| val as _));
118+
}
119+
Some(vals)
120+
}
121+
}
122+
123+
impl ArrowArrayToPgType<Vec<Option<i64>>> for UInt16Array {
124+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<i64>>> {
125+
let mut vals = vec![];
126+
for val in self.iter() {
127+
vals.push(val.map(|val| val as _));
128+
}
129+
Some(vals)
130+
}
131+
}

0 commit comments

Comments
 (0)