Skip to content

Commit 44f4190

Browse files
committed
Add a C abi safe function to dest api to get bytes written so far
1 parent f49dcac commit 44f4190

File tree

3 files changed

+39
-0
lines changed

3 files changed

+39
-0
lines changed

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ mod type_compat;
1919
pub use crate::arrow_parquet::compression::PgParquetCompression;
2020
#[allow(unused_imports)]
2121
pub use crate::parquet_copy_hook::copy_to_split_dest_receiver::create_copy_to_parquet_split_dest_receiver;
22+
#[allow(unused_imports)]
23+
pub use crate::parquet_copy_hook::copy_to_split_dest_receiver::parquet_split_dest_receiver_bytes_written;
2224

2325
pgrx::pg_module_magic!();
2426

src/parquet_copy_hook/copy_to_dest_receiver.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ impl CopyToParquetDestReceiver {
9191
}
9292

9393
pub(crate) fn collected_bytes(&self) -> usize {
94+
if self.parquet_writer_context.is_null() {
95+
return 0;
96+
}
97+
9498
let current_parquet_writer_context = unsafe {
9599
self.parquet_writer_context
96100
.as_ref()

src/parquet_copy_hook/copy_to_split_dest_receiver.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ struct CopyToParquetSplitDestReceiver {
2727
options: CopyToParquetOptions,
2828
current_child_id: i64,
2929
current_child_receiver: *mut CopyToParquetDestReceiver,
30+
bytes_written: u64,
3031
}
3132

3233
#[repr(C)]
@@ -90,6 +91,11 @@ impl CopyToParquetSplitDestReceiver {
9091
}
9192
}
9293

94+
self.bytes_written += unsafe {
95+
PgBox::<CopyToParquetDestReceiver>::from_pg(self.current_child_receiver as _)
96+
.collected_bytes() as u64
97+
};
98+
9399
self.current_child_receiver = std::ptr::null_mut();
94100
}
95101

@@ -140,6 +146,18 @@ impl CopyToParquetSplitDestReceiver {
140146

141147
child_uri.to_str().expect("invalid uri").as_pg_cstr()
142148
}
149+
150+
fn bytes_written(&self) -> u64 {
151+
if self.current_child_receiver.is_null() {
152+
return self.bytes_written;
153+
}
154+
155+
let child_parquet_dest = unsafe {
156+
PgBox::<CopyToParquetDestReceiver>::from_pg(self.current_child_receiver as _)
157+
};
158+
159+
self.bytes_written + child_parquet_dest.collected_bytes() as u64
160+
}
143161
}
144162

145163
#[pg_guard]
@@ -265,6 +283,21 @@ pub extern "C" fn create_copy_to_parquet_split_dest_receiver(
265283
split_dest.operation = -1;
266284
split_dest.options = options;
267285
split_dest.current_child_id = 0;
286+
split_dest.bytes_written = 0;
268287

269288
unsafe { std::mem::transmute(split_dest) }
270289
}
290+
291+
// parquet_split_dest_receiver_bytes_written returns the total number of bytes written to parquet files
292+
// so far. This includes all child receivers that have been created and flushed.
293+
#[pg_guard]
294+
#[no_mangle]
295+
pub extern "C" fn parquet_split_dest_receiver_bytes_written(dest: *const DestReceiver) -> u64 {
296+
let split_parquet_dest = unsafe {
297+
(dest as *const CopyToParquetSplitDestReceiver)
298+
.as_ref()
299+
.expect("invalid split parquet dest receiver ptr")
300+
};
301+
302+
split_parquet_dest.bytes_written()
303+
}

0 commit comments

Comments
 (0)