Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,36 @@ IcebergPrestoToVeloxConnector::toVeloxTableHandle(
typeParser);
}

std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
IcebergPrestoToVeloxConnector::toVeloxInsertTableHandle(
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
const TypeParser& typeParser) const {
auto icebergDistributedProcedureHandle = std::dynamic_pointer_cast<
protocol::iceberg::IcebergDistributedProcedureHandle>(
executeProcedureHandle->handle.connectorHandle);

VELOX_CHECK_NOT_NULL(
icebergDistributedProcedureHandle,
"Unexpected call distributed procedure handle type {}",
executeProcedureHandle->handle.connectorHandle->_type);

const auto inputColumns = toIcebergColumns(
icebergDistributedProcedureHandle->inputColumns, typeParser);

return std::make_unique<
velox::connector::hive::iceberg::IcebergInsertTableHandle>(
inputColumns,
std::make_shared<velox::connector::hive::LocationHandle>(
fmt::format("{}/data", icebergDistributedProcedureHandle->outputPath),
fmt::format("{}/data", icebergDistributedProcedureHandle->outputPath),
velox::connector::hive::LocationHandle::TableType::kExisting),
toVeloxFileFormat(icebergDistributedProcedureHandle->fileFormat),
toVeloxIcebergPartitionSpec(
icebergDistributedProcedureHandle->partitionSpec, typeParser),
std::optional(toFileCompressionKind(
icebergDistributedProcedureHandle->compressionCodec)));
}

std::unique_ptr<protocol::ConnectorProtocol>
IcebergPrestoToVeloxConnector::createConnectorProtocol() const {
return std::make_unique<protocol::iceberg::IcebergConnectorProtocol>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {
std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
const final;

std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
toVeloxInsertTableHandle(
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
const TypeParser& typeParser) const final;

std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
toVeloxInsertTableHandle(
const protocol::CreateHandle* createHandle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ class PrestoToVeloxConnector {
return {};
}

[[nodiscard]] virtual std::unique_ptr<
velox::connector::ConnectorInsertTableHandle>
toVeloxInsertTableHandle(
const protocol::ExecuteProcedureHandle* executeProcedureHandle,
const TypeParser& typeParser) const {
return {};
}

[[nodiscard]] std::unique_ptr<velox::core::PartitionFunctionSpec>
createVeloxPartitionFunctionSpec(
const protocol::ConnectorPartitioningHandle* partitioningHandle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1578,6 +1578,61 @@ VeloxQueryPlanConverterBase::toVeloxQueryPlan(
sourceVeloxPlan);
}

std::shared_ptr<const core::TableWriteNode>
VeloxQueryPlanConverterBase::toVeloxQueryPlan(
const std::shared_ptr<const protocol::CallDistributedProcedureNode>& node,
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
const protocol::TaskId& taskId) {
const auto executeProcedureHandle =
std::dynamic_pointer_cast<protocol::ExecuteProcedureHandle>(
tableWriteInfo->writerTarget);

if (!executeProcedureHandle) {
VELOX_UNSUPPORTED(
"Unsupported execute procedure handle: {}",
toJsonString(tableWriteInfo->writerTarget));
}

std::string connectorId = executeProcedureHandle->handle.connectorId;
auto& connector = getPrestoToVeloxConnector(
executeProcedureHandle->handle.connectorHandle->_type);
auto veloxHandle = connector.toVeloxInsertTableHandle(
executeProcedureHandle.get(), typeParser_);
auto connectorInsertHandle = std::shared_ptr(std::move(veloxHandle));

if (!connectorInsertHandle) {
VELOX_UNSUPPORTED(
"Connector '{}' does not support execute procedure handle of type '{}' for writer target: {}",
connectorId,
executeProcedureHandle->handle.connectorHandle->_type,
toJsonString(tableWriteInfo->writerTarget));
}

auto insertTableHandle = std::make_shared<core::InsertTableHandle>(
connectorId, connectorInsertHandle);

const auto outputType = toRowType(
generateOutputVariables(
{node->rowCountVariable,
node->fragmentVariable,
node->tableCommitContextVariable},
nullptr),
typeParser_);
const auto sourceVeloxPlan =
toVeloxQueryPlan(node->source, tableWriteInfo, taskId);

return std::make_shared<core::TableWriteNode>(
node->id,
toRowType(node->columns, typeParser_),
node->columnNames,
std::nullopt,
std::move(insertTableHandle),
node->partitioningScheme != nullptr,
outputType,
getCommitStrategy(),
sourceVeloxPlan);
}

std::shared_ptr<const core::TableWriteNode>
VeloxQueryPlanConverterBase::toVeloxQueryPlan(
const std::shared_ptr<const protocol::DeleteNode>& node,
Expand Down Expand Up @@ -1982,6 +2037,10 @@ core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan(
std::dynamic_pointer_cast<const protocol::TableWriterNode>(node)) {
return toVeloxQueryPlan(tableWriter, tableWriteInfo, taskId);
}
if (auto callDistributedProcedure = std::dynamic_pointer_cast<
const protocol::CallDistributedProcedureNode>(node)) {
return toVeloxQueryPlan(callDistributedProcedure, tableWriteInfo, taskId);
}
if (auto deleteNode =
std::dynamic_pointer_cast<const protocol::DeleteNode>(node)) {
return toVeloxQueryPlan(deleteNode, tableWriteInfo, taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ class VeloxQueryPlanConverterBase {
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
const protocol::TaskId& taskId);

std::shared_ptr<const velox::core::TableWriteNode> toVeloxQueryPlan(
const std::shared_ptr<const protocol::CallDistributedProcedureNode>& node,
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
const protocol::TaskId& taskId);

std::shared_ptr<const velox::core::TableWriteMergeNode> toVeloxQueryPlan(
const std::shared_ptr<const protocol::TableWriterMergeNode>& node,
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ using IcebergConnectorProtocol = ConnectorProtocolTemplate<
IcebergSplit,
NotImplemented,
hive::HiveTransactionHandle,
NotImplemented,
IcebergDistributedProcedureHandle,
NotImplemented,
NotImplemented>;

Expand Down
Loading
Loading