feat(optimizer): Rewrite bucketed semi-join to inner join #27510
kaikalur wants to merge 1 commit into prestodb:master
Reviewer's Guide
Implements a new iterative optimizer rule that rewrites eligible bucketed semi-joins into inner joins with a DISTINCT build side. The rule is guarded by a new session property, wired into the optimizer pipeline, and covered by unit tests that use a mock bucketed TPCH connector.
Sequence diagram for RewriteBucketedSemiJoinToInnerJoin rule application
sequenceDiagram
participant Session
participant PlanOptimizers
participant IterativeOptimizer
participant RewriteRule as RewriteBucketedSemiJoinToInnerJoin
participant Context
participant Metadata
Session->>PlanOptimizers: create with FeaturesConfig
PlanOptimizers->>IterativeOptimizer: register rule RewriteBucketedSemiJoinToInnerJoin
loop optimization_iterations
IterativeOptimizer->>RewriteRule: isEnabled(session)
RewriteRule->>Session: isRewriteBucketedSemiJoinToInnerJoinEnabled
Session-->>RewriteRule: boolean enabled
alt rule disabled
RewriteRule-->>IterativeOptimizer: disabled
else rule enabled
IterativeOptimizer->>RewriteRule: apply(SemiJoinNode, captures, context)
RewriteRule->>Context: getLookup.resolve(source)
RewriteRule->>Context: getLookup.resolve(filteringSource)
par resolve_source_table_scan
RewriteRule->>Context: findTableScanAndResolveVariable(source)
Context-->>RewriteRule: Optional TableScanInfo
and resolve_filter_table_scan
RewriteRule->>Context: findTableScanAndResolveVariable(filteringSource)
Context-->>RewriteRule: Optional TableScanInfo
end
alt missing_table_scan
RewriteRule-->>IterativeOptimizer: Result.empty
else found_table_scans
RewriteRule->>Metadata: getTableMetadata(source.table)
Metadata-->>RewriteRule: properties including bucketed_by
RewriteRule->>Metadata: getColumnMetadata(source.table, source.columnHandle)
Metadata-->>RewriteRule: source column name
RewriteRule->>Metadata: getTableMetadata(filtering.table)
Metadata-->>RewriteRule: properties including bucketed_by
RewriteRule->>Metadata: getColumnMetadata(filtering.table, filtering.columnHandle)
Metadata-->>RewriteRule: filtering column name
alt both_sides_bucketed_by_join_key
RewriteRule->>Context: isOutputDistinct(filteringSource)
Context-->>RewriteRule: boolean isDistinct
alt not_distinct
RewriteRule->>Context: create AggregationNode DISTINCT on filteringSource
else already_distinct
RewriteRule->>RewriteRule: reuse existing filteringSource
end
RewriteRule->>Context: create InnerJoinNode(source, distinctFilteringSource)
RewriteRule->>Context: create ProjectNode(InnerJoinNode, semiJoinOutput := TRUE)
RewriteRule-->>IterativeOptimizer: Result.ofPlanNode(ProjectNode)
else not_bucketed
RewriteRule-->>IterativeOptimizer: Result.empty
end
end
end
end
Updated class diagram for bucketed semi-join rewrite and configuration
classDiagram
class FeaturesConfig {
- boolean addExchangeBelowPartialAggregationOverGroupId
- boolean addDistinctBelowSemiJoinBuild
- boolean rewriteBucketedSemiJoinToInnerJoin
- boolean mergeMaxByMinByAggregationsEnabled
+ FeaturesConfig setRewriteBucketedSemiJoinToInnerJoin(boolean rewriteBucketedSemiJoinToInnerJoin)
+ boolean isRewriteBucketedSemiJoinToInnerJoin()
}
class SystemSessionProperties {
<<final>>
+ String REWRITE_BUCKETED_SEMI_JOIN_TO_INNER_JOIN
+ booleanProperty(String name, String description, boolean defaultValue, boolean hidden)
+ boolean isRewriteBucketedSemiJoinToInnerJoinEnabled(Session session)
}
class PlanOptimizers {
+ PlanOptimizers(Metadata metadata, RuleStats ruleStats, StatsCalculator statsCalculator, CostCalculator estimatedExchangesCostCalculator)
}
class IterativeOptimizer {
+ IterativeOptimizer(Metadata metadata, RuleStats ruleStats, StatsCalculator statsCalculator, CostCalculator estimatedExchangesCostCalculator, Set~Rule~ rules)
+ Result optimize(Session session, PlanNode plan)
}
class RewriteBucketedSemiJoinToInnerJoin {
- Metadata metadata
- String BUCKETED_BY_PROPERTY
+ RewriteBucketedSemiJoinToInnerJoin(Metadata metadata)
+ Pattern~SemiJoinNode~ getPattern()
+ boolean isEnabled(Session session)
+ Result apply(SemiJoinNode node, Captures captures, Context context)
- Optional~TableScanInfo~ findTableScanAndResolveVariable(PlanNode node, VariableReferenceExpression variable, Context context)
- boolean isBucketedByColumn(TableScanInfo info, Session session)
- boolean isOutputDistinct(PlanNode node, VariableReferenceExpression output, Context context)
}
class TableScanInfo {
- TableScanNode tableScan
- ColumnHandle columnHandle
+ TableScanInfo(TableScanNode tableScan, ColumnHandle columnHandle)
}
class SemiJoinNode {
+ PlanNode getSource()
+ PlanNode getFilteringSource()
+ VariableReferenceExpression getSourceJoinVariable()
+ VariableReferenceExpression getFilteringSourceJoinVariable()
+ VariableReferenceExpression getSemiJoinOutput()
}
class JoinNode {
+ JoinType joinType
+ List~EquiJoinClause~ criteria
}
class AggregationNode {
+ Step step
+ List~VariableReferenceExpression~ groupingKeys
+ static boolean isDistinct(AggregationNode node)
+ static GroupingSetDescriptor singleGroupingSet(List~VariableReferenceExpression~ keys)
}
class ProjectNode {
+ Assignments assignments
}
class Metadata {
+ TableMetadata getTableMetadata(Session session, TableHandle table)
+ ColumnMetadata getColumnMetadata(Session session, TableHandle table, ColumnHandle columnHandle)
}
class Session
class RuleStats
class StatsCalculator
class CostCalculator
class Rule
class Pattern
class Captures
class Context {
+ Session getSession()
+ IdAllocator getIdAllocator()
+ Lookup getLookup()
}
class Lookup {
+ PlanNode resolve(PlanNode node)
}
RewriteBucketedSemiJoinToInnerJoin ..> Metadata : uses
RewriteBucketedSemiJoinToInnerJoin ..> TableScanInfo : creates
RewriteBucketedSemiJoinToInnerJoin ..> SemiJoinNode : transforms
RewriteBucketedSemiJoinToInnerJoin ..> JoinNode : creates
RewriteBucketedSemiJoinToInnerJoin ..> AggregationNode : creates
RewriteBucketedSemiJoinToInnerJoin ..> ProjectNode : creates
RewriteBucketedSemiJoinToInnerJoin ..> Context : uses
RewriteBucketedSemiJoinToInnerJoin ..> Pattern : returns
RewriteBucketedSemiJoinToInnerJoin ..|> Rule
TableScanInfo --> TableScanNode
SystemSessionProperties ..> FeaturesConfig : reads defaults
SystemSessionProperties ..> Session : reads system property
PlanOptimizers ..> IterativeOptimizer : composes
IterativeOptimizer ..> RewriteBucketedSemiJoinToInnerJoin : contains rule set
IterativeOptimizer ..> Session : uses
IterativeOptimizer ..> PlanNode : rewrites
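The "reads defaults" relationship between SystemSessionProperties and FeaturesConfig in the class diagram can be pictured with a small stand-alone sketch: a config-level default that a session may override. This is only an illustration under stated assumptions. `SessionToggleSketch` and the lowercase property name are hypothetical (the PR shows only the constant `REWRITE_BUCKETED_SEMI_JOIN_TO_INNER_JOIN`), and none of this is Presto's actual property machinery.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of "config default, session override"; not Presto code.
final class SessionToggleSketch
{
    // Assumed property name, used here only for illustration.
    static final String PROPERTY = "rewrite_bucketed_semi_join_to_inner_join";

    // The session value wins when present; otherwise fall back to the config default.
    static boolean isEnabled(boolean configDefault, Map<String, String> sessionProperties)
    {
        String override = sessionProperties.get(PROPERTY);
        return override == null ? configDefault : Boolean.parseBoolean(override);
    }

    public static void main(String[] args)
    {
        Map<String, String> session = new HashMap<>();
        System.out.println(isEnabled(false, session)); // config default applies
        session.put(PROPERTY, "true");
        System.out.println(isEnabled(false, session)); // session override applies
    }
}
```

Because the default stated in the PR description is false, the rule would stay inactive in this model unless a session opts in.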
Flow diagram for rewriting bucketed semi-join to inner join
flowchart TD
SemiJoinNode[SemiJoinNode input]
ResolveSource[Resolve source plan node]
ResolveFiltering[Resolve filteringSource plan node]
FindSourceScan[Find TableScan and join column for source]
FindFilterScan[Find TableScan and join column for filteringSource]
CheckSourceBucket[Check source is bucketed by join key]
CheckFilterBucket[Check filteringSource is bucketed by join key]
CheckDistinct[Check if filteringSource output is already DISTINCT]
BuildDistinct[Wrap filteringSource in AggregationNode DISTINCT if needed]
BuildJoin[Build InnerJoinNode with DISTINCT filteringSource]
BuildProject[Build ProjectNode adding semiJoinOutput TRUE]
OutputPlan[Rewritten plan Project -> InnerJoin -> DISTINCT filteringSource]
SemiJoinNode --> ResolveSource
SemiJoinNode --> ResolveFiltering
ResolveSource --> FindSourceScan
ResolveFiltering --> FindFilterScan
FindSourceScan --> CheckSourceBucket
FindFilterScan --> CheckFilterBucket
CheckSourceBucket -->|not bucketed or scan not found| NoRewrite[Return original SemiJoinNode]
CheckFilterBucket -->|not bucketed or scan not found| NoRewrite
CheckSourceBucket -->|bucketed| CheckFilterBucket
CheckFilterBucket -->|bucketed| CheckDistinct
CheckDistinct -->|already DISTINCT| BuildJoin
CheckDistinct -->|not DISTINCT| BuildDistinct --> BuildJoin
BuildJoin --> BuildProject --> OutputPlan
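The decision flow in the flowchart can be sketched as a small stand-alone Java model. This is a simplification under stated assumptions, not the rule's implementation: `Side`, `rewrite`, and the string plan shapes are hypothetical, and `bucketedBy` is treated as a single column name, whereas the real table property may list several columns.

```java
import java.util.Optional;

// Toy model of the decision flow; these are not Presto classes.
final class BucketedSemiJoinDecisionSketch
{
    // One side of the semi-join, summarised by what the rule needs to know about it.
    static final class Side
    {
        final String name;
        final Optional<String> bucketedBy; // empty: no table scan found or table not bucketed
        final String joinColumn;           // column the semi-join key resolves to
        final boolean alreadyDistinct;     // true when the side already produces distinct keys

        Side(String name, Optional<String> bucketedBy, String joinColumn, boolean alreadyDistinct)
        {
            this.name = name;
            this.bucketedBy = bucketedBy;
            this.joinColumn = joinColumn;
            this.alreadyDistinct = alreadyDistinct;
        }

        boolean isBucketedByJoinKey()
        {
            return bucketedBy.isPresent() && bucketedBy.get().equals(joinColumn);
        }
    }

    // Returns the rewritten plan shape, or empty when the semi-join is left unchanged.
    static Optional<String> rewrite(Side source, Side filtering)
    {
        if (!source.isBucketedByJoinKey() || !filtering.isBucketedByJoinKey()) {
            return Optional.empty();
        }
        // Only add a DISTINCT when the filtering side is not already distinct.
        String build = filtering.alreadyDistinct ? filtering.name : "Distinct(" + filtering.name + ")";
        return Optional.of("Project(semiJoinOutput := TRUE, InnerJoin(" + source.name + ", " + build + "))");
    }

    public static void main(String[] args)
    {
        Side orders = new Side("orders", Optional.of("orderkey"), "orderkey", false);
        Side lineitem = new Side("lineitem", Optional.of("orderkey"), "orderkey", false);
        Side values = new Side("values", Optional.empty(), "orderkey", false);

        System.out.println(rewrite(orders, lineitem)); // rewritten shape
        System.out.println(rewrite(orders, values));   // Optional.empty: no rewrite
    }
}
```

Returning an empty Optional mirrors the "Return original SemiJoinNode" branch: the rule declines to fire rather than producing a modified plan.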
Hey - I've found 2 issues, and left some high level feedback:
- When constructing the new JoinNode you drop existing semi-join metadata (e.g., dynamic filter id, join distribution/hints, additional join criteria), which could regress behavior; consider threading through any applicable properties from the original SemiJoinNode instead of using empty optionals/defaults.
- The helper `findTableScanAndResolveVariable` only traverses Project and Filter before giving up, so the rule will silently not apply when the semi-join sides are wrapped in other common nodes (e.g., Limit, TopN, EnforceSingleRow); consider extending this traversal to handle the additional wrappers you expect to see around bucketed table scans.
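The traversal concern in the second point can be illustrated with a toy model: a walk down single-child plan nodes that only looks through an allow-list of node kinds. Everything here is hypothetical (`Node`, `findTableScan`, and the string node kinds are not Presto types), and the sketch leaves out resolving the join variable through projections. Whether looking through a given wrapper such as Limit is semantically safe for this rewrite is a separate question that the sketch does not answer.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Toy plan tree with at most one child per node; not Presto code.
final class TableScanTraversalSketch
{
    static final class Node
    {
        final String kind; // e.g. "Project", "Filter", "Limit", "TableScan"
        final Node child;  // null for leaves

        Node(String kind, Node child)
        {
            this.kind = kind;
            this.child = child;
        }
    }

    // Walks down from `node`, looking through only the kinds listed in `passThrough`.
    static Optional<Node> findTableScan(Node node, Set<String> passThrough)
    {
        Node current = node;
        while (current != null) {
            if (current.kind.equals("TableScan")) {
                return Optional.of(current);
            }
            if (!passThrough.contains(current.kind)) {
                return Optional.empty(); // unknown wrapper: give up
            }
            current = current.child;
        }
        return Optional.empty();
    }

    public static void main(String[] args)
    {
        Node plan = new Node("Project", new Node("Limit", new Node("TableScan", null)));
        Set<String> narrow = new HashSet<>(Arrays.asList("Project", "Filter"));
        Set<String> wider = new HashSet<>(Arrays.asList("Project", "Filter", "Limit"));

        System.out.println(findTableScan(plan, narrow).isPresent()); // Limit blocks the search
        System.out.println(findTableScan(plan, wider).isPresent());  // scan is found
    }
}
```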
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="presto-main-base/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestRewriteBucketedSemiJoinToInnerJoin.java" line_range="271" />
<code_context>
+ }
+
+ @Test
+ public void testDoesNotFireForNonTableScanSource()
+ {
+ // source is ValuesNode, not a TableScan
</code_context>
<issue_to_address>
**suggestion (testing):** Add a symmetric negative test where the filtering side (or both sides) is a non-TableScan to cover `findTableScanAndResolveVariable` for that branch.
The source-side branch is covered by `testDoesNotFireForNonTableScanSource`, but the equivalent branch where the filtering side is a non-`TableScan` (and the source is a bucketed `TableScan`) isn’t tested. Please add a semi-join test with a non-`TableScan` filtering source (e.g., `ValuesNode`) that asserts `.doesNotFire()` to cover that path in `findTableScanAndResolveVariable`.
</issue_to_address>
### Comment 2
<location path="presto-main-base/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestRewriteBucketedSemiJoinToInnerJoin.java" line_range="130" />
<code_context>
+ }
+
+ @Test
+ public void testResultDemoShowsRewrite()
+ {
+ // Demonstrates: SemiJoin(bucketed source, bucketed filteringSource)
</code_context>
<issue_to_address>
**suggestion (testing):** Tighten the plan assertions to verify the semi-join output column is preserved and set to TRUE by the Project node.
The current positive tests (`testRewriteBucketedSemiJoinToInnerJoin` and `testResultDemoShowsRewrite`) only verify the `Project` → `InnerJoin` → `Aggregation` shape, but not that the semi-join output symbol is preserved and set to TRUE in the `ProjectNode`. Since this rule relies on `semiJoinOutput := TRUE` to preserve semantics, a regression in that assignment (wrong symbol/expression or missing output) may go unnoticed. Please extend one of these tests to assert that the project outputs include the semi-join output symbol and that it is mapped to a TRUE literal (or at least that the parent plan expects and uses that symbol), using the appropriate `PlanMatchPattern` helpers.
Suggested implementation:
```java
@Test
public void testResultDemoShowsRewrite()
{
    // Demonstrates: SemiJoin(bucketed source, bucketed filteringSource)
    //   → Project(semiJoinOutput := TRUE) → InnerJoin → Distinct(filteringSource)
    // and verifies that the semi-join output symbol is preserved and set to TRUE.
    tester().assertThat(new RewriteBucketedSemiJoinToInnerJoin(tester().getMetadata()))
            .setSystemProperty(REWRITE_BUCKETED_SEMI_JOIN_TO_INNER_JOIN, "true")
            .on(p -> {
                VariableReferenceExpression sourceKey = p.variable("sourceKey", BIGINT);
                VariableReferenceExpression filterKey = p.variable("filterKey", BIGINT);
                VariableReferenceExpression output = p.variable("output", BOOLEAN);
                // Build the original plan with a SemiJoin that produces `output`
                return p.semiJoin(
                        sourceKey,
                        filterKey,
                        output,
                        "source",
                        "filter",
                        Optional.empty(),
                        Optional.empty(),
                        Optional.empty());
            })
            .matches(
                    // After rewrite we expect:
                    // Project(output := TRUE, ...) → InnerJoin → Aggregation(DISTINCT filteringSource)
                    project(
                            // The project must explicitly map the semi-join output symbol to TRUE
                            ImmutableMap.of("output", expression("true")),
                            join(
                                    INNER,
                                    ImmutableList.of(equiJoinClause("sourceKey", "filterKey")),
                                    Optional.empty(),
                                    anyTree(),
                                    aggregation(
                                            singleGroupingSet("filteringOrderkey"),
                                            ImmutableMap.of(),
                                            ImmutableMap.of(),
                                            Optional.empty(),
                                            AggregationNode.Step.SINGLE,
                                            anyTree()))));
}
```
I only saw the beginning of `testResultDemoShowsRewrite`, so the following may need adjustment to fit the existing code:
1. Ensure the `return p.semiJoin(...)` arguments (source/filter handles and optionals) match how `PlanBuilder.semiJoin` is used elsewhere in this test class or codebase.
2. The `matches(...)` tree is reconstructed from the surrounding snippet; if the existing expected pattern uses different helper methods (e.g., `aggregation(...)` wrapping `distinct()` or different symbol names), align the join/aggregation pattern accordingly.
3. Confirm you have static imports for the pattern helpers used above:
* `project`, `join`, `aggregation`, `singleGroupingSet`, `anyTree`, `equiJoinClause`, and `expression` from `PlanMatchPattern`.
4. If the test previously wrapped the `semiJoin` in a `project` node in the input plan, keep that structure and only tighten the **expected** `project(...)` in `matches(...)` to assert that `"output"` is mapped to `expression("true")` and that `"output"` is used by the parent node as appropriate.
</issue_to_address>
3b07998 to 661b173
When both sides of a semi-join are backed by tables bucketed on the
semi-join key, rewrite the SemiJoinNode to a colocated INNER JOIN with
a DISTINCT on the build side. This avoids unnecessary data shuffles
since both sides are already co-partitioned by the join key.
The rewrite runs early (before other semi-join/join optimizers) so the
resulting JoinNode participates in downstream join ordering.
Transformation:
SemiJoin(source, filteringSource, key, semiJoinOutput)
→ Project(semiJoinOutput := TRUE)
→ InnerJoin(source, Distinct(filteringSource), key)
Changes:
- Add session property optimizer.rewrite-bucketed-semi-join-to-inner-join
(FeaturesConfig, SystemSessionProperties, TestFeaturesConfig)
- Add RewriteBucketedSemiJoinToInnerJoin optimizer rule
- Register rule in PlanOptimizers before LeftJoinNullFilterToSemiJoin
- Add 8 test cases with mock bucketed connector infrastructure
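Why the build side needs a DISTINCT can be seen in a toy model where each row is just its join key. This is a minimal sketch, not Presto code: `semiJoinMatches`, `innerJoin`, and `distinct` are hypothetical helpers, NULL keys are ignored, and the comparison covers only the source rows for which the semi-join output would be TRUE.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model: rows are just join-key values; this is not Presto code.
final class SemiJoinEquivalenceSketch
{
    // Source rows whose key appears in `filtering` (rows with semiJoinOutput = TRUE).
    static List<Long> semiJoinMatches(List<Long> source, List<Long> filtering)
    {
        Set<Long> keys = new LinkedHashSet<>(filtering);
        List<Long> result = new ArrayList<>();
        for (Long key : source) {
            if (keys.contains(key)) {
                result.add(key);
            }
        }
        return result;
    }

    // Nested-loop inner join on key equality; emits one row per matching pair.
    static List<Long> innerJoin(List<Long> source, List<Long> build)
    {
        List<Long> result = new ArrayList<>();
        for (Long left : source) {
            for (Long right : build) {
                if (left.equals(right)) {
                    result.add(left);
                }
            }
        }
        return result;
    }

    // Removes duplicate keys while keeping first-seen order.
    static List<Long> distinct(List<Long> rows)
    {
        return new ArrayList<>(new LinkedHashSet<>(rows));
    }

    public static void main(String[] args)
    {
        List<Long> source = Arrays.asList(1L, 2L, 2L, 3L);
        List<Long> filtering = Arrays.asList(2L, 2L, 3L, 4L);

        System.out.println(semiJoinMatches(source, filtering));     // [2, 2, 3]
        System.out.println(innerJoin(source, distinct(filtering))); // [2, 2, 3]
        System.out.println(innerJoin(source, filtering));           // [2, 2, 2, 2, 3]
    }
}
```

Without the DISTINCT, each duplicate key on the filtering side multiplies the matching source rows, which is why the plain inner join in the last line returns more rows than the semi-join.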
661b173 to 210667f
Summary
When both sides of a semi-join are backed by tables bucketed on the semi-join key, rewrite the SemiJoinNode to a colocated INNER JOIN with a DISTINCT on the build side. This avoids unnecessary data shuffles since both sides are already co-partitioned by the join key.
The rewrite runs early (before other semi-join/join optimizers) so the resulting JoinNode participates in downstream join ordering.
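Transformation
SemiJoin(source, filteringSource, key, semiJoinOutput)
→ Project(semiJoinOutput := TRUE)
→ InnerJoin(source, Distinct(filteringSource), key)
Changes
- Add session property optimizer.rewrite-bucketed-semi-join-to-inner-join (default: false)
- Register the rule in PlanOptimizers before LeftJoinNullFilterToSemiJoin
Test Plan
- TestRewriteBucketedSemiJoinToInnerJoin with mock bucketed connector
- TestFeaturesConfig updated for the new session property
Release Notes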