Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 41 additions & 25 deletions src/parquet_copy_hook/copy_utils.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
use std::{ffi::CStr, str::FromStr};

use pgrx::{
is_a,
ereport, is_a,
pg_sys::{
addRangeTableEntryForRelation, defGetInt32, defGetInt64, defGetString, get_namespace_name,
get_rel_namespace, makeDefElem, makeString, make_parsestate, quote_qualified_identifier,
AccessShareLock, AsPgCStr, CopyStmt, CreateTemplateTupleDesc, DefElem, List, NoLock, Node,
NodeTag::T_CopyStmt, Oid, ParseNamespaceItem, ParseState, PlannedStmt, QueryEnvironment,
RangeVar, RangeVarGetRelidExtended, RowExclusiveLock, TupleDescInitEntry,
},
PgBox, PgList, PgRelation, PgTupleDesc,
PgBox, PgList, PgLogLevel, PgRelation, PgSqlErrorCode, PgTupleDesc,
};
use url::Url;

use crate::arrow_parquet::{
compression::{all_supported_compressions, PgParquetCompression},
match_by::MatchBy,
parquet_writer::{DEFAULT_ROW_GROUP_SIZE, DEFAULT_ROW_GROUP_SIZE_BYTES},
uri_utils::parse_uri,
use crate::{
arrow_parquet::{
compression::{all_supported_compressions, PgParquetCompression},
match_by::MatchBy,
parquet_writer::{DEFAULT_ROW_GROUP_SIZE, DEFAULT_ROW_GROUP_SIZE_BYTES},
uri_utils::parse_uri,
},
pgrx_utils::extension_exists,
};

use super::pg_compat::strVal;
use super::{hook::ENABLE_PARQUET_COPY_HOOK, pg_compat::strVal};

pub(crate) fn validate_copy_to_options(p_stmt: &PgBox<PlannedStmt>, uri: &Url) {
validate_copy_option_names(
Expand Down Expand Up @@ -297,7 +300,12 @@
PgBox::null()
}

pub(crate) fn is_copy_to_parquet_stmt(p_stmt: &PgBox<PlannedStmt>) -> bool {
fn is_copy_parquet_stmt(p_stmt: &PgBox<PlannedStmt>, copy_from: bool) -> bool {
// the GUC pg_parquet.enable_copy_hook must be set to true
if !ENABLE_PARQUET_COPY_HOOK.get() {
return false;
}

let is_copy_stmt = unsafe { is_a(p_stmt.utilityStmt, T_CopyStmt) };

if !is_copy_stmt {
Expand All @@ -306,7 +314,7 @@

let copy_stmt = unsafe { PgBox::<CopyStmt>::from_pg(p_stmt.utilityStmt as _) };

if copy_stmt.is_from {
if copy_from != copy_stmt.is_from {
return false;
}

Expand All @@ -320,33 +328,41 @@

let uri = copy_stmt_uri(p_stmt).expect("uri is None");

is_parquet_format_option(p_stmt) || is_parquet_uri(uri)
}

pub(crate) fn is_copy_from_parquet_stmt(p_stmt: &PgBox<PlannedStmt>) -> bool {
let is_copy_stmt = unsafe { is_a(p_stmt.utilityStmt, T_CopyStmt) };

if !is_copy_stmt {
if !is_parquet_format_option(p_stmt) && !is_parquet_uri(uri) {
return false;
}

let copy_stmt = unsafe { PgBox::<CopyStmt>::from_pg(p_stmt.utilityStmt as _) };
// extension checks are done via catalog (not yet searched via cache by postgres till pg18)
// this is why we check them after the uri checks

if !copy_stmt.is_from {
// crunchy_query_engine should not be created
if extension_exists("crunchy_query_engine") {
return false;
}

if copy_stmt.is_program {
return false;
}
// pg_parquet should be created
if !extension_exists("pg_parquet") {
ereport!(
PgLogLevel::WARNING,
PgSqlErrorCode::ERRCODE_WARNING,
"pg_parquet can handle this COPY command but is not enabled",
"Run CREATE EXTENSION pg_parquet; to enable the pg_parquet extension.",
);

Check warning on line 350 in src/parquet_copy_hook/copy_utils.rs

View check run for this annotation

Codecov / codecov/patch

src/parquet_copy_hook/copy_utils.rs#L345-L350

Added lines #L345 - L350 were not covered by tests

if copy_stmt.filename.is_null() {
return false;
}

let uri = copy_stmt_uri(p_stmt).expect("uri is None");
true
}

is_parquet_format_option(p_stmt) || is_parquet_uri(uri)
/// Reports whether the planned statement is a `COPY ... TO` command that
/// `is_copy_parquet_stmt` accepts.
pub(crate) fn is_copy_to_parquet_stmt(p_stmt: &PgBox<PlannedStmt>) -> bool {
    // second argument is the `copy_from` flag: `false` selects the COPY TO direction
    is_copy_parquet_stmt(p_stmt, false)
}

/// Reports whether the planned statement is a `COPY ... FROM` command that
/// `is_copy_parquet_stmt` accepts.
pub(crate) fn is_copy_from_parquet_stmt(p_stmt: &PgBox<PlannedStmt>) -> bool {
    // second argument is the `copy_from` flag: `true` selects the COPY FROM direction
    is_copy_parquet_stmt(p_stmt, true)
}

fn is_parquet_format_option(p_stmt: &PgBox<PlannedStmt>) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions src/parquet_copy_hook/hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ extern "C" fn parquet_copy_hook(
let query_env = unsafe { PgBox::from_pg(query_env) };
let mut completion_tag = unsafe { PgBox::from_pg(completion_tag) };

if ENABLE_PARQUET_COPY_HOOK.get() && is_copy_to_parquet_stmt(&p_stmt) {
if is_copy_to_parquet_stmt(&p_stmt) {
let nprocessed = process_copy_to_parquet(&p_stmt, query_string, &params, &query_env);

if !completion_tag.is_null() {
completion_tag.nprocessed = nprocessed;
completion_tag.commandTag = CommandTag::CMDTAG_COPY;
}
return;
} else if ENABLE_PARQUET_COPY_HOOK.get() && is_copy_from_parquet_stmt(&p_stmt) {
} else if is_copy_from_parquet_stmt(&p_stmt) {
let nprocessed = process_copy_from_parquet(&p_stmt, query_string, &query_env);

if !completion_tag.is_null() {
Expand Down
10 changes: 8 additions & 2 deletions src/pgrx_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::collections::HashSet;

use pgrx::{
pg_sys::{
getBaseType, get_element_type, lookup_rowtype_tupdesc, type_is_array, type_is_rowtype,
FormData_pg_attribute, InvalidOid, Oid,
getBaseType, get_element_type, get_extension_oid, lookup_rowtype_tupdesc, type_is_array,
type_is_rowtype, AsPgCStr, FormData_pg_attribute, InvalidOid, Oid,
},
PgTupleDesc,
};
Expand Down Expand Up @@ -99,3 +99,9 @@ pub(crate) fn domain_array_base_elem_typoid(domain_typoid: Oid) -> Oid {

array_element_typoid(base_array_typoid)
}

/// Returns `true` when looking up `extension_name` via `get_extension_oid`
/// yields a valid oid, and `false` when it yields `InvalidOid`.
pub(crate) fn extension_exists(extension_name: &str) -> bool {
    // `get_extension_oid` takes a C string, so convert the Rust name first
    let name_cstr = extension_name.as_pg_cstr();

    // the second argument is passed as `true`; presumably this is `missing_ok`, so an
    // unknown extension is reported as `InvalidOid` rather than an error — confirm
    // against the PostgreSQL source
    let oid = unsafe { get_extension_oid(name_cstr, true) };

    oid != InvalidOid
}
Loading