Skip to content

Commit 52b6be2

Browse files
authored
fix hanging behavior for constructDataFilters with expression like EqualTo(Literal, Literal) (#3059)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description This fixes the hanging behavior for `constructDataFilters` with expressions like `EqualTo(Literal, Literal)`. This is due to infinite recursion caused by cases like `case EqualTo(v: Literal, a) => constructDataFilters(EqualTo(a, v))`. This also fixes the same behavior for `EqualTo, Not(EqualTo), EqualNullSafe, LessThan, LessThanOrEqual, GreaterThan, GreaterThanOrEqual` ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> new UT `DataSkippingDeltaConstructDataFiltersSuite` ## Does this PR introduce _any_ user-facing changes? 
<!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
1 parent 5efee74 commit 52b6be2

File tree

2 files changed

+120
-1
lines changed

2 files changed

+120
-1
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,8 +380,11 @@ trait DataSkippingReaderBase
380380
* manifest as NULL). That case is handled separately by `verifyStatsForFilter` (which disables
381381
* skipping for any file that lacks the needed stats columns).
382382
*/
383-
private def constructDataFilters(dataFilter: Expression):
383+
private[stats] def constructDataFilters(dataFilter: Expression):
384384
Option[DataSkippingPredicate] = dataFilter match {
385+
// Expressions that contain only literals are not eligible for skipping.
386+
case cmp: Expression if cmp.children.forall(areAllLeavesLiteral) => None
387+
385388
// Push skipping predicate generation through the AND:
386389
//
387390
// constructDataFilters(AND(a, b))
@@ -572,6 +575,12 @@ trait DataSkippingReaderBase
572575
case _ => None
573576
}
574577

578+
/**
 * Returns true iff every leaf of the expression tree rooted at `e` is a `Literal`.
 *
 * A literal node is trivially all-literal. For an interior node the check recurses into
 * every child. A childless node that is not itself a literal (e.g. an attribute
 * reference) is a non-literal leaf, so the result is false — the `nonEmpty` guard is
 * what prevents `forall` on an empty child list from vacuously returning true.
 */
private def areAllLeavesLiteral(e: Expression): Boolean =
  e match {
    case _: Literal => true
    case node =>
      val kids = node.children
      kids.nonEmpty && kids.forall(areAllLeavesLiteral)
  }
583+
575584
/**
576585
* An extractor that matches expressions that are eligible for data skipping predicates.
577586
*
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright (2021) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.apache.spark.sql.delta.stats
18+
19+
import org.apache.spark.sql.delta.DeltaLog
20+
21+
import org.apache.spark.sql.QueryTest
22+
import org.apache.spark.sql.catalyst.expressions._
23+
import org.apache.spark.sql.test.SharedSparkSession
24+
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
25+
import org.apache.spark.sql.types.StringType
26+
27+
/**
 * Regression tests for `constructDataFilters` hanging on comparison expressions whose
 * operands are all literals. The old rewrite rules such as
 * `case EqualTo(v: Literal, a) => constructDataFilters(EqualTo(a, v))` made no progress
 * when both sides were literals, recursing forever.
 */
class DataSkippingDeltaConstructDataFiltersSuite
  extends QueryTest with SharedSparkSession with DeltaSQLCommandTest {

  test("Verify constructDataFilters doesn't hang for expressions with Literal operands.") {
    val snapshot = DeltaLog.forTable(spark, "dummy_path").update()
    val dataFilterBuilder = new snapshot.DataFiltersBuilder(
      spark, DeltaDataSkippingType.dataSkippingOnlyV1)

    val literal = Literal.create("foo", StringType)
    Seq(
      EqualTo(literal, literal),
      Not(EqualTo(literal, literal)),
      EqualNullSafe(literal, literal),
      Not(EqualNullSafe(literal, literal)),
      LessThan(literal, literal),
      LessThanOrEqual(literal, literal),
      GreaterThan(literal, literal),
      GreaterThanOrEqual(literal, literal),
      Not(GreaterThanOrEqual(literal, literal)),
      In(literal, Seq(literal)),
      IsNull(literal),
      IsNotNull(literal),
      And(EqualTo(literal, literal), LessThan(literal, literal))
    ).foreach { expression =>
      // Literal-only predicates cannot narrow the set of files, so no skipping
      // predicate must be produced (and, crucially, the call must terminate).
      assert(dataFilterBuilder.constructDataFilters(expression).isEmpty)
    }
  }

  // End-to-end check: planning (via `explain`) a query whose optimized filter contains
  // EqualTo(Literal, Literal) — produced here by constant propagation through the
  // view's `type` columns — must complete without hanging.
  test("Test when the query contains EqualTo(Literal, Literal) in the filter.") {
    setup {
      sql(
        """
          |explain
          |select
          |  *
          |from
          |  view1 c
          |  join view2 cv on c.type=cv.type and c.key=cv.key
          |  join tbl3 b on cv.name=b.name
          |where
          |  (
          |    (b.name="name1" and c.type="foo")
          |    or
          |    (b.name="name2" and c.type="bar")
          |  )
          |""".stripMargin)
    }
  }

  /**
   * Creates the Delta tables and union-all views used by the planning test, runs `f`,
   * then drops everything (via `withTable`/`withView`).
   *
   * NOTE: the original declaration used Scala procedure syntax
   * (`private def setup(f: => Unit) {`), which is deprecated since Scala 2.13 and
   * removed in Scala 3; the explicit `: Unit =` form below is the supported spelling.
   */
  private def setup(f: => Unit): Unit = {
    withTable("tbl1_foo", "tbl1_bar", "tbl2_foo", "tbl2_bar", "tbl3") {
      Seq("foo", "bar").foreach { tableType =>
        sql(s"CREATE TABLE tbl1_$tableType (key STRING) USING delta")
        sql(s"CREATE TABLE tbl2_$tableType (key STRING, name STRING) USING delta")
      }
      sql("CREATE TABLE tbl3 (name STRING) USING delta")

      withView("view1", "view2") {
        sql(
          s"""
             |CREATE VIEW view1 (type, key)
             |AS (
             |  select 'foo' as type, * from tbl1_foo
             |  union all
             |  select 'bar' as type, * from tbl1_bar
             |)
             |""".stripMargin
        )

        sql(
          s"""
             |CREATE VIEW view2 (type, key, name)
             |AS (
             |  select 'foo' as type, * from tbl2_foo
             |  union all
             |  select 'bar' as type, * from tbl2_bar
             |)
             |""".stripMargin
        )

        f
      }
    }
  }
}

0 commit comments

Comments
 (0)