diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 5e6e0ad039258..94c2d2ffaca59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources import java.util.Locale +import scala.collection.immutable.ListMap import scala.collection.mutable import org.apache.hadoop.fs.Path @@ -670,9 +671,10 @@ object DataSourceStrategy // A map from original Catalyst expressions to corresponding translated data source filters. // If a predicate is not in this map, it means it cannot be pushed down. val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation) - val translatedMap: Map[Expression, Filter] = predicates.flatMap { p => + // SPARK-41636: we keep the order of the predicates to avoid CodeGenerator cache misses + val translatedMap: Map[Expression, Filter] = ListMap(predicates.flatMap { p => translateFilter(p, supportNestedPredicatePushdown).map(f => p -> f) - }.toMap + }: _*) val pushedFilters: Seq[Filter] = translatedMap.values.toSeq diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala index a35fb5f627145..2b9ec97bace1e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala @@ -324,4 +324,18 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession { DataSourceStrategy.translateFilter(catalystFilter, true) } } + + test("SPARK-41636: selectFilters returns predicates in deterministic order") { + + val predicates = Seq(EqualTo($"id", 1), EqualTo($"id", 2), + EqualTo($"id", 3), EqualTo($"id", 4), EqualTo($"id", 5), EqualTo($"id", 6)) + + val (unhandledPredicates, pushedFilters, handledFilters) = + DataSourceStrategy.selectFilters(FakeRelation(), predicates) + assert(unhandledPredicates.equals(predicates)) + assert(pushedFilters.zipWithIndex.forall { case (f, i) => + f.equals(sources.EqualTo("id", i + 1)) + }) + assert(handledFilters.isEmpty) + } }