feat: Distributed Procedure Support - prestissimo native changes#26375
feat: Distributed Procedure Support - prestissimo native changes#26375hantangwangd wants to merge 1 commit intoprestodb:masterfrom
Conversation
There was a problem hiding this comment.
Sorry @hantangwangd, your pull request is larger than the review limit of 150000 diff characters
db906b2 to
b59a17f
Compare
34f0586 to
4a75c9b
Compare
4a75c9b to
782ce14
Compare
There was a problem hiding this comment.
Pull request overview
Adds native-worker support paths for calling distributed procedures (Iceberg) by extending the native protocol and query-plan conversion, and introduces Iceberg procedure tests in the native execution module.
Changes:
- Adds native Iceberg tests for
system.rewrite_data_filesbehavior and parameter validation. - Extends Iceberg connector protocol to carry a distributed procedure handle.
- Implements plan conversion and connector plumbing to produce a Velox
TableWriteNodeforCallDistributedProcedureNode.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestRewriteDataFilesProcedure.java | Adds native/expected runner integration tests for Iceberg rewrite_data_files. |
| presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h | Wires Iceberg distributed procedure handle into connector protocol template. |
| presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.h | Declares plan conversion overload for CallDistributedProcedureNode. |
| presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp | Converts CallDistributedProcedureNode into a Velox TableWriteNode using an execute-procedure writer target. |
| presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h | Adds a virtual hook to build a Velox insert handle for execute-procedure targets. |
| presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h | Declares Iceberg override for execute-procedure insert handle conversion. |
| presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp | Implements Iceberg distributed procedure handle -> Velox IcebergInsertTableHandle conversion. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...on/src/test/java/com/facebook/presto/nativeworker/iceberg/TestRewriteDataFilesProcedure.java
Outdated
Show resolved
Hide resolved
| assertEquals(result.getOnlyValue(), 0L); | ||
|
|
||
| // do not support rewrite files filtered by non-identity columns | ||
| assertQueryFails(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c1 > 3')", tableName, schemaName), ".*"); |
There was a problem hiding this comment.
Using \".*\" as the expected failure message makes the test effectively only assert that something failed, which can hide regressions (e.g., failures coming from unrelated causes). Prefer asserting a more specific error message or at least a distinctive substring/regex that indicates the intended failure mode (unsupported filter / non-identity column filter).
| assertQueryFails(format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c1 > 3')", tableName, schemaName), ".*"); | |
| assertQueryFails( | |
| format("call system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c1 > 3')", tableName, schemaName), | |
| ".*non-identity.*column.*"); |
| assertUpdate("INSERT INTO " + tableName + " values(7, 'foo'), (8, 'bar')", 2); | ||
| assertUpdate("INSERT INTO " + tableName + " values(9, 'foo'), (10, 'bar')", 2); | ||
|
|
||
| //The number of data files is 5,and the number of delete files is 0 |
There was a problem hiding this comment.
These comments contain a non-ASCII comma (,) and inconsistent spacing (//The). This makes the file inconsistent with typical style checks and can cause unnecessary diffs/formatting issues. Consider normalizing to ASCII punctuation and standard comment spacing (e.g., // The number of data files is 5, and the number of delete files is 0).
| result = getExpectedQueryRunner().execute(getSession(), "DELETE from " + tableName + " WHERE c1 in (9, 10)", ImmutableList.of(BigintType.BIGINT)); | ||
| assertEquals(result.getOnlyValue(), 2L); | ||
|
|
||
| //The number of data files is 5,and the number of delete files is 2 |
There was a problem hiding this comment.
These comments contain a non-ASCII comma (,) and inconsistent spacing (//The). This makes the file inconsistent with typical style checks and can cause unnecessary diffs/formatting issues. Consider normalizing to ASCII punctuation and standard comment spacing (e.g., // The number of data files is 5, and the number of delete files is 0).
|
|
||
| assertUpdate(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s')", tableName, schemaName), 7); | ||
|
|
||
| //The number of data files is 1,and the number of delete files is 0 |
There was a problem hiding this comment.
These comments contain a non-ASCII comma (,) and inconsistent spacing (//The). This makes the file inconsistent with typical style checks and can cause unnecessary diffs/formatting issues. Consider normalizing to ASCII punctuation and standard comment spacing (e.g., // The number of data files is 5, and the number of delete files is 0).
|
|
||
| return std::make_unique< | ||
| velox::connector::hive::iceberg::IcebergInsertTableHandle>( | ||
| inputColumns, | ||
| std::make_shared<velox::connector::hive::LocationHandle>( | ||
| fmt::format("{}/data", icebergDistributedProcedureHandle->outputPath), | ||
| fmt::format("{}/data", icebergDistributedProcedureHandle->outputPath), |
There was a problem hiding this comment.
The formatted \"{}/data\" path is computed twice with the same expression. Consider storing it in a local variable (or using one fmt::format call) and passing it for both arguments to reduce duplication and make future changes less error-prone.
| return std::make_unique< | |
| velox::connector::hive::iceberg::IcebergInsertTableHandle>( | |
| inputColumns, | |
| std::make_shared<velox::connector::hive::LocationHandle>( | |
| fmt::format("{}/data", icebergDistributedProcedureHandle->outputPath), | |
| fmt::format("{}/data", icebergDistributedProcedureHandle->outputPath), | |
| const auto dataPath = | |
| fmt::format("{}/data", icebergDistributedProcedureHandle->outputPath); | |
| return std::make_unique< | |
| velox::connector::hive::iceberg::IcebergInsertTableHandle>( | |
| inputColumns, | |
| std::make_shared<velox::connector::hive::LocationHandle>( | |
| dataPath, | |
| dataPath, |
| if (!executeProcedureHandle) { | ||
| VELOX_UNSUPPORTED( | ||
| "Unsupported execute procedure handle: {}", | ||
| toJsonString(tableWriteInfo->writerTarget)); | ||
| } |
There was a problem hiding this comment.
Both failure branches emit the same generic message. This makes it harder to diagnose whether the issue is (a) writerTarget not being an ExecuteProcedureHandle vs (b) the connector not implementing toVeloxInsertTableHandle for this procedure handle type. Consider differentiating the messages (e.g., include the actual _type / connector id / connector handle type) so failures are actionable.
|
|
||
| if (!connectorInsertHandle) { | ||
| VELOX_UNSUPPORTED( | ||
| "Unsupported execute procedure handle: {}", |
There was a problem hiding this comment.
Both failure branches emit the same generic message. This makes it harder to diagnose whether the issue is (a) writerTarget not being an ExecuteProcedureHandle vs (b) the connector not implementing toVeloxInsertTableHandle for this procedure handle type. Consider differentiating the messages (e.g., include the actual _type / connector id / connector handle type) so failures are actionable.
| "Unsupported execute procedure handle: {}", | |
| "Connector '{}' does not support execute procedure handle of type '{}' for writer target: {}", | |
| connectorId, | |
| executeProcedureHandle->handle.connectorHandle->_type, |
| @Test | ||
| public void testRewriteDataFilesInEmptyTable() |
There was a problem hiding this comment.
The PR description lists the Test Plan as N/A, but this PR adds a new test class exercising system.rewrite_data_files. Consider updating the Test Plan section to reflect the added/expected tests (e.g., how to run this test suite/module) to keep the PR metadata accurate.
782ce14 to
3a9ab7a
Compare
Description
This PR is the third part of many PRs to support distributed procedure into Presto. It is a split of the original entire PR which is located here: #22659.
The whole work in this PR includes the necessary presto c++ protocol and native changes to support calling distributed procedures in native workers.
Motivation and Context
prestodb/rfcs#12
Impact
N/A
Test Plan
N/A
Contributor checklist
Release Notes