Skip to content

Commit 854b607

Browse files
committed
Support uuid, json, jsonb natively
Arrow supports extension types with new major version 54. They can correctly convert uuid and json arrow arrays from/to parquet. This PR uses this fact to natively read/write these types from/to parquet to/from postgres. Closes #46.
1 parent 6014e11 commit 854b607

File tree

14 files changed

+437
-54
lines changed

14 files changed

+437
-54
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pg_test = []
2222
[dependencies]
2323
arrow = {version = "54", default-features = false}
2424
arrow-cast = {version = "54", default-features = false}
25-
arrow-schema = {version = "54", default-features = false}
25+
arrow-schema = {version = "54", default-features = false, features = ["canonical_extension_types"]}
2626
aws-config = { version = "1", default-features = false, features = ["rustls","rt-tokio"] }
2727
aws-credential-types = {version = "1", default-features = false}
2828
azure_storage = {version = "0.21", default-features = false}
@@ -38,6 +38,7 @@ parquet = {version = "54", default-features = false, features = [
3838
"lz4",
3939
"zstd",
4040
"object_store",
41+
"arrow_canonical_extension_types",
4142
]}
4243
pgrx = "=0.13.1"
4344
rust-ini = "0.21"

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,9 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`:
269269
| `double` | DOUBLE | |
270270
| `numeric`(1) | FIXED_LEN_BYTE_ARRAY(16) | DECIMAL(128) |
271271
| `text` | BYTE_ARRAY | STRING |
272-
| `json` | BYTE_ARRAY | STRING |
272+
| `json` | BYTE_ARRAY | JSON |
273+
| `jsonb` | BYTE_ARRAY | JSON |
274+
| `uuid` | FIXED_LEN_BYTE_ARRAY(16) | UUID |
273275
| `bytea` | BYTE_ARRAY | |
274276
| `date` (2) | INT32 | DATE |
275277
| `timestamp` | INT64 | TIMESTAMP_MICROS |

src/arrow_parquet/arrow_to_pg.rs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
use arrow::array::{
2-
Array, ArrayData, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array,
3-
Float64Array, Int16Array, Int32Array, Int64Array, ListArray, MapArray, StringArray,
4-
StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, UInt32Array,
2+
Array, ArrayData, BinaryArray, BooleanArray, Date32Array, Decimal128Array,
3+
FixedSizeBinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
4+
ListArray, MapArray, StringArray, StructArray, Time64MicrosecondArray,
5+
TimestampMicrosecondArray, UInt32Array,
56
};
67
use arrow_schema::{DataType, TimeUnit};
78
use context::ArrowToPgAttributeContext;
89
use pgrx::{
910
datum::{Date, Time, TimeWithTimeZone, Timestamp, TimestampWithTimeZone},
10-
pg_sys::{Datum, Oid, CHAROID, TEXTOID, TIMEOID},
11+
pg_sys::{Datum, Oid, CHAROID, JSONBOID, JSONOID, TEXTOID, TIMEOID, UUIDOID},
1112
prelude::PgHeapTuple,
12-
AllocatedByRust, AnyNumeric, IntoDatum,
13+
AllocatedByRust, AnyNumeric, IntoDatum, Json, JsonB, Uuid,
1314
};
1415

1516
use crate::{
@@ -37,6 +38,8 @@ pub(crate) mod geometry;
3738
pub(crate) mod int2;
3839
pub(crate) mod int4;
3940
pub(crate) mod int8;
41+
pub(crate) mod json;
42+
pub(crate) mod jsonb;
4043
pub(crate) mod map;
4144
pub(crate) mod numeric;
4245
pub(crate) mod oid;
@@ -45,6 +48,7 @@ pub(crate) mod time;
4548
pub(crate) mod timestamp;
4649
pub(crate) mod timestamptz;
4750
pub(crate) mod timetz;
51+
pub(crate) mod uuid;
4852

4953
pub(crate) trait ArrowArrayToPgType<T: IntoDatum>: From<ArrayData> {
5054
fn to_pg_type(self, context: &ArrowToPgAttributeContext) -> Option<T>;
@@ -102,6 +106,10 @@ fn to_pg_nonarray_datum(
102106
to_pg_datum!(StringArray, i8, primitive_array, attribute_context)
103107
} else if attribute_context.typoid() == TEXTOID {
104108
to_pg_datum!(StringArray, String, primitive_array, attribute_context)
109+
} else if attribute_context.typoid() == JSONOID {
110+
to_pg_datum!(StringArray, Json, primitive_array, attribute_context)
111+
} else if attribute_context.typoid() == JSONBOID {
112+
to_pg_datum!(StringArray, JsonB, primitive_array, attribute_context)
105113
} else {
106114
reset_fallback_to_text_context(
107115
attribute_context.typoid(),
@@ -123,6 +131,14 @@ fn to_pg_nonarray_datum(
123131
to_pg_datum!(BinaryArray, Vec<u8>, primitive_array, attribute_context)
124132
}
125133
}
134+
DataType::FixedSizeBinary(16) if attribute_context.typoid() == UUIDOID => {
135+
to_pg_datum!(
136+
FixedSizeBinaryArray,
137+
Uuid,
138+
primitive_array,
139+
attribute_context
140+
)
141+
}
126142
DataType::Decimal128(_, _) => {
127143
to_pg_datum!(
128144
Decimal128Array,
@@ -232,6 +248,10 @@ fn to_pg_array_datum(
232248
list_array,
233249
element_context
234250
)
251+
} else if element_context.typoid() == JSONOID {
252+
to_pg_datum!(StringArray, Vec<Option<Json>>, list_array, element_context)
253+
} else if element_context.typoid() == JSONBOID {
254+
to_pg_datum!(StringArray, Vec<Option<JsonB>>, list_array, element_context)
235255
} else {
236256
reset_fallback_to_text_context(element_context.typoid(), element_context.typmod());
237257

@@ -260,6 +280,14 @@ fn to_pg_array_datum(
260280
)
261281
}
262282
}
283+
DataType::FixedSizeBinary(16) if element_context.typoid() == UUIDOID => {
284+
to_pg_datum!(
285+
FixedSizeBinaryArray,
286+
Vec<Option<Uuid>>,
287+
list_array,
288+
element_context
289+
)
290+
}
263291
DataType::Decimal128(_, _) => {
264292
to_pg_datum!(
265293
Decimal128Array,
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use arrow::array::{Array, StringArray};
2+
use pgrx::Json;
3+
4+
use super::{ArrowArrayToPgType, ArrowToPgAttributeContext};
5+
6+
// Json
7+
impl ArrowArrayToPgType<Json> for StringArray {
8+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Json> {
9+
if self.is_null(0) {
10+
None
11+
} else {
12+
let val = self.value(0);
13+
Some(Json(
14+
serde_json::from_str(val).unwrap_or_else(|_| panic!("invalid json: {}", val)),
15+
))
16+
}
17+
}
18+
}
19+
20+
// Json[]
21+
impl ArrowArrayToPgType<Vec<Option<Json>>> for StringArray {
22+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Json>>> {
23+
let mut vals = vec![];
24+
for val in self.iter() {
25+
let val = val.map(|val| {
26+
Json(serde_json::from_str(val).unwrap_or_else(|_| panic!("invalid json: {}", val)))
27+
});
28+
vals.push(val);
29+
}
30+
Some(vals)
31+
}
32+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use arrow::array::{Array, StringArray};
2+
use pgrx::JsonB;
3+
4+
use super::{ArrowArrayToPgType, ArrowToPgAttributeContext};
5+
6+
// JsonB
7+
impl ArrowArrayToPgType<JsonB> for StringArray {
8+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<JsonB> {
9+
if self.is_null(0) {
10+
None
11+
} else {
12+
let val = self.value(0);
13+
Some(JsonB(
14+
serde_json::from_str(val).unwrap_or_else(|_| panic!("invalid jsonb: {}", val)),
15+
))
16+
}
17+
}
18+
}
19+
20+
// JsonB[]
21+
impl ArrowArrayToPgType<Vec<Option<JsonB>>> for StringArray {
22+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<JsonB>>> {
23+
let mut vals = vec![];
24+
for val in self.iter() {
25+
let val = val.map(|val| {
26+
JsonB(
27+
serde_json::from_str(val).unwrap_or_else(|_| panic!("invalid jsonb: {}", val)),
28+
)
29+
});
30+
vals.push(val);
31+
}
32+
Some(vals)
33+
}
34+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use arrow::array::{Array, FixedSizeBinaryArray};
2+
use pgrx::Uuid;
3+
4+
use super::{ArrowArrayToPgType, ArrowToPgAttributeContext};
5+
6+
// Uuid
7+
impl ArrowArrayToPgType<Uuid> for FixedSizeBinaryArray {
8+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Uuid> {
9+
if self.is_null(0) {
10+
None
11+
} else {
12+
let val = self.value(0);
13+
Some(Uuid::from_slice(val).expect("Invalid Uuid"))
14+
}
15+
}
16+
}
17+
18+
// Uuid[]
19+
impl ArrowArrayToPgType<Vec<Option<Uuid>>> for FixedSizeBinaryArray {
20+
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Uuid>>> {
21+
let mut vals = vec![];
22+
for val in self.iter() {
23+
vals.push(val.map(|val| Uuid::from_slice(val).expect("Invalid Uuid")));
24+
}
25+
Some(vals)
26+
}
27+
}

src/arrow_parquet/pg_to_arrow.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ use pgrx::{
88
heap_tuple::PgHeapTuple,
99
pg_sys::{
1010
Oid, BOOLOID, BYTEAOID, CHAROID, DATEOID, FLOAT4OID, FLOAT8OID, INT2OID, INT4OID, INT8OID,
11-
NUMERICOID, OIDOID, TEXTOID, TIMEOID, TIMESTAMPOID, TIMESTAMPTZOID, TIMETZOID,
11+
JSONBOID, JSONOID, NUMERICOID, OIDOID, TEXTOID, TIMEOID, TIMESTAMPOID, TIMESTAMPTZOID,
12+
TIMETZOID, UUIDOID,
1213
},
13-
AllocatedByRust, AnyNumeric, FromDatum,
14+
AllocatedByRust, AnyNumeric, FromDatum, Json, JsonB, Uuid,
1415
};
1516

1617
use crate::{
@@ -41,6 +42,8 @@ pub(crate) mod geometry;
4142
pub(crate) mod int2;
4243
pub(crate) mod int4;
4344
pub(crate) mod int8;
45+
pub(crate) mod json;
46+
pub(crate) mod jsonb;
4447
pub(crate) mod map;
4548
pub(crate) mod numeric;
4649
pub(crate) mod oid;
@@ -49,6 +52,7 @@ pub(crate) mod time;
4952
pub(crate) mod timestamp;
5053
pub(crate) mod timestamptz;
5154
pub(crate) mod timetz;
55+
pub(crate) mod uuid;
5256

5357
pub(crate) trait PgTypeToArrowArray<T: FromDatum + UnboxDatum> {
5458
fn to_arrow_array(self, context: &PgToArrowAttributeContext) -> ArrayRef;
@@ -130,6 +134,7 @@ fn to_arrow_primitive_array(
130134
INT2OID => to_arrow_primitive_array!(i16, tuples, attribute_context),
131135
INT4OID => to_arrow_primitive_array!(i32, tuples, attribute_context),
132136
INT8OID => to_arrow_primitive_array!(i64, tuples, attribute_context),
137+
UUIDOID => to_arrow_primitive_array!(Uuid, tuples, attribute_context),
133138
NUMERICOID => {
134139
let precision = attribute_context.precision();
135140

@@ -154,6 +159,8 @@ fn to_arrow_primitive_array(
154159
}
155160
CHAROID => to_arrow_primitive_array!(i8, tuples, attribute_context),
156161
TEXTOID => to_arrow_primitive_array!(String, tuples, attribute_context),
162+
JSONOID => to_arrow_primitive_array!(Json, tuples, attribute_context),
163+
JSONBOID => to_arrow_primitive_array!(JsonB, tuples, attribute_context),
157164
BYTEAOID => to_arrow_primitive_array!(&[u8], tuples, attribute_context),
158165
OIDOID => to_arrow_primitive_array!(Oid, tuples, attribute_context),
159166
_ => {
@@ -224,6 +231,7 @@ fn to_arrow_list_array(
224231
INT2OID => to_arrow_list_array!(pgrx::Array<i16>, tuples, element_context),
225232
INT4OID => to_arrow_list_array!(pgrx::Array<i32>, tuples, element_context),
226233
INT8OID => to_arrow_list_array!(pgrx::Array<i64>, tuples, element_context),
234+
UUIDOID => to_arrow_list_array!(pgrx::Array<Uuid>, tuples, element_context),
227235
NUMERICOID => {
228236
let precision = element_context.precision();
229237

@@ -249,6 +257,8 @@ fn to_arrow_list_array(
249257
}
250258
CHAROID => to_arrow_list_array!(pgrx::Array<i8>, tuples, element_context),
251259
TEXTOID => to_arrow_list_array!(pgrx::Array<String>, tuples, element_context),
260+
JSONOID => to_arrow_list_array!(pgrx::Array<Json>, tuples, element_context),
261+
JSONBOID => to_arrow_list_array!(pgrx::Array<JsonB>, tuples, element_context),
252262
BYTEAOID => to_arrow_list_array!(pgrx::Array<&[u8]>, tuples, element_context),
253263
OIDOID => to_arrow_list_array!(pgrx::Array<Oid>, tuples, element_context),
254264
_ => {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use std::sync::Arc;
2+
3+
use arrow::array::{ArrayRef, ListArray, StringArray};
4+
use pgrx::Json;
5+
6+
use crate::arrow_parquet::{arrow_utils::arrow_array_offsets, pg_to_arrow::PgTypeToArrowArray};
7+
8+
use super::PgToArrowAttributeContext;
9+
10+
// Json
11+
impl PgTypeToArrowArray<Json> for Vec<Option<Json>> {
12+
fn to_arrow_array(self, _context: &PgToArrowAttributeContext) -> ArrayRef {
13+
let jsons = self
14+
.into_iter()
15+
.map(|json| {
16+
json.map(|json| {
17+
serde_json::to_string(&json)
18+
.unwrap_or_else(|e| panic!("failed to serialize JSON value: {}", e))
19+
})
20+
})
21+
.collect::<Vec<_>>();
22+
23+
let json_array = StringArray::from(jsons);
24+
Arc::new(json_array)
25+
}
26+
}
27+
28+
// Json[]
29+
impl PgTypeToArrowArray<Json> for Vec<Option<Vec<Option<Json>>>> {
30+
fn to_arrow_array(self, element_context: &PgToArrowAttributeContext) -> ArrayRef {
31+
let (offsets, nulls) = arrow_array_offsets(&self);
32+
33+
// gets rid of the first level of Option, then flattens the inner Vec<Option<bool>>.
34+
let pg_array = self
35+
.into_iter()
36+
.flatten()
37+
.flatten()
38+
.map(|json| {
39+
json.map(|json| {
40+
serde_json::to_string(&json)
41+
.unwrap_or_else(|e| panic!("failed to serialize JSON value: {}", e))
42+
})
43+
})
44+
.collect::<Vec<_>>();
45+
46+
let json_array = StringArray::from(pg_array);
47+
48+
let list_array = ListArray::new(
49+
element_context.field(),
50+
offsets,
51+
Arc::new(json_array),
52+
Some(nulls),
53+
);
54+
55+
Arc::new(list_array)
56+
}
57+
}

0 commit comments

Comments
 (0)