Skip to content

Commit 010eb4b

Browse files
committed
Adds basic geoparquet support
We already write postgis geometry column as WKB formatted binary, as specified by [geoparquet spec](https://geoparquet.org/releases/v1.0.0-rc.1/). With this PR, we also write basic geoparquet metadata. We support only basic info to be interoperable with duckdb.
1 parent fcb5036 commit 010eb4b

File tree

6 files changed

+217
-21
lines changed

6 files changed

+217
-21
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ parquet = {version = "53", default-features = false, features = [
4141
]}
4242
pgrx = "=0.12.9"
4343
rust-ini = "0.21"
44+
serde = "1"
45+
serde_json = "1"
4446
tokio = {version = "1", default-features = false, features = ["rt", "time", "macros"]}
4547
url = "2"
4648

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`:
294294
> * `numeric` is allowed by Postgres. (precision and scale not specified). These are represented by a default precision (38) and scale (9) instead of writing them as string. You get runtime error if your table tries to read or write a numeric value which is not allowed by the default precision and scale (29 integral digits before decimal point, 9 digits after decimal point).
295295
> - (2) The `date` type is represented according to `Unix epoch` when writing to Parquet files. It is converted back according to `PostgreSQL epoch` when reading from Parquet files.
296296
> - (3) The `timestamptz` and `timetz` types are adjusted to `UTC` when writing to Parquet files. They are converted back with `UTC` timezone when reading from Parquet files.
297-
> - (4) The `geometry` type is represented as `BYTE_ARRAY` encoded as `WKB` when `postgis` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
297+
> - (4) The `geometry` type is represented as `BYTE_ARRAY` encoded as `WKB`, specified by [geoparquet spec](https://geoparquet.org/releases/v1.0.0/), when `postgis` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
298298
> - (5) `crunchy_map` is dependent on functionality provided by [Crunchy Bridge](https://www.crunchydata.com/products/crunchy-bridge). The `crunchy_map` type is represented as `GROUP` with `MAP` logical type when `crunchy_map` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
299299
300300
> [!WARNING]

src/arrow_parquet/parquet_writer.rs

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use arrow_schema::SchemaRef;
55
use parquet::{
66
arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
77
file::properties::{EnabledStatistics, WriterProperties},
8+
format::KeyValue,
89
};
9-
use pgrx::{heap_tuple::PgHeapTuple, pg_sys::RECORDOID, AllocatedByRust, PgTupleDesc};
10+
use pgrx::{heap_tuple::PgHeapTuple, AllocatedByRust, PgTupleDesc};
1011
use url::Url;
1112

1213
use crate::{
@@ -18,7 +19,10 @@ use crate::{
1819
uri_utils::parquet_writer_from_uri,
1920
},
2021
pgrx_utils::{collect_attributes_for, CollectAttributesFor},
21-
type_compat::{geometry::reset_postgis_context, map::reset_map_context},
22+
type_compat::{
23+
geometry::{geoparquet_metadata_json_from_tupledesc, reset_postgis_context},
24+
map::reset_map_context,
25+
},
2226
PG_BACKEND_TOKIO_RUNTIME,
2327
};
2428

@@ -42,25 +46,11 @@ impl ParquetWriterContext {
4246
compression_level: i32,
4347
tupledesc: &PgTupleDesc,
4448
) -> ParquetWriterContext {
45-
debug_assert!(tupledesc.oid() == RECORDOID);
46-
4749
// Postgis and Map contexts are used throughout writing the parquet file.
4850
// We need to reset them to avoid reading the stale data. (e.g. extension could be dropped)
4951
reset_postgis_context();
5052
reset_map_context();
5153

52-
let writer_props = WriterProperties::builder()
53-
.set_statistics_enabled(EnabledStatistics::Page)
54-
.set_compression(
55-
PgParquetCompressionWithLevel {
56-
compression,
57-
compression_level,
58-
}
59-
.into(),
60-
)
61-
.set_created_by("pg_parquet".to_string())
62-
.build();
63-
6454
let attributes = collect_attributes_for(CollectAttributesFor::CopyTo, tupledesc);
6555

6656
pgrx::debug2!(
@@ -71,6 +61,8 @@ impl ParquetWriterContext {
7161
let schema = parse_arrow_schema_from_attributes(&attributes);
7262
let schema = Arc::new(schema);
7363

64+
let writer_props = Self::writer_props(tupledesc, compression, compression_level);
65+
7466
let parquet_writer = parquet_writer_from_uri(&uri, schema.clone(), writer_props);
7567

7668
let attribute_contexts =
@@ -83,6 +75,33 @@ impl ParquetWriterContext {
8375
}
8476
}
8577

78+
fn writer_props(
79+
tupledesc: &PgTupleDesc,
80+
compression: PgParquetCompression,
81+
compression_level: i32,
82+
) -> WriterProperties {
83+
let compression = PgParquetCompressionWithLevel {
84+
compression,
85+
compression_level,
86+
};
87+
88+
let mut writer_props_builder = WriterProperties::builder()
89+
.set_statistics_enabled(EnabledStatistics::Page)
90+
.set_compression(compression.into())
91+
.set_created_by("pg_parquet".to_string());
92+
93+
let geometry_columns_metadata_value = geoparquet_metadata_json_from_tupledesc(tupledesc);
94+
95+
if geometry_columns_metadata_value.is_some() {
96+
let key_value_metadata = KeyValue::new("geo".into(), geometry_columns_metadata_value);
97+
98+
writer_props_builder =
99+
writer_props_builder.set_key_value_metadata(Some(vec![key_value_metadata]));
100+
}
101+
102+
writer_props_builder.build()
103+
}
104+
86105
pub(crate) fn write_new_row_group(
87106
&mut self,
88107
tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>,

src/pgrx_tests/copy_type_roundtrip.rs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,20 @@ mod tests {
88
LOCAL_TEST_FILE_PATH,
99
};
1010
use crate::type_compat::fallback_to_text::FallbackToText;
11-
use crate::type_compat::geometry::Geometry;
11+
use crate::type_compat::geometry::{
12+
Geometry, GeometryColumnsMetadata, GeometryEncoding, GeometryType,
13+
};
1214
use crate::type_compat::map::Map;
1315
use crate::type_compat::pg_arrow_type_conversions::{
1416
DEFAULT_UNBOUNDED_NUMERIC_PRECISION, DEFAULT_UNBOUNDED_NUMERIC_SCALE,
1517
};
1618
use pgrx::pg_sys::Oid;
17-
use pgrx::pg_test;
1819
use pgrx::{
1920
composite_type,
2021
datum::{Date, Time, TimeWithTimeZone, Timestamp, TimestampWithTimeZone},
2122
AnyNumeric, Spi,
2223
};
24+
use pgrx::{pg_test, JsonB};
2325

2426
#[pg_test]
2527
fn test_int2() {
@@ -946,6 +948,56 @@ mod tests {
946948
test_table.assert_expected_and_result_rows();
947949
}
948950

951+
#[pg_test]
952+
fn test_geometry_geoparquet_metadata() {
953+
// Skip the test if postgis extension is not available
954+
if !extension_exists("postgis") {
955+
return;
956+
}
957+
958+
let query = "DROP EXTENSION IF EXISTS postgis; CREATE EXTENSION postgis;";
959+
Spi::run(query).unwrap();
960+
961+
let copy_to_query = format!(
962+
"COPY (SELECT ST_GeomFromText('POINT(1 1)')::geometry(point, 4326) as a,
963+
ST_GeomFromText('LINESTRING(0 0, 1 1)')::geometry(linestring, 4326) as b)
964+
TO '{LOCAL_TEST_FILE_PATH}' WITH (format parquet);",
965+
);
966+
Spi::run(copy_to_query.as_str()).unwrap();
967+
968+
// Check geoparquet metadata
969+
let geoparquet_metadata_query = format!(
970+
"select encode(value, 'escape')::jsonb
971+
from parquet.kv_metadata('{LOCAL_TEST_FILE_PATH}')
972+
where encode(key, 'escape') = 'geo';",
973+
);
974+
let geoparquet_metadata_json = Spi::get_one::<JsonB>(geoparquet_metadata_query.as_str())
975+
.unwrap()
976+
.unwrap();
977+
978+
let geoparquet_metadata: GeometryColumnsMetadata =
979+
serde_json::from_value(geoparquet_metadata_json.0).unwrap();
980+
981+
assert_eq!(geoparquet_metadata.version, "1.1.0");
982+
assert_eq!(geoparquet_metadata.primary_column, "a");
983+
assert_eq!(
984+
geoparquet_metadata.columns.get("a").unwrap().encoding,
985+
GeometryEncoding::WKB
986+
);
987+
assert_eq!(
988+
geoparquet_metadata.columns.get("a").unwrap().geometry_types,
989+
vec![GeometryType::Point]
990+
);
991+
assert_eq!(
992+
geoparquet_metadata.columns.get("b").unwrap().encoding,
993+
GeometryEncoding::WKB
994+
);
995+
assert_eq!(
996+
geoparquet_metadata.columns.get("b").unwrap().geometry_types,
997+
vec![GeometryType::LineString]
998+
);
999+
}
1000+
9491001
#[pg_test]
9501002
fn test_complex_composite() {
9511003
Spi::run("CREATE TYPE dog AS (name text, age int);").unwrap();

src/type_compat/geometry.rs

Lines changed: 123 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{ffi::CString, ops::Deref};
1+
use std::{collections::HashMap, ffi::CString, ops::Deref};
22

33
use once_cell::sync::OnceCell;
44
use pgrx::{
@@ -8,8 +8,11 @@ use pgrx::{
88
InvalidOid, LookupFuncName, Oid, OidFunctionCall1Coll, SysCacheIdentifier::TYPENAMENSP,
99
BYTEAOID,
1010
},
11-
FromDatum, IntoDatum, PgList, Spi,
11+
FromDatum, IntoDatum, PgList, PgTupleDesc, Spi,
1212
};
13+
use serde::{Deserialize, Serialize};
14+
15+
use crate::pgrx_utils::{collect_attributes_for, CollectAttributesFor};
1316

1417
// we need to reset the postgis context at each copy start
1518
static mut POSTGIS_CONTEXT: OnceCell<PostgisContext> = OnceCell::new();
@@ -45,6 +48,124 @@ pub(crate) fn is_postgis_geometry_type(typoid: Oid) -> bool {
4548
false
4649
}
4750

51+
#[derive(Debug, PartialEq, Serialize, Deserialize)]
52+
pub(crate) enum GeometryType {
53+
Point,
54+
LineString,
55+
Polygon,
56+
MultiPoint,
57+
MultiLineString,
58+
MultiPolygon,
59+
GeometryCollection,
60+
}
61+
62+
impl GeometryType {
63+
fn from_typmod(typmod: i32) -> Option<Self> {
64+
let geom_type = (typmod & 0x000000FC) >> 2;
65+
66+
match geom_type {
67+
1 => Some(GeometryType::Point),
68+
2 => Some(GeometryType::LineString),
69+
3 => Some(GeometryType::Polygon),
70+
4 => Some(GeometryType::MultiPoint),
71+
5 => Some(GeometryType::MultiLineString),
72+
6 => Some(GeometryType::MultiPolygon),
73+
7 => Some(GeometryType::GeometryCollection),
74+
_ => None,
75+
}
76+
}
77+
}
78+
79+
#[derive(Debug, PartialEq, Serialize, Deserialize)]
80+
pub(crate) enum GeometryEncoding {
81+
// only WKB is supported for now
82+
#[allow(clippy::upper_case_acronyms)]
83+
WKB,
84+
}
85+
86+
#[derive(Debug, Serialize, Deserialize)]
87+
pub(crate) struct GeometryColumn {
88+
pub(crate) encoding: GeometryEncoding,
89+
pub(crate) geometry_types: Vec<GeometryType>,
90+
}
91+
92+
#[derive(Debug, Serialize, Deserialize)]
93+
pub(crate) struct GeometryColumnsMetadata {
94+
pub(crate) version: String,
95+
pub(crate) primary_column: String,
96+
pub(crate) columns: HashMap<String, GeometryColumn>,
97+
}
98+
99+
impl GeometryColumnsMetadata {
100+
fn from_tupledesc(tupledesc: &PgTupleDesc) -> Option<GeometryColumnsMetadata> {
101+
let mut columns = HashMap::new();
102+
let mut primary_column = String::new();
103+
104+
let attributes = collect_attributes_for(CollectAttributesFor::CopyTo, tupledesc);
105+
106+
for attribute in attributes {
107+
let attribute_typoid = attribute.type_oid().value();
108+
109+
if !is_postgis_geometry_type(attribute_typoid) {
110+
continue;
111+
}
112+
113+
let typmod = attribute.type_mod();
114+
115+
let geometry_types = if let Some(geom_type) = GeometryType::from_typmod(typmod) {
116+
vec![geom_type]
117+
} else {
118+
vec![]
119+
};
120+
121+
let encoding = GeometryEncoding::WKB;
122+
123+
let geometry_column = GeometryColumn {
124+
encoding,
125+
geometry_types,
126+
};
127+
128+
let column_name = attribute.name().to_string();
129+
130+
// we use the first geometry column as the primary column
131+
if primary_column.is_empty() {
132+
primary_column = column_name.clone();
133+
}
134+
135+
columns.insert(column_name, geometry_column);
136+
}
137+
138+
if columns.is_empty() {
139+
return None;
140+
}
141+
142+
Some(GeometryColumnsMetadata {
143+
version: "1.1.0".into(),
144+
primary_column,
145+
columns,
146+
})
147+
}
148+
}
149+
150+
// geoparquet_metadata_json_from_tupledesc returns metadata for geometry columns in json format.
151+
// in a format specified by https://geoparquet.org/releases/v1.0.0-rc.1/.
152+
// e.g. "{\"version\":\"1.1.0\",
153+
// \"primary_column\":\"a\",
154+
// \"columns\":{\"a\":{\"encoding\":\"WKB\", \"geometry_types\":[\"Point\"]},
155+
// \"b\":{\"encoding\":\"WKB\", \"geometry_types\":[\"LineString\"]}}}"
156+
pub(crate) fn geoparquet_metadata_json_from_tupledesc(tupledesc: &PgTupleDesc) -> Option<String> {
157+
let geometry_columns_metadata = GeometryColumnsMetadata::from_tupledesc(tupledesc);
158+
159+
geometry_columns_metadata.map(|metadata| {
160+
serde_json::to_string(&metadata).unwrap_or_else(|_| {
161+
panic!(
162+
"failed to serialize geometry columns metadata {:?}",
163+
metadata
164+
)
165+
})
166+
})
167+
}
168+
48169
#[derive(Debug, PartialEq, Clone)]
49170
struct PostgisContext {
50171
geometry_typoid: Option<Oid>,

0 commit comments

Comments
 (0)