Skip to content

Commit fef4b55

Browse files
committed
Adds basic geoparquet support
We already write PostGIS geometry columns as WKB-formatted binary, as specified by the [GeoParquet spec](https://geoparquet.org/releases/v1.0.0-rc.1/). With this PR, we also write basic GeoParquet metadata. Only the basic metadata fields are supported, which is enough to be interoperable with DuckDB.
1 parent fcb5036 commit fef4b55

File tree

6 files changed

+280
-21
lines changed

6 files changed

+280
-21
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ parquet = {version = "53", default-features = false, features = [
4141
]}
4242
pgrx = "=0.12.9"
4343
rust-ini = "0.21"
44+
serde = "1"
45+
serde_json = "1"
4446
tokio = {version = "1", default-features = false, features = ["rt", "time", "macros"]}
4547
url = "2"
4648

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`:
294294
> * `numeric` with no precision and scale specified is allowed by Postgres. Such values are represented with a default precision (38) and scale (9) instead of being written as strings. You get a runtime error if your table tries to read or write a numeric value that does not fit the default precision and scale (29 integral digits before the decimal point, 9 digits after the decimal point).
295295
> - (2) The `date` type is represented according to `Unix epoch` when writing to Parquet files. It is converted back according to `PostgreSQL epoch` when reading from Parquet files.
296296
> - (3) The `timestamptz` and `timetz` types are adjusted to `UTC` when writing to Parquet files. They are converted back with `UTC` timezone when reading from Parquet files.
297-
> - (4) The `geometry` type is represented as `BYTE_ARRAY` encoded as `WKB` when `postgis` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
297+
> - (4) The `geometry` type is represented as `BYTE_ARRAY` encoded as `WKB`, as specified by the [GeoParquet spec](https://geoparquet.org/releases/v1.1.0/), when the `postgis` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
298298
> - (5) `crunchy_map` is dependent on functionality provided by [Crunchy Bridge](https://www.crunchydata.com/products/crunchy-bridge). The `crunchy_map` type is represented as `GROUP` with `MAP` logical type when `crunchy_map` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
299299
300300
> [!WARNING]

src/arrow_parquet/parquet_writer.rs

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use arrow_schema::SchemaRef;
55
use parquet::{
66
arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
77
file::properties::{EnabledStatistics, WriterProperties},
8+
format::KeyValue,
89
};
9-
use pgrx::{heap_tuple::PgHeapTuple, pg_sys::RECORDOID, AllocatedByRust, PgTupleDesc};
10+
use pgrx::{heap_tuple::PgHeapTuple, AllocatedByRust, PgTupleDesc};
1011
use url::Url;
1112

1213
use crate::{
@@ -18,7 +19,10 @@ use crate::{
1819
uri_utils::parquet_writer_from_uri,
1920
},
2021
pgrx_utils::{collect_attributes_for, CollectAttributesFor},
21-
type_compat::{geometry::reset_postgis_context, map::reset_map_context},
22+
type_compat::{
23+
geometry::{geoparquet_metadata_json_from_tupledesc, reset_postgis_context},
24+
map::reset_map_context,
25+
},
2226
PG_BACKEND_TOKIO_RUNTIME,
2327
};
2428

@@ -42,25 +46,11 @@ impl ParquetWriterContext {
4246
compression_level: i32,
4347
tupledesc: &PgTupleDesc,
4448
) -> ParquetWriterContext {
45-
debug_assert!(tupledesc.oid() == RECORDOID);
46-
4749
// Postgis and Map contexts are used throughout writing the parquet file.
4850
// We need to reset them to avoid reading the stale data. (e.g. extension could be dropped)
4951
reset_postgis_context();
5052
reset_map_context();
5153

52-
let writer_props = WriterProperties::builder()
53-
.set_statistics_enabled(EnabledStatistics::Page)
54-
.set_compression(
55-
PgParquetCompressionWithLevel {
56-
compression,
57-
compression_level,
58-
}
59-
.into(),
60-
)
61-
.set_created_by("pg_parquet".to_string())
62-
.build();
63-
6454
let attributes = collect_attributes_for(CollectAttributesFor::CopyTo, tupledesc);
6555

6656
pgrx::debug2!(
@@ -71,6 +61,8 @@ impl ParquetWriterContext {
7161
let schema = parse_arrow_schema_from_attributes(&attributes);
7262
let schema = Arc::new(schema);
7363

64+
let writer_props = Self::writer_props(tupledesc, compression, compression_level);
65+
7466
let parquet_writer = parquet_writer_from_uri(&uri, schema.clone(), writer_props);
7567

7668
let attribute_contexts =
@@ -83,6 +75,33 @@ impl ParquetWriterContext {
8375
}
8476
}
8577

78+
/// Builds the `WriterProperties` used for the parquet file.
///
/// The properties enable page-level statistics, apply the given compression
/// (combined with `compression_level`), and set the creator to "pg_parquet".
/// When `geoparquet_metadata_json_from_tupledesc` returns a value for
/// `tupledesc`, that value is attached as file-level key-value metadata under
/// the key "geo".
fn writer_props(
79+
tupledesc: &PgTupleDesc,
80+
compression: PgParquetCompression,
81+
compression_level: i32,
82+
) -> WriterProperties {
83+
// Pair the codec with its level; `.into()` below converts the pair for `set_compression`.
let compression = PgParquetCompressionWithLevel {
84+
compression,
85+
compression_level,
86+
};
87+
88+
let mut writer_props_builder = WriterProperties::builder()
89+
.set_statistics_enabled(EnabledStatistics::Page)
90+
.set_compression(compression.into())
91+
.set_created_by("pg_parquet".to_string());
92+
93+
// Optional metadata value derived from the tuple descriptor.
// NOTE(review): presumably `None` when the tuple has no geometry columns — confirm in
// `type_compat::geometry`.
let geometry_columns_metadata_value = geoparquet_metadata_json_from_tupledesc(tupledesc);
94+
95+
// Only add the "geo" entry when a metadata value was produced.
if geometry_columns_metadata_value.is_some() {
96+
// The optional value itself is handed to `KeyValue::new` as the entry's value.
// NOTE(review): `if let Some(..)` would express the same check more idiomatically.
let key_value_metadata = KeyValue::new("geo".into(), geometry_columns_metadata_value);
97+
98+
writer_props_builder =
99+
writer_props_builder.set_key_value_metadata(Some(vec![key_value_metadata]));
100+
}
101+
102+
writer_props_builder.build()
103+
}
104+
86105
pub(crate) fn write_new_row_group(
87106
&mut self,
88107
tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>,

src/pgrx_tests/copy_type_roundtrip.rs

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,20 @@ mod tests {
88
LOCAL_TEST_FILE_PATH,
99
};
1010
use crate::type_compat::fallback_to_text::FallbackToText;
11-
use crate::type_compat::geometry::Geometry;
11+
use crate::type_compat::geometry::{
12+
Geometry, GeometryColumnsMetadata, GeometryEncoding, GeometryType,
13+
};
1214
use crate::type_compat::map::Map;
1315
use crate::type_compat::pg_arrow_type_conversions::{
1416
DEFAULT_UNBOUNDED_NUMERIC_PRECISION, DEFAULT_UNBOUNDED_NUMERIC_SCALE,
1517
};
1618
use pgrx::pg_sys::Oid;
17-
use pgrx::pg_test;
1819
use pgrx::{
1920
composite_type,
2021
datum::{Date, Time, TimeWithTimeZone, Timestamp, TimestampWithTimeZone},
2122
AnyNumeric, Spi,
2223
};
24+
use pgrx::{pg_test, JsonB};
2325

2426
#[pg_test]
2527
fn test_int2() {
@@ -946,6 +948,117 @@ mod tests {
946948
test_table.assert_expected_and_result_rows();
947949
}
948950

951+
/// Checks the "geo" key-value metadata written by `COPY ... TO ... (format parquet)`.
///
/// The test copies a single row with seven typed geometry columns (`a`..`g`:
/// point, linestring, polygon, multipoint, multilinestring, multipolygon,
/// geometrycollection) to `LOCAL_TEST_FILE_PATH`, reads the "geo" entry back
/// through `parquet.kv_metadata`, deserializes it into
/// `GeometryColumnsMetadata`, and asserts the version, the primary column and
/// each column's encoding and geometry types.
#[pg_test]
952+
fn test_geometry_geoparquet_metadata() {
953+
// Skip the test if postgis extension is not available
954+
if !extension_exists("postgis") {
955+
return;
956+
}
957+
958+
// Drop the extension if it is already created, then create it again.
let query = "DROP EXTENSION IF EXISTS postgis; CREATE EXTENSION postgis;";
959+
Spi::run(query).unwrap();
960+
961+
// One column per typed geometry; the column names `a`..`g` are looked up in the
// metadata assertions below.
let copy_to_query = format!(
962+
"COPY (SELECT ST_GeomFromText('POINT(1 1)')::geometry(point) as a,
963+
ST_GeomFromText('LINESTRING(0 0, 1 1)')::geometry(linestring) as b,
964+
ST_GeomFromText('POLYGON((0 0, 1 1, 2 2, 0 0))')::geometry(polygon) as c,
965+
ST_GeomFromText('MULTIPOINT((0 0), (1 1))')::geometry(multipoint) as d,
966+
ST_GeomFromText('MULTILINESTRING((0 0, 1 1), (2 2, 3 3))')::geometry(multilinestring) as e,
967+
ST_GeomFromText('MULTIPOLYGON(((0 0, 1 1, 2 2, 0 0)), ((3 3, 4 4, 5 5, 3 3)))')::geometry(multipolygon) as f,
968+
ST_GeomFromText('GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(0 0, 1 1))')::geometry(geometrycollection) as g
969+
)
970+
TO '{LOCAL_TEST_FILE_PATH}' WITH (format parquet);",
971+
);
972+
Spi::run(copy_to_query.as_str()).unwrap();
973+
974+
// Check geoparquet metadata: select the key-value entry whose key is 'geo' and cast
// its value to jsonb (key and value are both passed through encode(.., 'escape')).
975+
let geoparquet_metadata_query = format!(
976+
"select encode(value, 'escape')::jsonb
977+
from parquet.kv_metadata('{LOCAL_TEST_FILE_PATH}')
978+
where encode(key, 'escape') = 'geo';",
979+
);
980+
// Both unwraps are intentional: the test must fail if the query fails or yields no value.
let geoparquet_metadata_json = Spi::get_one::<JsonB>(geoparquet_metadata_query.as_str())
981+
.unwrap()
982+
.unwrap();
983+
984+
// Deserialize the JSON document into the typed metadata struct.
let geoparquet_metadata: GeometryColumnsMetadata =
985+
serde_json::from_value(geoparquet_metadata_json.0).unwrap();
986+
987+
// assert common metadata (`a` is the first column of the SELECT above)
988+
assert_eq!(geoparquet_metadata.version, "1.1.0");
989+
assert_eq!(geoparquet_metadata.primary_column, "a");
990+
991+
// point (column `a`)
992+
assert_eq!(
993+
geoparquet_metadata.columns.get("a").unwrap().encoding,
994+
GeometryEncoding::WKB
995+
);
996+
assert_eq!(
997+
geoparquet_metadata.columns.get("a").unwrap().geometry_types,
998+
vec![GeometryType::Point]
999+
);
1000+
1001+
// linestring (column `b`)
1002+
assert_eq!(
1003+
geoparquet_metadata.columns.get("b").unwrap().encoding,
1004+
GeometryEncoding::WKB
1005+
);
1006+
assert_eq!(
1007+
geoparquet_metadata.columns.get("b").unwrap().geometry_types,
1008+
vec![GeometryType::LineString]
1009+
);
1010+
1011+
// polygon (column `c`)
1012+
assert_eq!(
1013+
geoparquet_metadata.columns.get("c").unwrap().encoding,
1014+
GeometryEncoding::WKB
1015+
);
1016+
assert_eq!(
1017+
geoparquet_metadata.columns.get("c").unwrap().geometry_types,
1018+
vec![GeometryType::Polygon]
1019+
);
1020+
1021+
// multipoint (column `d`)
1022+
assert_eq!(
1023+
geoparquet_metadata.columns.get("d").unwrap().encoding,
1024+
GeometryEncoding::WKB
1025+
);
1026+
assert_eq!(
1027+
geoparquet_metadata.columns.get("d").unwrap().geometry_types,
1028+
vec![GeometryType::MultiPoint]
1029+
);
1030+
1031+
// multilinestring (column `e`)
1032+
assert_eq!(
1033+
geoparquet_metadata.columns.get("e").unwrap().encoding,
1034+
GeometryEncoding::WKB
1035+
);
1036+
assert_eq!(
1037+
geoparquet_metadata.columns.get("e").unwrap().geometry_types,
1038+
vec![GeometryType::MultiLineString]
1039+
);
1040+
1041+
// multipolygon (column `f`)
1042+
assert_eq!(
1043+
geoparquet_metadata.columns.get("f").unwrap().encoding,
1044+
GeometryEncoding::WKB
1045+
);
1046+
assert_eq!(
1047+
geoparquet_metadata.columns.get("f").unwrap().geometry_types,
1048+
vec![GeometryType::MultiPolygon]
1049+
);
1050+
1051+
// geometrycollection (column `g`)
1052+
assert_eq!(
1053+
geoparquet_metadata.columns.get("g").unwrap().encoding,
1054+
GeometryEncoding::WKB
1055+
);
1056+
assert_eq!(
1057+
geoparquet_metadata.columns.get("g").unwrap().geometry_types,
1058+
vec![GeometryType::GeometryCollection]
1059+
);
1060+
}
1061+
9491062
#[pg_test]
9501063
fn test_complex_composite() {
9511064
Spi::run("CREATE TYPE dog AS (name text, age int);").unwrap();

0 commit comments

Comments
 (0)