Skip to content

Commit 324434d

Browse files
committed
address
1 parent 7b12db4 commit 324434d

File tree

8 files changed

+125
-111
lines changed

8 files changed

+125
-111
lines changed

src/arrow_parquet/uri_utils.rs

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{ffi::CStr, panic, sync::Arc};
1+
use std::{ffi::CStr, fs::File, os::fd::FromRawFd, panic, sync::Arc};
22

33
use arrow::datatypes::SchemaRef;
44
use object_store::{path::Path, ObjectStoreScheme};
@@ -15,8 +15,9 @@ use pgrx::{
1515
ereport,
1616
ffi::c_char,
1717
pg_sys::{
18-
get_role_oid, has_privs_of_role, palloc0, superuser, AsPgCStr, DataDir, FileClose,
19-
FilePathName, GetUserId, InvalidOid, OpenTemporaryFile, TempTablespacePath, MAXPGPATH,
18+
get_role_oid, has_privs_of_role, palloc0, superuser, AsPgCStr, ClosePipeStream, DataDir,
19+
FileClose, FilePathName, GetUserId, InvalidOid, OpenPipeStream, OpenTemporaryFile,
20+
TempTablespacePath, __sFILE, MAXPGPATH, PG_BINARY_R, PG_BINARY_W,
2021
},
2122
};
2223
use url::Url;
@@ -39,22 +40,42 @@ pub(crate) struct ParsedUriInfo {
3940
pub(crate) bucket: Option<String>,
4041
pub(crate) path: Path,
4142
pub(crate) scheme: ObjectStoreScheme,
42-
pub(crate) stdio_tmp_fd: Option<i32>,
43-
pub(crate) program: Option<*mut c_char>,
43+
// tmp_fd is used as intermediate file for copying data to/from stdin/out or program pipes
44+
pub(crate) tmp_fd: Option<i32>,
45+
// is_program marks a COPY to/from a program; pipe_file holds the pipe stream pointer used to
46+
// copy data to/from that program. It stays null until open_program_pipe is called to open the pipe
47+
pub(crate) is_program: bool,
48+
pub(crate) pipe_file: *mut __sFILE,
4449
}
4550

