Skip to content

Commit cfb3d2c

Browse files
committed
address
1 parent cf7add3 commit cfb3d2c

File tree

4 files changed

+242
-134
lines changed

4 files changed

+242
-134
lines changed

README.md

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -157,25 +157,26 @@ SELECT uri, encode(key, 'escape') as key, encode(value, 'escape') as value FROM
157157
```
158158

159159
### Inspect Parquet column statistics
160+
160161
You can call `SELECT * FROM parquet.column_stats(<uri>)` to discover the column statistics of the Parquet file at the given URI, such as the minimum and maximum value of each column.
161162

162163
```sql
163164
SELECT * FROM parquet.column_stats('/tmp/product_example.parquet')
164-
field_id | stats_min | stats_max | stats_null_count | stats_distinct_count
165-
----------+----------------------------+----------------------------+------------------+----------------------
166-
19 | 2022-05-01 16:00:00 | 2022-05-01 16:00:00 | 0 |
167-
15 | | | 2 |
168-
3 | product 1 | product 1 | 0 |
169-
2 | 1 | 1 | 0 |
170-
0 | 1 | 1 | 0 |
171-
6 | 1 | 2 | 1 |
172-
7 | item 1 | item 2 | 1 |
173-
16 | | | 2 |
174-
12 | | | 2 |
175-
18 | 2025-01-29 02:28:35.193773 | 2025-01-29 02:28:35.193773 | 0 |
176-
11 | 1 | 1 | 1 |
177-
8 | 1 | 2 | 1 |
178-
17 | | | 2 |
165+
column_id | field_id | stats_min | stats_max | stats_null_count | stats_distinct_count
166+
-----------+----------+----------------------------+----------------------------+------------------+----------------------
167+
4 | 7 | item 1 | item 2 | 1 |
168+
6 | 11 | 1 | 1 | 1 |
169+
7 | 12 | | | 2 |
170+
10 | 17 | | | 2 |
171+
0 | 0 | 1 | 1 | 0 |
172+
11 | 18 | 2025-03-11 14:01:22.045739 | 2025-03-11 14:01:22.045739 | 0 |
173+
3 | 6 | 1 | 2 | 1 |
174+
12 | 19 | 2022-05-01 19:00:00+03 | 2022-05-01 19:00:00+03 | 0 |
175+
8 | 15 | | | 2 |
176+
5 | 8 | 1 | 2 | 1 |
177+
9 | 16 | | | 2 |
178+
1 | 2 | 1 | 1 | 0 |
179+
2 | 3 | product 1 | product 1 | 0 |
179180
(13 rows)
180181
```
181182

src/parquet_udfs/metadata.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use pgrx::{iter::TableIterator, name, pg_extern, pg_schema};
22

3-
use crate::arrow_parquet::uri_utils::{
4-
ensure_access_privilege_to_uri, parquet_metadata_from_uri, uri_as_string, ParsedUriInfo,
3+
use crate::{
4+
arrow_parquet::uri_utils::{
5+
ensure_access_privilege_to_uri, parquet_metadata_from_uri, uri_as_string, ParsedUriInfo,
6+
},
7+
parquet_udfs::stats::{stats_max_value_to_pg_str, stats_min_value_to_pg_str},
58
};
69

710
#[pg_schema]
811
mod parquet {
9-
use crate::parquet_udfs::stats::{stats_max_value_to_pg_str, stats_min_value_to_pg_str};
10-
1112
use super::*;
1213

1314
#[pg_extern]

src/parquet_udfs/stats.rs

Lines changed: 123 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ use pgrx::{
99
iter::TableIterator,
1010
name, pg_extern, pg_schema,
1111
pg_sys::{getTypeOutputInfo, InvalidOid, OidOutputFunctionCall},
12-
IntoDatum,
12+
IntoDatum, Uuid,
1313
};
1414

1515
use crate::{
1616
arrow_parquet::uri_utils::{
17-
ensure_access_privilege_to_uri, parquet_metadata_from_uri, parse_uri,
17+
ensure_access_privilege_to_uri, parquet_metadata_from_uri, ParsedUriInfo,
1818
},
1919
type_compat::pg_arrow_type_conversions::{
2020
i128_to_numeric, i32_to_date, i64_to_time, i64_to_timestamp, i64_to_timestamptz,
@@ -26,137 +26,147 @@ use crate::{
2626
mod parquet {
2727
use super::*;
2828

29+
struct AggregatedColumnStatsByRowGroup<'a> {
30+
column_id: i32,
31+
column_descriptor: &'a ColumnDescriptor,
32+
field_id: Option<i32>,
33+
stats: Vec<Option<&'a Statistics>>,
34+
}
35+
36+
impl AggregatedColumnStatsByRowGroup<'_> {
37+
fn min(&self) -> Option<String> {
38+
let mut min_value = None;
39+
40+
for stat in self.stats.iter().flatten() {
41+
if let Some(current_min_value) = min_value {
42+
min_value = Some(stats_min_of_two(current_min_value, stat));
43+
} else {
44+
min_value = Some(*stat);
45+
}
46+
}
47+
48+
min_value.and_then(|v| stats_min_value_to_pg_str(v, self.column_descriptor))
49+
}
50+
51+
fn max(&self) -> Option<String> {
52+
let mut max_value = None;
53+
54+
for stat in self.stats.iter().flatten() {
55+
if let Some(current_max_value) = max_value {
56+
max_value = Some(stats_max_of_two(current_max_value, stat));
57+
} else {
58+
max_value = Some(*stat);
59+
}
60+
}
61+
62+
max_value.and_then(|v| stats_max_value_to_pg_str(v, self.column_descriptor))
63+
}
64+
65+
fn null_count(&self) -> Option<i64> {
66+
let mut null_count_sum = None;
67+
68+
for stat in self.stats.iter().flatten() {
69+
if let Some(null_count) = stat.null_count_opt() {
70+
null_count_sum = match null_count_sum {
71+
Some(sum) => Some(sum + null_count as i64),
72+
None => Some(null_count as i64),
73+
};
74+
}
75+
}
76+
77+
null_count_sum
78+
}
79+
80+
fn distinct_count(&self) -> Option<i64> {
81+
let mut distinct_count_sum = None;
82+
83+
for stat in self.stats.iter().flatten() {
84+
if let Some(distinct_count) = stat.distinct_count_opt() {
85+
distinct_count_sum = match distinct_count_sum {
86+
Some(sum) => Some(sum + distinct_count as i64),
87+
None => Some(distinct_count as i64),
88+
};
89+
}
90+
}
91+
92+
distinct_count_sum
93+
}
94+
}
95+
2996
#[pg_extern]
3097
#[allow(clippy::type_complexity)]
3198
fn column_stats(
3299
uri: String,
33100
) -> TableIterator<
34101
'static,
35102
(
36-
name!(field_id, i32),
103+
name!(column_id, i32),
104+
name!(field_id, Option<i32>),
37105
name!(stats_min, Option<String>),
38106
name!(stats_max, Option<String>),
39107
name!(stats_null_count, Option<i64>),
40108
name!(stats_distinct_count, Option<i64>),
41109
),
42110
> {
43-
let uri = parse_uri(&uri);
111+
let uri_info = ParsedUriInfo::try_from(uri.as_str()).unwrap_or_else(|e| {
112+
panic!("{}", e.to_string());
113+
});
114+
115+
let uri = uri_info.uri.clone();
44116

45117
ensure_access_privilege_to_uri(&uri, true);
46-
let parquet_metadata = parquet_metadata_from_uri(&uri);
118+
let parquet_metadata = parquet_metadata_from_uri(uri_info);
47119

48-
let mut column_stats = HashMap::new();
49-
let mut column_descriptors = HashMap::new();
120+
let mut aggregated_column_stats = HashMap::new();
50121

51122
for row_group in parquet_metadata.row_groups().iter() {
52-
for column in row_group.columns().iter() {
53-
if !column
123+
for (column_id, column) in row_group.columns().iter().enumerate() {
124+
let field_id = if column
54125
.column_descr_ptr()
55126
.self_type()
56127
.get_basic_info()
57128
.has_id()
58129
{
59-
continue;
60-
}
130+
Some(column.column_descr_ptr().self_type().get_basic_info().id())
131+
} else {
132+
None
133+
};
61134

62-
let field_id = column.column_descr_ptr().self_type().get_basic_info().id();
135+
let column_descriptor = column.column_descr();
63136

64-
column_descriptors
65-
.entry(field_id)
66-
.or_insert(column.column_descr());
137+
let column_stats = column.statistics();
67138

68139
// column statistics exist for each leaf column per row group
69-
column_stats
70-
.entry(field_id)
71-
.or_insert_with(Vec::new)
72-
.push(column.statistics());
140+
aggregated_column_stats
141+
.entry(column_id)
142+
.or_insert(AggregatedColumnStatsByRowGroup {
143+
column_id: column_id as _,
144+
column_descriptor,
145+
field_id,
146+
stats: vec![],
147+
})
148+
.stats
149+
.push(column_stats);
73150
}
74151
}
75152

76153
let mut stats_rows = Vec::new();
77154

78-
for (field_id, stats) in column_stats.iter_mut() {
79-
let column_descriptor = column_descriptors
80-
.get(field_id)
81-
.expect("column descriptor not found");
82-
155+
for aggregated_column_stats in aggregated_column_stats.into_values() {
83156
stats_rows.push((
84-
*field_id,
85-
stats_min_value_aggregated_by_row_groups(stats, column_descriptor),
86-
stats_max_value_aggregated_by_row_groups(stats, column_descriptor),
87-
stats_null_count_aggregated_by_row_groups(stats),
88-
stats_distinct_count_aggregated_by_row_groups(stats),
157+
aggregated_column_stats.column_id,
158+
aggregated_column_stats.field_id,
159+
aggregated_column_stats.min(),
160+
aggregated_column_stats.max(),
161+
aggregated_column_stats.null_count(),
162+
aggregated_column_stats.distinct_count(),
89163
));
90164
}
91165

92166
TableIterator::new(stats_rows)
93167
}
94168
}
95169

96-
fn stats_null_count_aggregated_by_row_groups(stats: &[Option<&Statistics>]) -> Option<i64> {
97-
let mut null_count_sum = None;
98-
99-
for stat in stats.iter().flatten() {
100-
if let Some(null_count) = stat.null_count_opt() {
101-
null_count_sum = match null_count_sum {
102-
Some(sum) => Some(sum + null_count as i64),
103-
None => Some(null_count as i64),
104-
};
105-
}
106-
}
107-
108-
null_count_sum
109-
}
110-
111-
fn stats_distinct_count_aggregated_by_row_groups(stats: &[Option<&Statistics>]) -> Option<i64> {
112-
let mut distinct_count_sum = None;
113-
114-
for stat in stats.iter().flatten() {
115-
if let Some(distinct_count) = stat.distinct_count_opt() {
116-
distinct_count_sum = match distinct_count_sum {
117-
Some(sum) => Some(sum + distinct_count as i64),
118-
None => Some(distinct_count as i64),
119-
};
120-
}
121-
}
122-
123-
distinct_count_sum
124-
}
125-
126-
fn stats_min_value_aggregated_by_row_groups(
127-
row_group_stats: &[Option<&Statistics>],
128-
column_descriptor: &ColumnDescriptor,
129-
) -> Option<String> {
130-
let mut min_value = None;
131-
132-
for stat in row_group_stats.iter().flatten() {
133-
if let Some(current_min_value) = min_value {
134-
min_value = Some(stats_min_of_two(current_min_value, stat));
135-
} else {
136-
min_value = Some(*stat);
137-
}
138-
}
139-
140-
min_value.and_then(|v| stats_min_value_to_pg_str(v, column_descriptor))
141-
}
142-
143-
fn stats_max_value_aggregated_by_row_groups(
144-
row_group_stats: &[Option<&Statistics>],
145-
column_descriptor: &ColumnDescriptor,
146-
) -> Option<String> {
147-
let mut max_value = None;
148-
149-
for stat in row_group_stats.iter().flatten() {
150-
if let Some(current_max_value) = max_value {
151-
max_value = Some(stats_max_of_two(current_max_value, stat));
152-
} else {
153-
max_value = Some(*stat);
154-
}
155-
}
156-
157-
max_value.and_then(|v| stats_max_value_to_pg_str(v, column_descriptor))
158-
}
159-
160170
pub(crate) fn stats_min_value_to_pg_str(
161171
statistics: &Statistics,
162172
column_descriptor: &ColumnDescriptor,
@@ -168,6 +178,11 @@ pub(crate) fn stats_min_value_to_pg_str(
168178
let is_string = matches!(logical_type, Some(LogicalType::String))
169179
|| matches!(converted_type, ConvertedType::UTF8);
170180

181+
let is_json = matches!(logical_type, Some(LogicalType::Json))
182+
|| matches!(converted_type, ConvertedType::JSON);
183+
184+
let is_uuid = matches!(logical_type, Some(LogicalType::Uuid));
185+
171186
let is_date = matches!(logical_type, Some(LogicalType::Date))
172187
|| matches!(converted_type, ConvertedType::DATE);
173188

@@ -236,7 +251,7 @@ pub(crate) fn stats_min_value_to_pg_str(
236251
Statistics::Float(statistics) => statistics.min_opt().map(|v| v.to_string()),
237252
Statistics::Double(statistics) => statistics.min_opt().map(|v| v.to_string()),
238253
Statistics::ByteArray(statistics) => statistics.min_opt().map(|v| {
239-
if is_string {
254+
if is_string || is_json {
240255
v.as_utf8()
241256
.unwrap_or_else(|e| panic!("cannot convert stats to utf8 {e}"))
242257
.to_string()
@@ -258,6 +273,10 @@ pub(crate) fn stats_min_value_to_pg_str(
258273
let numeric = i128::from_be_bytes(numeric_bytes);
259274

260275
pg_format_numeric(numeric, column_descriptor)
276+
} else if is_uuid {
277+
let uuid = Uuid::from_slice(v.data()).expect("Invalid Uuid");
278+
279+
pg_format(uuid)
261280
} else {
262281
hex_encode(v.data())
263282
}
@@ -276,6 +295,11 @@ pub(crate) fn stats_max_value_to_pg_str(
276295
let is_string = matches!(logical_type, Some(LogicalType::String))
277296
|| matches!(converted_type, ConvertedType::UTF8);
278297

298+
let is_json = matches!(logical_type, Some(LogicalType::Json))
299+
|| matches!(converted_type, ConvertedType::JSON);
300+
301+
let is_uuid = matches!(logical_type, Some(LogicalType::Uuid));
302+
279303
let is_date = matches!(logical_type, Some(LogicalType::Date))
280304
|| matches!(converted_type, ConvertedType::DATE);
281305

@@ -344,7 +368,7 @@ pub(crate) fn stats_max_value_to_pg_str(
344368
Statistics::Float(statistics) => statistics.max_opt().map(|v| v.to_string()),
345369
Statistics::Double(statistics) => statistics.max_opt().map(|v| v.to_string()),
346370
Statistics::ByteArray(statistics) => statistics.max_opt().map(|v| {
347-
if is_string {
371+
if is_string || is_json {
348372
v.as_utf8()
349373
.unwrap_or_else(|e| panic!("cannot convert stats to utf8 {e}"))
350374
.to_string()
@@ -366,6 +390,10 @@ pub(crate) fn stats_max_value_to_pg_str(
366390
let numeric = i128::from_be_bytes(numeric_bytes);
367391

368392
pg_format_numeric(numeric, column_descriptor)
393+
} else if is_uuid {
394+
let uuid = Uuid::from_slice(v.data()).expect("Invalid Uuid");
395+
396+
pg_format(uuid)
369397
} else {
370398
hex_encode(v.data())
371399
}

0 commit comments

Comments
 (0)