Skip to content

Commit d00ec14

Browse files
committed
copy to/from stdout/stdin with (format parquet)
1 parent 0de2de1 commit d00ec14

File tree

18 files changed

+691
-55
lines changed

18 files changed

+691
-55
lines changed

.devcontainer/.env

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,7 @@ GOOGLE_SERVICE_ACCOUNT_KEY='{"gcs_base_url": "http://localhost:4443","disable_oa
2828
GOOGLE_SERVICE_ENDPOINT=http://localhost:4443
2929

3030
# Others
31+
# run pgrx tests with a single thread to avoid race conditions
3132
RUST_TEST_THREADS=1
33+
# pgrx runs tests on port (base_port + pg_version)
34+
PGRX_TEST_PG_BASE_PORT=5454

.github/workflows/ci.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ jobs:
9494
- name: Install and configure pgrx
9595
run: |
9696
cargo install --locked [email protected]
97-
cargo pgrx init --pg${{ env.PG_MAJOR }} /usr/lib/postgresql/${{ env.PG_MAJOR }}/bin/pg_config
97+
cargo pgrx init --pg${{ env.PG_MAJOR }} /usr/lib/postgresql/${{ env.PG_MAJOR }}/bin/pg_config \
98+
--base-testing-port $PGRX_TEST_PG_BASE_PORT
9899
99100
- name: Install cargo-llvm-cov for coverage report
100101
run: cargo install --locked [email protected]

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ serde = "1"
4646
serde_json = "1"
4747
tokio = {version = "1", default-features = false, features = ["rt", "time", "macros"]}
4848
url = "2"
49+
uuid = { version = "1", default-features = false, features = ["v4"] }
4950

5051
[dev-dependencies]
5152
pgrx-tests = "=0.13.1"

src/arrow_parquet/parquet_writer.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ impl ParquetWriterContext {
130130
}
131131