4651
impl ParsedUriInfo {
4752
pub(crate) fn for_std_inout() -> Self {
48-
Self::create_tmp_file()
53+
Self::with_tmp_file()
4954
}
5055

51-
pub(crate) fn for_program(program: *mut c_char) -> Self {
52-
let mut uri_info = Self::create_tmp_file();
53-
uri_info.program = Some(program);
56+
pub(crate) fn for_program() -> Self {
57+
let mut uri_info = Self::with_tmp_file();
58+
uri_info.is_program = true;
5459
uri_info
5560
}
5661

57-
fn create_tmp_file() -> Self {
62+
pub(crate) fn open_program_pipe(&mut self, program: &str, copy_from: bool) -> File {
63+
let pipe_mode = if copy_from { PG_BINARY_R } else { PG_BINARY_W };
64+
65+
let pipe_file = unsafe { OpenPipeStream(program.as_pg_cstr(), pipe_mode.as_ptr()) };
66+
67+
if pipe_file.is_null() {
68+
panic!("Failed to open pipe stream for program: {}", program);
69+
}
70+
71+
self.pipe_file = pipe_file;
72+
73+
let pipe_fd = (unsafe { *self.pipe_file })._file;
74+
75+
unsafe { File::from_raw_fd(pipe_fd as _) }
76+
}
77+
78+
fn with_tmp_file() -> Self {
5879
// open temp postgres file, which is removed after transaction ends
5980
let tmp_path_fd = unsafe { OpenTemporaryFile(false) };
6081

@@ -81,11 +102,15 @@ impl ParsedUriInfo {
81102

82103
let mut parsed_uri = Self::try_from(tmp_path.as_str()).unwrap_or_else(|e| panic!("{}", e));
83104

84-
parsed_uri.stdio_tmp_fd = Some(tmp_path_fd);
105+
parsed_uri.tmp_fd = Some(tmp_path_fd);
85106

86107
parsed_uri
87108
}
88109

110+
fn is_std_inout(&self) -> bool {
111+
self.tmp_fd.is_some() && !self.is_program
112+
}
113+
89114
fn try_parse_uri(uri: &str) -> Result<Url, String> {
90115
if !uri.contains("://") {
91116
// local file
@@ -139,17 +164,23 @@ impl TryFrom<&str> for ParsedUriInfo {
139164
bucket,
140165
path,
141166
scheme,
142-
stdio_tmp_fd: None,
143-
program: None,
167+
tmp_fd: None,
168+
is_program: false,
169+
pipe_file: std::ptr::null_mut(),
144170
})
145171
}
146172
}
147173

148174
impl Drop for ParsedUriInfo {
149175
fn drop(&mut self) {
150-
if let Some(stdio_tmp_fd) = self.stdio_tmp_fd {
176+
if let Some(tmp_fd) = self.tmp_fd {
151177
// close temp file, postgres api will remove it on close
152-
unsafe { FileClose(stdio_tmp_fd) };
178+
unsafe { FileClose(tmp_fd) };
179+
}
180+
181+
if !self.pipe_file.is_null() {
182+
// close the pipe stream that open_program_pipe obtained from OpenPipeStream
183+
unsafe { ClosePipeStream(self.pipe_file) };
153184
}
154185
}
155186
}
@@ -286,17 +317,15 @@ pub(crate) fn ensure_access_privilege_to_uri(uri_info: &ParsedUriInfo, copy_from
286317
return;
287318
}
288319

289-
let is_program = uri_info.program.is_some();
290-
291320
// permission check is not needed for stdin/out
292-
if uri_info.stdio_tmp_fd.is_some() && !is_program {
321+
if uri_info.is_std_inout() {
293322
return;
294323
}
295324

296325
let user_id = unsafe { GetUserId() };
297326
let is_file = uri_info.uri.scheme() == "file";
298327

299-
let required_role_name = if is_program {
328+
let required_role_name = if uri_info.is_program {
300329
"pg_execute_server_program"
301330
} else if is_file {
302331
if copy_from {
@@ -317,7 +346,7 @@ pub(crate) fn ensure_access_privilege_to_uri(uri_info: &ParsedUriInfo, copy_from
317346
unsafe { get_role_oid(required_role_name.to_string().as_pg_cstr(), false) };
318347

319348
let operation_str = if copy_from { "from" } else { "to" };
320-
let object_type = if is_program {
349+
let object_type = if uri_info.is_program {
321350
"program"
322351
} else if is_file {
323352
"file"

src/parquet_copy_hook/copy_from.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ use crate::{
1616
parquet_copy_hook::{
1717
copy_from_program::copy_program_to_file,
1818
copy_utils::{
19-
copy_from_stmt_create_option_list, copy_stmt_lock_mode, copy_stmt_relation_oid,
19+
copy_from_stmt_create_option_list, copy_stmt_is_std_inout, copy_stmt_lock_mode,
20+
copy_stmt_program, copy_stmt_relation_oid,
2021
},
2122
},
2223
};
@@ -120,7 +121,7 @@ pub(crate) fn execute_copy_from(
120121
p_stmt: &PgBox<PlannedStmt>,
121122
query_string: &CStr,
122123
query_env: &PgBox<QueryEnvironment>,
123-
uri_info: &ParsedUriInfo,
124+
mut uri_info: ParsedUriInfo,
124125
) -> u64 {
125126
let rel_oid = copy_stmt_relation_oid(p_stmt);
126127

@@ -149,15 +150,15 @@ pub(crate) fn execute_copy_from(
149150
let match_by = copy_from_stmt_match_by(p_stmt);
150151

151152
unsafe {
152-
if uri_info.program.is_some() {
153-
copy_program_to_file(uri_info);
154-
} else if uri_info.stdio_tmp_fd.is_some() {
153+
if let Some(program) = copy_stmt_program(p_stmt) {
154+
copy_program_to_file(&mut uri_info, &program);
155+
} else if copy_stmt_is_std_inout(p_stmt) {
155156
let is_binary = true;
156-
copy_stdin_to_file(uri_info, tupledesc.natts as _, is_binary);
157+
copy_stdin_to_file(&uri_info, tupledesc.natts as _, is_binary);
157158
}
158159

159160
// parquet reader context is used throughout the COPY FROM operation.
160-
let parquet_reader_context = ParquetReaderContext::new(uri_info, match_by, &tupledesc);
161+
let parquet_reader_context = ParquetReaderContext::new(&uri_info, match_by, &tupledesc);
161162
push_parquet_reader_context(parquet_reader_context);
162163

163164
// makes sure to set binary format
Lines changed: 8 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,13 @@
1-
use std::{ffi::CStr, io::pipe, process::Command};
2-
31
use crate::arrow_parquet::uri_utils::{uri_as_string, ParsedUriInfo};
42

5-
pub(crate) unsafe fn copy_program_to_file(uri_info: &ParsedUriInfo) {
6-
let program = CStr::from_ptr(uri_info.program.expect("Program pointer is null"))
7-
.to_str()
8-
.unwrap_or_else(|e| panic!("Failed to convert program pointer to string: {e}"))
9-
.to_string();
10-
11-
let (mut pipe_in, pipe_out) =
12-
pipe().unwrap_or_else(|e| panic!("Failed to create command pipe: {e}"));
13-
14-
#[cfg(unix)]
15-
let mut command = Command::new("/bin/sh")
16-
.arg("-lc")
17-
.arg(program)
18-
.stdout(pipe_out)
19-
.spawn()
20-
.unwrap_or_else(|e| panic!("Failed to spawn command: {e}"));
21-
22-
#[cfg(windows)]
23-
let mut command = Command::new("cmd")
24-
.arg("/C")
25-
.arg(program)
26-
.stdout(pipe_out)
27-
.spawn()
28-
.unwrap_or_else(|e| panic!("Failed to spawn command: {e}"));
29-
30-
command
31-
.wait()
32-
.unwrap_or_else(|e| panic!("Failed to wait for command: {e}"));
33-
34-
// Write input pipe to the file
3+
pub(crate) unsafe fn copy_program_to_file(uri_info: &mut ParsedUriInfo, program: &str) {
4+
// get the path of the temp file
355
let path = uri_as_string(&uri_info.uri);
366

7+
// open the pipe to the program and get it as a File
8+
let copy_from = true;
9+
let mut pipe_file = uri_info.open_program_pipe(program, copy_from);
10+
3711
// create or overwrite the local file
3812
let mut file = std::fs::OpenOptions::new()
3913
.write(true)
@@ -42,6 +16,7 @@ pub(crate) unsafe fn copy_program_to_file(uri_info: &ParsedUriInfo) {
4216
.open(&path)
4317
.unwrap_or_else(|e| panic!("{}", e));
4418

45-
std::io::copy(&mut pipe_in, &mut file)
19+
// Write pipe file to temp file
20+
std::io::copy(&mut pipe_file, &mut file)
4621
.unwrap_or_else(|e| panic!("Failed to copy command stdout to file: {e}"));
4722
}

src/parquet_copy_hook/copy_to_dest_receiver.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ pub(crate) struct CopyToParquetDestReceiver {
3636
collected_tuple_column_sizes: *mut i64,
3737
target_batch_size: i64,
3838
uri: *const c_char,
39-
is_to_stdout: bool,
4039
program: *mut c_char,
40+
is_to_stdout: bool,
4141
copy_options: CopyToParquetOptions,
4242
copy_memory_context: MemoryContext,
4343
row_group_memory_context: MemoryContext,
@@ -156,10 +156,16 @@ impl CopyToParquetDestReceiver {
156156

157157
fn copy_to_program(&self) {
158158
if !self.program.is_null() {
159+
let program = unsafe {
160+
CStr::from_ptr(self.program)
161+
.to_str()
162+
.expect("invalid program")
163+
};
164+
159165
let uri = unsafe { CStr::from_ptr(self.uri).to_str().expect("invalid uri") };
160-
let uri_info = ParsedUriInfo::try_from(uri).expect("invalid uri");
166+
let mut uri_info = ParsedUriInfo::try_from(uri).expect("invalid uri");
161167

162-
unsafe { copy_file_to_program(uri_info, self.program) };
168+
unsafe { copy_file_to_program(&mut uri_info, program) };
163169
}
164170
}
165171

@@ -370,8 +376,8 @@ fn tuple_column_sizes(tuple_datums: &[Option<Datum>], tupledesc: &PgTupleDesc) -
370376
#[pg_guard]
371377
pub(crate) extern "C-unwind" fn create_copy_to_parquet_dest_receiver(
372378
uri: *const c_char,
373-
is_to_stdout: bool,
374379
program: *mut c_char,
380+
is_to_stdout: bool,
375381
options: CopyToParquetOptions,
376382
) -> *mut CopyToParquetDestReceiver {
377383
let row_group_memory_context = unsafe {
@@ -403,8 +409,8 @@ pub(crate) extern "C-unwind" fn create_copy_to_parquet_dest_receiver(
403409
parquet_dest.dest.rDestroy = Some(copy_destroy);
404410
parquet_dest.dest.mydest = CommandDest::DestCopyOut;
405411
parquet_dest.uri = uri;
406-
parquet_dest.is_to_stdout = is_to_stdout;
407412
parquet_dest.program = program;
413+
parquet_dest.is_to_stdout = is_to_stdout;
408414
parquet_dest.tupledesc = std::ptr::null_mut();
409415
parquet_dest.parquet_writer_context = std::ptr::null_mut();
410416
parquet_dest.natts = 0;
Lines changed: 9 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,20 @@
1-
use std::{
2-
ffi::{c_char, CStr},
3-
fs::File,
4-
io::pipe,
5-
process::Command,
6-
};
1+
use std::fs::File;
72

83
use crate::arrow_parquet::uri_utils::{uri_as_string, ParsedUriInfo};
94

10-
pub(crate) unsafe fn copy_file_to_program(uri_info: ParsedUriInfo, program: *mut c_char) {
11-
let (pipe_in, mut pipe_out) =
12-
pipe().unwrap_or_else(|e| panic!("Failed to create command pipe: {e}"));
13-
14-
let program = CStr::from_ptr(program).to_string_lossy().into_owned();
15-
16-
#[cfg(unix)]
17-
let mut command = Command::new("/bin/sh")
18-
.arg("-lc")
19-
.arg(program)
20-
.stdin(pipe_in)
21-
.spawn()
22-
.unwrap_or_else(|e| panic!("Failed to spawn command: {e}"));
23-
24-
#[cfg(windows)]
25-
let mut command = Command::new("cmd")
26-
.arg("/C")
27-
.arg(program)
28-
.stdin(pipe_in)
29-
.spawn()
30-
.unwrap_or_else(|e| panic!("Failed to spawn command: {e}"));
31-
32-
// Write the file to output pipe
5+
pub(crate) unsafe fn copy_file_to_program(uri_info: &mut ParsedUriInfo, program: &str) {
6+
// get the path of the temp file
337
let path = uri_as_string(&uri_info.uri);
348

359
let mut file = File::open(path).unwrap_or_else(|e| {
3610
panic!("could not open temp file: {e}");
3711
});
3812

39-
std::io::copy(&mut file, &mut pipe_out)
40-
.unwrap_or_else(|e| panic!("Failed to copy file to command stdin: {e}"));
41-
42-
// close output pipe to unblock the command
43-
drop(pipe_out);
13+
// open the pipe to the program and get it as a File
14+
let copy_from = false;
15+
let mut pipe_file = uri_info.open_program_pipe(program, copy_from);
4416

45-
command
46-
.wait()
47-
.unwrap_or_else(|e| panic!("Failed to wait for command: {e}"));
17+
// Write temp file to pipe file
18+
std::io::copy(&mut file, &mut pipe_file)
19+
.unwrap_or_else(|e| panic!("Failed to copy file to command stdin: {e}"));
4820
}

src/parquet_copy_hook/copy_to_split_dest_receiver.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ pub(crate) const INVALID_FILE_SIZE_BYTES: i64 = 0;
2323
struct CopyToParquetSplitDestReceiver {
2424
dest: DestReceiver,
2525
uri: *const c_char,
26-
is_to_stdout: bool,
2726
program: *mut c_char,
27+
is_to_stdout: bool,
2828
tupledesc: TupleDesc,
2929
operation: i32,
3030
options: CopyToParquetOptions,
@@ -50,8 +50,8 @@ impl CopyToParquetSplitDestReceiver {
5050
let child_uri = self.create_uri_for_child();
5151
self.current_child_receiver = create_copy_to_parquet_dest_receiver(
5252
child_uri,
53-
self.is_to_stdout,
5453
self.program,
54+
self.is_to_stdout,
5555
self.options,
5656
);
5757
self.current_child_id += 1;
@@ -212,8 +212,8 @@ extern "C-unwind" fn copy_split_destroy(_dest: *mut DestReceiver) {}
212212
#[allow(clippy::too_many_arguments)]
213213
pub extern "C-unwind" fn create_copy_to_parquet_split_dest_receiver(
214214
uri: *const c_char,
215-
is_to_stdout: bool,
216215
program: *mut c_char,
216+
is_to_stdout: bool,
217217
file_size_bytes: *const i64,
218218
field_ids: *const c_char,
219219
row_group_size: *const i64,

0 commit comments

Comments
 (0)