Skip to content

Commit 9462dcd

Browse files
Hisoka-XHyukjinKwon
authored andcommitted
[SPARK-41636][SQL] Make sure selectFilters returns predicates in deterministic order
### What changes were proposed in this pull request? Method `DataSourceStrategy#selectFilters`, which is used to determine "pushdown-able" filters, does not preserve the order of the input Seq[Expression] nor does it return the same order across the same plans. This is resulting in CodeGenerator cache misses even when the exact same LogicalPlan is executed. This PR to make sure `selectFilters` returns predicates in deterministic order. ### Why are the changes needed? Make sure `selectFilters` returns predicates in deterministic order, to reduce the probability of codegen cache misses. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? add new test. Closes #42265 from Hisoka-X/SPARK-41636_selectfilters_order. Authored-by: Jia Fan <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent a640373 commit 9462dcd

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
1919

2020
import java.util.Locale
2121

22+
import scala.collection.immutable.ListMap
2223
import scala.collection.mutable
2324

2425
import org.apache.hadoop.fs.Path
@@ -670,9 +671,10 @@ object DataSourceStrategy
670671
// A map from original Catalyst expressions to corresponding translated data source filters.
671672
// If a predicate is not in this map, it means it cannot be pushed down.
672673
val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
673-
val translatedMap: Map[Expression, Filter] = predicates.flatMap { p =>
674+
// SPARK-41636: we keep the order of the predicates to avoid CodeGenerator cache misses
675+
val translatedMap: Map[Expression, Filter] = ListMap(predicates.flatMap { p =>
674676
translateFilter(p, supportNestedPredicatePushdown).map(f => p -> f)
675-
}.toMap
677+
}: _*)
676678

677679
val pushedFilters: Seq[Filter] = translatedMap.values.toSeq
678680

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,4 +324,18 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
324324
DataSourceStrategy.translateFilter(catalystFilter, true)
325325
}
326326
}
327+
328+
test("SPARK-41636: selectFilters returns predicates in deterministic order") {
329+
330+
val predicates = Seq(EqualTo($"id", 1), EqualTo($"id", 2),
331+
EqualTo($"id", 3), EqualTo($"id", 4), EqualTo($"id", 5), EqualTo($"id", 6))
332+
333+
val (unhandledPredicates, pushedFilters, handledFilters) =
334+
DataSourceStrategy.selectFilters(FakeRelation(), predicates)
335+
assert(unhandledPredicates.equals(predicates))
336+
assert(pushedFilters.zipWithIndex.forall { case (f, i) =>
337+
f.equals(sources.EqualTo("id", i + 1))
338+
})
339+
assert(handledFilters.isEmpty)
340+
}
327341
}

0 commit comments

Comments
 (0)