Skip to content

Commit f2d6c8b

Browse files
authored
[Spark] Apply filters pushed down into DeltaCDFRelation (#3127)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR modifies `DeltaCDFRelation` to apply the filters that are pushed down into it. This enables both partition pruning and row group skipping when reading the Change Data Feed.

## How was this patch tested?

Unit tests

## Does this PR introduce _any_ user-facing changes?

No
1 parent 0ee9fd0 commit f2d6c8b

File tree

4 files changed

+31
-5
lines changed

4 files changed

+31
-5
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBi
2626
import org.apache.spark.sql.delta.files.{CdcAddFileIndex, TahoeChangeFileIndex, TahoeFileIndexWithSnapshotDescriptor, TahoeRemoveFileIndex}
2727
import org.apache.spark.sql.delta.metering.DeltaLogging
2828
import org.apache.spark.sql.delta.schema.SchemaUtils
29-
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSource, DeltaSQLConf}
29+
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSource, DeltaSourceUtils, DeltaSQLConf}
3030
import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
3131
import org.apache.spark.sql.util.ScalaExtensions.OptionExt
3232

3333
import org.apache.spark.rdd.RDD
34-
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession, SQLContext}
34+
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession, SQLContext}
3535
import org.apache.spark.sql.catalyst.InternalRow
3636
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Literal}
3737
import org.apache.spark.sql.catalyst.plans.logical.Statistics
@@ -147,6 +147,8 @@ object CDCReader extends CDCReaderImpl
147147

148148
override val schema: StructType = cdcReadSchema(snapshotForBatchSchema.metadata.schema)
149149

150+
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = Array.empty
151+
150152
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
151153
val df = changesToBatchDF(
152154
deltaLog,
@@ -158,7 +160,9 @@ object CDCReader extends CDCReaderImpl
158160
sqlContext.sparkSession,
159161
readSchemaSnapshot = Some(snapshotForBatchSchema))
160162

161-
df.select(requiredColumns.map(SchemaUtils.fieldNameToColumn): _*).rdd
163+
val filter = new Column(DeltaSourceUtils.translateFilters(filters))
164+
val projections = requiredColumns.map(SchemaUtils.fieldNameToColumn)
165+
df.filter(filter).select(projections: _*).rdd
162166
}
163167
}
164168

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,5 +103,5 @@ object DeltaSourceUtils {
103103
UnresolvedAttribute(attribute), expressions.Literal.create(s"%${value}%"))
104104
case sources.AlwaysTrue() => expressions.Literal.TrueLiteral
105105
case sources.AlwaysFalse() => expressions.Literal.FalseLiteral
106-
}.reduce(expressions.And)
106+
}.reduceOption(expressions.And).getOrElse(expressions.Literal.TrueLiteral)
107107
}

spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class DeltaCDCSQLSuite extends DeltaCDCSuiteBase with DeltaColumnMappingTestUtil
115115
.withColumn("_change_type", lit("insert")))
116116
}
117117
assert(plans.map(_.executedPlan).toString
118-
.contains("PushedFilters: [IsNotNull(id), LessThan(id,5)]"))
118+
.contains("PushedFilters: [*IsNotNull(id), *LessThan(id,5)]"))
119119
}
120120
}
121121

spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -913,6 +913,28 @@ class DeltaCDCScalaSuite extends DeltaCDCSuiteBase {
913913
}
914914
}
915915

916+
test("filters should be pushed down") {
917+
val tblName = "tbl"
918+
withTable(tblName) {
919+
createTblWithThreeVersions(tblName = Some(tblName))
920+
val plans = DeltaTestUtils.withAllPlansCaptured(spark) {
921+
val res = spark.read.format("delta")
922+
.option(DeltaOptions.CDC_READ_OPTION, "true")
923+
.option("startingVersion", 0)
924+
.option("endingVersion", 1)
925+
.table(tblName)
926+
.select("id", "_change_type")
927+
.where(col("id") < lit(5))
928+
assert(res.columns === Seq("id", "_change_type"))
929+
checkAnswer(
930+
res,
931+
spark.range(5)
932+
.withColumn("_change_type", lit("insert")))
933+
}
934+
assert(plans.map(_.executedPlan).toString
935+
.contains("PushedFilters: [*IsNotNull(id), *LessThan(id,5)]"))
936+
}
937+
}
916938

917939
test("start version or timestamp is not provided") {
918940
val tblName = "tbl"

0 commit comments

Comments
 (0)