132132
// finalize flushes the in progress rows to a new row group and finally writes metadata to the file.
133-
fn finalize(&mut self) {
133+
pub(crate) fn finalize(&mut self) {
134134
PG_BACKEND_TOKIO_RUNTIME
135135
.block_on(self.parquet_writer.finish())
136136
.unwrap_or_else(|e| panic!("failed to finish parquet writer: {}", e));
@@ -156,9 +156,3 @@ impl ParquetWriterContext {
156156
RecordBatch::try_new(schema, attribute_arrays).expect("Expected record batch")
157157
}
158158
}
159-
160-
impl Drop for ParquetWriterContext {
161-
fn drop(&mut self) {
162-
self.finalize();
163-
}
164-
}

src/arrow_parquet/uri_utils.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{panic, sync::Arc};
1+
use std::{env::temp_dir, panic, sync::Arc};
22

33
use arrow::datatypes::SchemaRef;
44
use object_store::{path::Path, ObjectStoreScheme};
@@ -16,6 +16,7 @@ use pgrx::{
1616
pg_sys::{get_role_oid, has_privs_of_role, superuser, AsPgCStr, GetUserId},
1717
};
1818
use url::Url;
19+
use uuid::Uuid;
1920

2021
use crate::{
2122
object_store::{
@@ -35,9 +36,21 @@ pub(crate) struct ParsedUriInfo {
3536
pub(crate) bucket: Option<String>,
3637
pub(crate) path: Path,
3738
pub(crate) scheme: ObjectStoreScheme,
39+
pub(crate) is_stdio: bool,
3840
}
3941

4042
impl ParsedUriInfo {
43+
fn for_stdout() -> Self {
44+
let path = temp_dir().join(format!("pg_parquet_{}", Uuid::new_v4()));
45+
46+
let mut parsed_uri = Self::try_from(path.to_str().expect("invalid temp path"))
47+
.unwrap_or_else(|e| panic!("{}", e));
48+
49+
parsed_uri.is_stdio = true;
50+
51+
parsed_uri
52+
}
53+
4154
fn try_parse_uri(uri: &str) -> Result<Url, String> {
4255
if !uri.contains("://") {
4356
// local file
@@ -81,6 +94,10 @@ impl TryFrom<&str> for ParsedUriInfo {
8194
type Error = String;
8295

8396
fn try_from(uri: &str) -> Result<Self, Self::Error> {
97+
if uri == "std" {
98+
return Ok(Self::for_stdout());
99+
}
100+
84101
let uri = Self::try_parse_uri(uri)?;
85102

86103
let (scheme, path) = Self::try_parse_scheme(&uri)?;
@@ -92,6 +109,7 @@ impl TryFrom<&str> for ParsedUriInfo {
92109
bucket,
93110
path,
94111
scheme,
112+
is_stdio: false,
95113
})
96114
}
97115
}

src/object_store/object_store_cache.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ impl ObjectStoreCache {
5353
path,
5454
scheme,
5555
bucket,
56+
..
5657
} = uri_info;
5758

5859
// no need to cache local files

src/parquet_copy_hook.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
pub(crate) mod copy_from;
2+
pub(crate) mod copy_from_stdin;
23
pub(crate) mod copy_to;
34
pub(crate) mod copy_to_dest_receiver;
45
pub(crate) mod copy_to_split_dest_receiver;
6+
pub(crate) mod copy_to_stdout;
57
pub(crate) mod copy_utils;
68
pub(crate) mod hook;
79
pub(crate) mod pg_compat;

src/parquet_copy_hook/copy_from.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ use crate::{
1818
},
1919
};
2020

21-
use super::copy_utils::{
22-
copy_from_stmt_match_by, copy_stmt_attribute_list, copy_stmt_create_namespace_item,
23-
copy_stmt_create_parse_state, create_filtered_tupledesc_for_relation,
21+
use super::{
22+
copy_from_stdin::copy_stdin_to_file,
23+
copy_utils::{
24+
copy_from_stmt_match_by, copy_stmt_attribute_list, copy_stmt_create_namespace_item,
25+
copy_stmt_create_parse_state, create_filtered_tupledesc_for_relation,
26+
},
2427
};
2528

2629
// stack to store parquet reader contexts for COPY FROM.
@@ -140,6 +143,11 @@ pub(crate) fn execute_copy_from(
140143
let match_by = copy_from_stmt_match_by(p_stmt);
141144

142145
unsafe {
146+
if uri_info.is_stdio {
147+
let is_binary = true;
148+
copy_stdin_to_file(uri_info.clone(), tupledesc.natts as _, is_binary);
149+
}
150+
143151
// parquet reader context is used throughout the COPY FROM operation.
144152
let parquet_reader_context = ParquetReaderContext::new(uri_info, match_by, &tupledesc);
145153
push_parquet_reader_context(parquet_reader_context);
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
use std::{ffi::CStr, io::Write};
2+
3+
use pgrx::{
4+
ffi::c_char,
5+
pg_sys::{
6+
makeStringInfo, pq_beginmessage, pq_copymsgbytes, pq_endmessage, pq_getmsgstring,
7+
pq_sendbyte, pq_sendint16, QueryCancelHoldoffCount, StringInfo,
8+
},
9+
};
10+
11+
use crate::arrow_parquet::uri_utils::{uri_as_string, ParsedUriInfo};
12+
13+
/*
14+
* CopyFromStdinState is a pared-down counterpart of PostgreSQL's
15+
* CopyFromState, holding only what receive_data_from_client needs;
16+
* field names are kept the same as in PostgreSQL.
17+
*/
18+
struct CopyFromStdinState {
19+
/* buffer that receives the body of each incoming protocol message */
20+
fe_msgbuf: StringInfo,
21+
22+
/* set to true once the client has sent CopyDone ('c') */
23+
raw_reached_eof: bool,
24+
}
25+
26+
const MAX_READ_SIZE: usize = 65536;
27+
28+
/*
29+
* CopyInputToFile copies data from the socket to the given file.
30+
* We request the client send a specific column count.
31+
*/
32+
pub(crate) unsafe fn copy_stdin_to_file(uri_info: ParsedUriInfo, natts: i16, is_binary: bool) {
33+
let mut cstate = CopyFromStdinState {
34+
fe_msgbuf: makeStringInfo(),
35+
raw_reached_eof: false,
36+
};
37+
38+
/* open the destination file for writing */
39+
let path = uri_as_string(&uri_info.uri);
40+
41+
// create or overwrite the local file
42+
let mut file = std::fs::OpenOptions::new()
43+
.write(true)
44+
.truncate(true)
45+
.create(true)
46+
.open(&path)
47+
.unwrap_or_else(|e| panic!("{}", e));
48+
49+
/* tell the client we are ready for data */
50+
send_copy_in_begin(natts, is_binary);
51+
52+
/* allocate on the heap since it's quite big */
53+
let mut receive_buffer = vec![0u8; MAX_READ_SIZE];
54+
55+
while !cstate.raw_reached_eof {
56+
/* copy some bytes from the client into fe_msgbuf */
57+
let bytes_read = receive_data_from_client(&mut cstate, &mut receive_buffer);
58+
59+
if bytes_read == 0 {
60+
break;
61+
}
62+
63+
if bytes_read > 0 {
64+
/* copy bytes from fe_msgbuf to the destination file */
65+
file.write_all(&receive_buffer[..bytes_read])
66+
.unwrap_or_else(|e| {
67+
panic!("could not write to file: {}", e);
68+
});
69+
}
70+
}
71+
}
72+
73+
/*
74+
* send_copy_in_begin sends the CopyInResponse message that the client
75+
* expects after a COPY .. FROM STDIN.
76+
*
77+
* This code is adapted from ReceiveCopyBegin in PostgreSQL.
78+
*/
79+
unsafe fn send_copy_in_begin(natts: i16, is_binary: bool) {
80+
let buf = makeStringInfo();
81+
82+
pq_beginmessage(buf, 'G' as _);
83+
84+
let copy_format = if is_binary { 1 } else { 0 };
85+
pq_sendbyte(buf, copy_format);
86+
87+
pq_sendint16(buf, natts as _);
88+
for _ in 0..natts {
89+
/* use the same format for all columns */
90+
pq_sendint16(buf, copy_format as _);
91+
}
92+
93+
pq_endmessage(buf);
94+
((*PqCommMethods).flush)();
95+
}
96+
97+
const PQ_LARGE_MESSAGE_LIMIT: i32 = 1024 * 1024 * 1024 - 3;
98+
const PQ_SMALL_MESSAGE_LIMIT: i32 = 10000;
99+
100+
unsafe fn receive_data_from_client(
101+
cstate: &mut CopyFromStdinState,
102+
receive_buffer: &mut [u8],
103+
) -> usize {
104+
let mut databuf = receive_buffer;
105+
106+
let minread = 1;
107+
let mut maxread = MAX_READ_SIZE;
108+
109+
let mut bytesread = 0;
110+
111+
while maxread > 0 && bytesread < minread && !cstate.raw_reached_eof {
112+
let mut avail;
113+
let mut flushed = false;
114+
115+
while flushed || (*cstate.fe_msgbuf).cursor >= (*cstate.fe_msgbuf).len {
116+
/* Try to receive another message */
117+
118+
QueryCancelHoldoffCount += 1;
119+
120+
pq_startmsgread();
121+
122+
let mtype = pq_getbyte();
123+
if mtype == -1 {
124+
panic!("unexpected EOF on client connection with an open transaction");
125+
}
126+
127+
/* Validate message type and set packet size limit */
128+
let maxmsglen = match mtype as u8 as char {
129+
'd' =>
130+
/* CopyData */
131+
{
132+
PQ_LARGE_MESSAGE_LIMIT
133+
}
134+
'c' | 'f' | 'H' | 'S' =>
135+
/* CopyDone, CopyFail, Flush, Sync */
136+
{
137+
PQ_SMALL_MESSAGE_LIMIT
138+
}
139+
_ => {
140+
panic!(
141+
"unexpected message type 0x{:02X} during COPY from stdin",
142+
mtype
143+
);
144+
}
145+
};
146+
147+
/* Now collect the message body */
148+
if pq_getmessage(cstate.fe_msgbuf, maxmsglen) != 0 {
149+
panic!("unexpected EOF on client connection with an open transaction");
150+
}
151+
152+
QueryCancelHoldoffCount -= 1;
153+
154+
/* ... and process it */
155+
match mtype as u8 as char {
156+
'd' => {
157+
/* CopyData */
158+
break;
159+
}
160+
'c' => {
161+
/* CopyDone */
162+
cstate.raw_reached_eof = true;
163+
return bytesread;
164+
}
165+
'f' => {
166+
/* CopyFail */
167+
let msg = pq_getmsgstring(cstate.fe_msgbuf);
168+
let msg = CStr::from_ptr(msg).to_str().expect("invalid CStr");
169+
panic!("COPY from stdin failed: {msg}");
170+
}
171+
'H' | 'S' => {
172+
/* Flush, Sync */
173+
flushed = true;
174+
continue;
175+
}
176+
_ => {
177+
panic!(
178+
"unexpected message type 0x{:02X} during COPY from stdin",
179+
mtype
180+
);
181+
}
182+
}
183+
}
184+
185+
avail = ((*cstate.fe_msgbuf).len - (*cstate.fe_msgbuf).cursor) as _;
186+
if avail > maxread {
187+
avail = maxread;
188+
}
189+
190+
pq_copymsgbytes(cstate.fe_msgbuf, databuf.as_mut_ptr() as _, avail as _);
191+
databuf = &mut databuf[avail..];
192+
maxread -= avail;
193+
bytesread += avail;
194+
}
195+
196+
bytesread
197+
}
198+
199+
// todo: move to pgrx (include libpq.h)
200+
#[repr(C)]
201+
struct PQcommMethods {
202+
comm_reset: unsafe extern "C" fn(),
203+
flush: unsafe extern "C" fn() -> i32,
204+
flush_if_writable: unsafe extern "C" fn() -> i32,
205+
is_send_pending: unsafe extern "C" fn() -> bool,
206+
putmessage: unsafe extern "C" fn(msgtype: u32, s: *const c_char, len: usize) -> i32,
207+
putmessage_noblock: unsafe extern "C" fn(msgtype: u32, s: *const c_char, len: usize),
208+
}
209+
210+
unsafe extern "C" {
211+
fn pq_startmsgread();
212+
fn pq_getmessage(s: StringInfo, maxlen: i32) -> i32;
213+
fn pq_getbyte() -> i32;
214+
215+
static PqCommMethods: *mut PQcommMethods;
216+
}

0 commit comments

Comments
 (0)