Skip to content

Commit 5bbb5cb

Browse files
committed
copy to/from stdout/stdin with (format parquet)
1 parent 1b5878d commit 5bbb5cb

File tree

14 files changed

+415
-38
lines changed

14 files changed

+415
-38
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ serde = "1"
4646
serde_json = "1"
4747
tokio = {version = "1", default-features = false, features = ["rt", "time", "macros"]}
4848
url = "2"
49+
uuid = { version = "1", default-features = false, features = ["v4"] }
4950

5051
[dev-dependencies]
5152
pgrx-tests = "=0.13.1"

src/arrow_parquet/parquet_writer.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ impl ParquetWriterContext {
6464

6565
let writer_props = Self::writer_props(tupledesc, compression, compression_level);
6666

67-
let parquet_writer = parquet_writer_from_uri(uri_info, schema.clone(), writer_props);
67+
let parquet_writer =
68+
parquet_writer_from_uri(uri_info.clone(), schema.clone(), writer_props);
6869

6970
let attribute_contexts =
7071
collect_pg_to_arrow_attribute_contexts(&attributes, &schema.fields);
@@ -121,6 +122,14 @@ impl ParquetWriterContext {
121122
.unwrap_or_else(|e| panic!("failed to flush record batch: {}", e));
122123
}
123124

125+
pub(crate) fn write_finish(&mut self) {
126+
PG_BACKEND_TOKIO_RUNTIME
127+
.block_on(self.parquet_writer.finish())
128+
.unwrap_or_else(|e| {
129+
panic!("failed to close parquet writer: {}", e);
130+
});
131+
}
132+
124133
fn pg_tuples_to_record_batch(
125134
tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>,
126135
attribute_contexts: &[PgToArrowAttributeContext],
@@ -137,13 +146,3 @@ impl ParquetWriterContext {
137146
RecordBatch::try_new(schema, attribute_arrays).expect("Expected record batch")
138147
}
139148
}
140-
141-
impl Drop for ParquetWriterContext {
142-
fn drop(&mut self) {
143-
PG_BACKEND_TOKIO_RUNTIME
144-
.block_on(self.parquet_writer.finish())
145-
.unwrap_or_else(|e| {
146-
panic!("failed to close parquet writer: {}", e);
147-
});
148-
}
149-
}

src/arrow_parquet/uri_utils.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{panic, sync::Arc};
1+
use std::{env::temp_dir, panic, sync::Arc};
22

33
use arrow::datatypes::SchemaRef;
44
use object_store::{path::Path, ObjectStoreScheme};
@@ -16,6 +16,7 @@ use pgrx::{
1616
pg_sys::{get_role_oid, has_privs_of_role, superuser, AsPgCStr, GetUserId},
1717
};
1818
use url::Url;
19+
use uuid::Uuid;
1920

2021
use crate::{
2122
arrow_parquet::parquet_writer::DEFAULT_ROW_GROUP_SIZE,
@@ -36,9 +37,21 @@ pub(crate) struct ParsedUriInfo {
3637
pub(crate) bucket: Option<String>,
3738
pub(crate) path: Path,
3839
pub(crate) scheme: ObjectStoreScheme,
40+
pub(crate) is_stdio: bool,
3941
}
4042

4143
impl ParsedUriInfo {
44+
fn for_stdout() -> Self {
45+
let path = temp_dir().join(format!("pg_parquet_{}", Uuid::new_v4()));
46+
47+
let mut parsed_uri = Self::try_from(path.to_str().expect("invalid temp path"))
48+
.unwrap_or_else(|e| panic!("{}", e));
49+
50+
parsed_uri.is_stdio = true;
51+
52+
parsed_uri
53+
}
54+
4255
fn try_parse_uri(uri: &str) -> Result<Url, String> {
4356
if !uri.contains("://") {
4457
// local file
@@ -79,6 +92,10 @@ impl TryFrom<&str> for ParsedUriInfo {
7992
type Error = String;
8093

8194
fn try_from(uri: &str) -> Result<Self, Self::Error> {
95+
if uri == "std" {
96+
return Ok(Self::for_stdout());
97+
}
98+
8299
let uri = Self::try_parse_uri(uri)?;
83100

84101
let (scheme, path) = Self::try_parse_scheme(&uri)?;
@@ -90,6 +107,7 @@ impl TryFrom<&str> for ParsedUriInfo {
90107
bucket,
91108
path,
92109
scheme,
110+
is_stdio: false,
93111
})
94112
}
95113
}

src/object_store/local_file.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ pub(crate) fn create_local_file_object_store(
1414

1515
if !copy_from {
1616
// create or overwrite the local file
17-
std::fs::OpenOptions::new()
17+
let _file = std::fs::OpenOptions::new()
1818
.write(true)
1919
.truncate(true)
2020
.create(true)
21-
.open(path)
21+
.open(&path)
2222
.unwrap_or_else(|e| panic!("{}", e));
2323
}
2424

src/object_store/object_store_cache.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ impl ObjectStoreCache {
5353
path,
5454
scheme,
5555
bucket,
56+
..
5657
} = uri_info;
5758

5859
// no need to cache local files

src/parquet_copy_hook.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
pub(crate) mod copy_from;
2+
pub(crate) mod copy_from_stdin;
23
pub(crate) mod copy_to;
34
pub(crate) mod copy_to_dest_receiver;
5+
pub(crate) mod copy_to_stdout;
46
pub(crate) mod copy_utils;
57
pub(crate) mod hook;
68
pub(crate) mod pg_compat;

src/parquet_copy_hook/copy_from.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ use crate::{
1818
},
1919
};
2020

21-
use super::copy_utils::{
22-
copy_from_stmt_match_by, copy_stmt_attribute_list, copy_stmt_create_namespace_item,
23-
copy_stmt_create_parse_state, create_filtered_tupledesc_for_relation,
21+
use super::{
22+
copy_from_stdin::copy_stdin_to_file,
23+
copy_utils::{
24+
copy_from_stmt_match_by, copy_stmt_attribute_list, copy_stmt_create_namespace_item,
25+
copy_stmt_create_parse_state, create_filtered_tupledesc_for_relation,
26+
},
2427
};
2528

2629
// stack to store parquet reader contexts for COPY FROM.
@@ -140,6 +143,11 @@ pub(crate) fn execute_copy_from(
140143
let match_by = copy_from_stmt_match_by(p_stmt);
141144

142145
unsafe {
146+
if uri_info.is_stdio {
147+
let is_binary = true;
148+
copy_stdin_to_file(uri_info.clone(), tupledesc.natts as _, is_binary);
149+
}
150+
143151
// parquet reader context is used throughout the COPY FROM operation.
144152
let parquet_reader_context = ParquetReaderContext::new(uri_info, match_by, &tupledesc);
145153
push_parquet_reader_context(parquet_reader_context);
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
use std::{ffi::CStr, io::Write};
2+
3+
use pgrx::{
4+
ffi::c_char,
5+
pg_sys::{
6+
makeStringInfo, pq_beginmessage, pq_copymsgbytes, pq_endmessage, pq_getmsgstring,
7+
pq_sendbyte, pq_sendint16, QueryCancelHoldoffCount, StringInfo,
8+
},
9+
};
10+
11+
use crate::arrow_parquet::uri_utils::{uri_as_string, ParsedUriInfo};
12+
13+
/*
14+
* CopyFromStdinState is a simplified version of CopyFromState
15+
* in PostgreSQL to use in receive_data_from_client with names
16+
* preserved.
17+
*/
18+
struct CopyFromStdinState {
19+
/* buffer in which we store incoming bytes */
20+
fe_msgbuf: StringInfo,
21+
22+
/* whether we reached the end-of-file */
23+
raw_reached_eof: bool,
24+
}
25+
26+
const MAX_READ_SIZE: usize = 65536;
27+
28+
/*
29+
* copy_stdin_to_file copies data from the socket to the given file.
30+
* We request the client send a specific column count.
31+
*/
32+
pub(crate) unsafe fn copy_stdin_to_file(uri_info: ParsedUriInfo, natts: i16, is_binary: bool) {
33+
let mut cstate = CopyFromStdinState {
34+
fe_msgbuf: makeStringInfo(),
35+
raw_reached_eof: false,
36+
};
37+
38+
/* open the destination file for writing */
39+
let path = uri_as_string(&uri_info.uri);
40+
41+
// create or overwrite the local file
42+
let mut file = std::fs::OpenOptions::new()
43+
.write(true)
44+
.truncate(true)
45+
.create(true)
46+
.open(&path)
47+
.unwrap_or_else(|e| panic!("{}", e));
48+
49+
/* tell the client we are ready for data */
50+
send_copy_in_begin(natts, is_binary);
51+
52+
/* allocate on the heap since it's quite big */
53+
let mut receive_buffer = vec![0u8; MAX_READ_SIZE];
54+
55+
while !cstate.raw_reached_eof {
56+
/* copy some bytes from the client (via fe_msgbuf) into receive_buffer */
57+
let bytes_read = receive_data_from_client(&mut cstate, &mut receive_buffer);
58+
59+
if bytes_read == 0 {
60+
break;
61+
}
62+
63+
if bytes_read > 0 {
64+
/* copy bytes from receive_buffer to the destination file */
65+
file.write_all(&receive_buffer[..bytes_read])
66+
.unwrap_or_else(|e| {
67+
panic!("could not write to file: {}", e);
68+
});
69+
}
70+
}
71+
}
72+
73+
/*
74+
* send_copy_in_begin sends the CopyInResponse message that the client
75+
* expects after a COPY .. FROM STDIN.
76+
*
77+
* This code is adapted from ReceiveCopyBegin in PostgreSQL.
78+
*/
79+
unsafe fn send_copy_in_begin(natts: i16, is_binary: bool) {
80+
let buf = makeStringInfo();
81+
82+
pq_beginmessage(buf, 'G' as _);
83+
84+
let copy_format = if is_binary { 1 } else { 0 };
85+
pq_sendbyte(buf, copy_format);
86+
87+
pq_sendint16(buf, natts as _);
88+
for _ in 0..natts {
89+
/* use the same format for all columns */
90+
pq_sendint16(buf, copy_format as _);
91+
}
92+
93+
pq_endmessage(buf);
94+
((*PqCommMethods).flush)();
95+
}
96+
97+
const PQ_LARGE_MESSAGE_LIMIT: i32 = 1024 * 1024 * 1024 - 3;
98+
const PQ_SMALL_MESSAGE_LIMIT: i32 = 10000;
99+
100+
unsafe fn receive_data_from_client(
101+
cstate: &mut CopyFromStdinState,
102+
receive_buffer: &mut [u8],
103+
) -> usize {
104+
let mut databuf = receive_buffer;
105+
106+
let minread = 1;
107+
let mut maxread = MAX_READ_SIZE;
108+
109+
let mut bytesread = 0;
110+
111+
while maxread > 0 && bytesread < minread && !cstate.raw_reached_eof {
112+
let mut avail;
113+
let mut flushed = false;
114+
115+
while flushed || (*cstate.fe_msgbuf).cursor >= (*cstate.fe_msgbuf).len {
116+
/* Try to receive another message */
117+
118+
QueryCancelHoldoffCount += 1;
119+
120+
pq_startmsgread();
121+
122+
let mtype = pq_getbyte();
123+
if mtype == -1 {
124+
panic!("unexpected EOF on client connection with an open transaction");
125+
}
126+
127+
/* Validate message type and set packet size limit */
128+
let maxmsglen = match mtype as u8 as char {
129+
'd' =>
130+
/* CopyData */
131+
{
132+
PQ_LARGE_MESSAGE_LIMIT
133+
}
134+
'c' | 'f' | 'H' | 'S' =>
135+
/* CopyDone, CopyFail, Flush, Sync */
136+
{
137+
PQ_SMALL_MESSAGE_LIMIT
138+
}
139+
_ => {
140+
panic!(
141+
"unexpected message type 0x{:02X} during COPY from stdin",
142+
mtype
143+
);
144+
}
145+
};
146+
147+
/* Now collect the message body */
148+
if pq_getmessage(cstate.fe_msgbuf, maxmsglen) != 0 {
149+
panic!("unexpected EOF on client connection with an open transaction");
150+
}
151+
152+
QueryCancelHoldoffCount -= 1;
153+
154+
/* ... and process it */
155+
match mtype as u8 as char {
156+
'd' => {
157+
/* CopyData */
158+
break;
159+
}
160+
'c' => {
161+
/* CopyDone */
162+
cstate.raw_reached_eof = true;
163+
return bytesread;
164+
}
165+
'f' => {
166+
/* CopyFail */
167+
let msg = pq_getmsgstring(cstate.fe_msgbuf);
168+
let msg = CStr::from_ptr(msg).to_str().expect("invalid CStr");
169+
panic!("COPY from stdin failed: {msg}");
170+
}
171+
'H' | 'S' => {
172+
/* Flush, Sync */
173+
flushed = true;
174+
continue;
175+
}
176+
_ => {
177+
panic!(
178+
"unexpected message type 0x{:02X} during COPY from stdin",
179+
mtype
180+
);
181+
}
182+
}
183+
}
184+
185+
avail = ((*cstate.fe_msgbuf).len - (*cstate.fe_msgbuf).cursor) as _;
186+
if avail > maxread {
187+
avail = maxread;
188+
}
189+
190+
pq_copymsgbytes(cstate.fe_msgbuf, databuf.as_mut_ptr() as _, avail as _);
191+
databuf = &mut databuf[avail..];
192+
maxread -= avail;
193+
bytesread += avail;
194+
}
195+
196+
bytesread
197+
}
198+
199+
// todo: move to pgrx (include libpq.h)
200+
#[repr(C)]
201+
struct PQcommMethods {
202+
comm_reset: unsafe extern "C" fn(),
203+
flush: unsafe extern "C" fn() -> i32,
204+
flush_if_writable: unsafe extern "C" fn() -> i32,
205+
is_send_pending: unsafe extern "C" fn() -> bool,
206+
putmessage: unsafe extern "C" fn(msgtype: u32, s: *const c_char, len: usize) -> i32,
207+
putmessage_noblock: unsafe extern "C" fn(msgtype: u32, s: *const c_char, len: usize),
208+
}
209+
210+
unsafe extern "C" {
211+
fn pq_startmsgread();
212+
fn pq_getmessage(s: StringInfo, maxlen: i32) -> i32;
213+
fn pq_getbyte() -> i32;
214+
215+
static PqCommMethods: *mut PQcommMethods;
216+
}

src/parquet_copy_hook/copy_to.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ pub(crate) fn execute_copy_to_with_dest_receiver(
102102
&mut completion_tag as _,
103103
);
104104

105+
if let Some(destroy_callback) = parquet_dest.rDestroy {
106+
destroy_callback(parquet_dest.as_ptr());
107+
}
108+
105109
PortalDrop(portal.as_ptr(), false);
106110

107111
completion_tag.nprocessed

0 commit comments

Comments
 (0)