Skip to content

Commit 1afa48e

Browse files
authored
[kernel]Implement file skipping for nullSafeEquals (#4013)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description This PR supports data skipping for `<=>` and addresses #2538 ideas fork the one from spark's data skipping reader: 1.Rewrite `EqualNullSafe(a, NotNullLiteral)` as`And(IsNotNull(a), EqualTo(a, NotNullLiteral))` 2.rewrite `EqualNullSafe(a, null)` as `IsNull(a)` https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala#L508-L510 <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ScanSuite.scala, adjusted test cases according to comments. for a <=> 1 in all null case, follows https://github.com/delta-io/delta/blob/master/spark/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala#L735 ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
1 parent 95e1826 commit 1afa48e

File tree

3 files changed

+119
-62
lines changed

3 files changed

+119
-62
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/skipping/DataSkippingUtils.java

Lines changed: 90 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static io.delta.kernel.internal.InternalScanFileUtils.ADD_FILE_ORDINAL;
2020
import static io.delta.kernel.internal.InternalScanFileUtils.ADD_FILE_STATS_ORDINAL;
2121
import static io.delta.kernel.internal.util.ExpressionUtils.*;
22+
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
2223

2324
import io.delta.kernel.data.ColumnVector;
2425
import io.delta.kernel.data.ColumnarBatch;
@@ -29,6 +30,7 @@
2930
import io.delta.kernel.types.StructField;
3031
import io.delta.kernel.types.StructType;
3132
import java.util.*;
33+
import java.util.function.BiFunction;
3234

3335
public class DataSkippingUtils {
3436

@@ -254,6 +256,7 @@ private static Optional<DataSkippingPredicate> constructDataSkippingFilter(
254256
case "<=":
255257
case ">":
256258
case ">=":
259+
case "IS NOT DISTINCT FROM":
257260
Expression left = getLeft(dataFilters);
258261
Expression right = getRight(dataFilters);
259262

@@ -262,9 +265,8 @@ private static Optional<DataSkippingPredicate> constructDataSkippingFilter(
262265
Literal rightLit = (Literal) right;
263266
if (schemaHelper.isSkippingEligibleMinMaxColumn(leftCol)
264267
&& schemaHelper.isSkippingEligibleLiteral(rightLit)) {
265-
return Optional.of(
266-
constructComparatorDataSkippingFilters(
267-
dataFilters.getName(), leftCol, rightLit, schemaHelper));
268+
return constructComparatorDataSkippingFilters(
269+
dataFilters.getName(), leftCol, rightLit, schemaHelper);
268270
}
269271
} else if (right instanceof Column && left instanceof Literal) {
270272
return constructDataSkippingFilter(reverseComparatorFilter(dataFilters), schemaHelper);
@@ -281,40 +283,47 @@ private static Optional<DataSkippingPredicate> constructDataSkippingFilter(
281283
}
282284

283285
/** Construct the skipping predicate for a given comparator */
284-
private static DataSkippingPredicate constructComparatorDataSkippingFilters(
286+
private static Optional<DataSkippingPredicate> constructComparatorDataSkippingFilters(
285287
String comparator, Column leftCol, Literal rightLit, StatsSchemaHelper schemaHelper) {
286288

287289
switch (comparator.toUpperCase(Locale.ROOT)) {
288290

289291
// Match any file whose min/max range contains the requested point.
290292
case "=":
291293
// For example a = 1 --> minValue.a <= 1 AND maxValue.a >= 1
292-
return new DataSkippingPredicate(
293-
"AND",
294-
constructBinaryDataSkippingPredicate(
295-
"<=", schemaHelper.getMinColumn(leftCol), rightLit),
296-
constructBinaryDataSkippingPredicate(
297-
">=", schemaHelper.getMaxColumn(leftCol), rightLit));
294+
return Optional.of(
295+
new DataSkippingPredicate(
296+
"AND",
297+
constructBinaryDataSkippingPredicate(
298+
"<=", schemaHelper.getMinColumn(leftCol), rightLit),
299+
constructBinaryDataSkippingPredicate(
300+
">=", schemaHelper.getMaxColumn(leftCol), rightLit)));
298301

299302
// Match any file whose min is less than the requested upper bound.
300303
case "<":
301-
return constructBinaryDataSkippingPredicate(
302-
"<", schemaHelper.getMinColumn(leftCol), rightLit);
304+
return Optional.of(
305+
constructBinaryDataSkippingPredicate(
306+
"<", schemaHelper.getMinColumn(leftCol), rightLit));
303307

304308
// Match any file whose min is less than or equal to the requested upper bound
305309
case "<=":
306-
return constructBinaryDataSkippingPredicate(
307-
"<=", schemaHelper.getMinColumn(leftCol), rightLit);
310+
return Optional.of(
311+
constructBinaryDataSkippingPredicate(
312+
"<=", schemaHelper.getMinColumn(leftCol), rightLit));
308313

309314
// Match any file whose max is larger than the requested lower bound.
310315
case ">":
311-
return constructBinaryDataSkippingPredicate(
312-
">", schemaHelper.getMaxColumn(leftCol), rightLit);
316+
return Optional.of(
317+
constructBinaryDataSkippingPredicate(
318+
">", schemaHelper.getMaxColumn(leftCol), rightLit));
313319

314320
// Match any file whose max is larger than or equal to the requested lower bound.
315321
case ">=":
316-
return constructBinaryDataSkippingPredicate(
317-
">=", schemaHelper.getMaxColumn(leftCol), rightLit);
322+
return Optional.of(
323+
constructBinaryDataSkippingPredicate(
324+
">=", schemaHelper.getMaxColumn(leftCol), rightLit));
325+
case "IS NOT DISTINCT FROM":
326+
return constructDataSkippingFilter(rewriteEqualNullSafe(leftCol, rightLit), schemaHelper);
318327
default:
319328
throw new IllegalArgumentException(
320329
String.format("Unsupported comparator expression %s", comparator));
@@ -342,6 +351,7 @@ private static DataSkippingPredicate constructBinaryDataSkippingPredicate(
342351
put("<=", ">=");
343352
put(">", "<");
344353
put(">=", "<=");
354+
put("IS NOT DISTINCT FROM", "IS NOT DISTINCT FROM");
345355
}
346356
};
347357

@@ -402,29 +412,21 @@ private static Optional<DataSkippingPredicate> constructNotDataSkippingFilters(
402412
new Predicate("IS_NOT_NULL", getUnaryChild(childPredicate)), schemaHelper);
403413

404414
case "=":
405-
Expression left = getLeft(childPredicate);
406-
Expression right = getRight(childPredicate);
407-
if (left instanceof Column && right instanceof Literal) {
408-
Column leftCol = (Column) left;
409-
Literal rightLit = (Literal) right;
410-
if (schemaHelper.isSkippingEligibleMinMaxColumn(leftCol)
411-
&& schemaHelper.isSkippingEligibleLiteral(rightLit)) {
412-
// Match any file whose min/max range contains anything other than the
413-
// rejected point.
414-
// For example a != 1 --> minValue.a < 1 OR maxValue.a > 1
415-
return Optional.of(
416-
new DataSkippingPredicate(
417-
"OR",
418-
constructBinaryDataSkippingPredicate(
419-
"<", schemaHelper.getMinColumn(leftCol), rightLit),
420-
constructBinaryDataSkippingPredicate(
421-
">", schemaHelper.getMaxColumn(leftCol), rightLit)));
422-
}
423-
} else if (right instanceof Column && left instanceof Literal) {
424-
return constructDataSkippingFilter(
425-
new Predicate("NOT", new Predicate("=", right, left)), schemaHelper);
426-
}
427-
break;
415+
return constructDataSkippingFiltersForNotEqual(
416+
childPredicate,
417+
schemaHelper,
418+
(leftColumn, rightLiteral) -> {
419+
// Match any file whose min/max range contains anything other than the
420+
// rejected point.
421+
// For example a != 1 --> minValue.a < 1 OR maxValue.a > 1
422+
return Optional.of(
423+
new DataSkippingPredicate(
424+
"OR",
425+
constructBinaryDataSkippingPredicate(
426+
"<", schemaHelper.getMinColumn(leftColumn), rightLiteral),
427+
constructBinaryDataSkippingPredicate(
428+
">", schemaHelper.getMaxColumn(leftColumn), rightLiteral)));
429+
});
428430
case "<":
429431
return constructDataSkippingFilter(
430432
new Predicate(">=", childPredicate.getChildren()), schemaHelper);
@@ -437,6 +439,14 @@ private static Optional<DataSkippingPredicate> constructNotDataSkippingFilters(
437439
case ">=":
438440
return constructDataSkippingFilter(
439441
new Predicate("<", childPredicate.getChildren()), schemaHelper);
442+
case "IS NOT DISTINCT FROM":
443+
return constructDataSkippingFiltersForNotEqual(
444+
childPredicate,
445+
schemaHelper,
446+
(leftColumn, rightLiteral) ->
447+
constructDataSkippingFilter(
448+
new Predicate("NOT", rewriteEqualNullSafe(leftColumn, rightLiteral)),
449+
schemaHelper));
440450
case "NOT":
441451
// Remove redundant pairs of NOT
442452
return constructDataSkippingFilter(
@@ -510,4 +520,43 @@ private static String[] appendArray(String[] arr, String appendElem) {
510520
newNames[arr.length] = appendElem;
511521
return newNames;
512522
}
523+
524+
/**
525+
* Rewrite `EqualNullSafe(a, NotNullLiteral)` as `And(IsNotNull(a), EqualTo(a, NotNullLiteral))`
526+
* and rewrite `EqualNullSafe(a, null)` as `IsNull(a)`
527+
*/
528+
private static Predicate rewriteEqualNullSafe(Column leftCol, Literal rightLit) {
529+
if (rightLit.getValue() == null) {
530+
return new Predicate("IS_NULL", leftCol);
531+
}
532+
return new Predicate(
533+
"AND", new Predicate("IS_NOT_NULL", leftCol), new Predicate("=", leftCol, rightLit));
534+
}
535+
536+
/** Helper method for building DataSkippingPredicate for NOT =/IS NOT DISTINCT FROM */
537+
private static Optional<DataSkippingPredicate> constructDataSkippingFiltersForNotEqual(
538+
Predicate equalPredicate,
539+
StatsSchemaHelper schemaHelper,
540+
BiFunction<Column, Literal, Optional<DataSkippingPredicate>> buildDataSkippingPredicateFunc) {
541+
checkArgument(
542+
"=".equals(equalPredicate.getName())
543+
|| "IS NOT DISTINCT FROM".equals(equalPredicate.getName()),
544+
"Expects predicate to be = or IS NOT DISTINCT FROM");
545+
Expression leftChild = getLeft(equalPredicate);
546+
Expression rightChild = getRight(equalPredicate);
547+
if (rightChild instanceof Column && leftChild instanceof Literal) {
548+
return constructDataSkippingFilter(
549+
new Predicate("NOT", new Predicate(equalPredicate.getName(), rightChild, leftChild)),
550+
schemaHelper);
551+
}
552+
if (leftChild instanceof Column && rightChild instanceof Literal) {
553+
Column leftCol = (Column) leftChild;
554+
Literal rightLit = (Literal) rightChild;
555+
if (schemaHelper.isSkippingEligibleMinMaxColumn(leftCol)
556+
&& schemaHelper.isSkippingEligibleLiteral(rightLit)) {
557+
return buildDataSkippingPredicateFunc.apply(leftCol, rightLit);
558+
}
559+
}
560+
return Optional.empty();
561+
}
513562
}

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ScanSuite.scala

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -192,10 +192,7 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
192192
nullSafeEquals(ofInt(1), col("a")), // 1 <=> a
193193
not(nullSafeEquals(col("a"), ofInt(2))), // NOT a <=> 2
194194
// MOVE BELOW EXPRESSIONS TO MISSES ONCE SUPPORTED BY DATA SKIPPING
195-
not(nullSafeEquals(col("a"), ofInt(1))), // NOT a <=> 1
196-
nullSafeEquals(col("a"), ofInt(2)), // a <=> 2
197195
notEquals(col("a"), ofInt(1)), // a != 1
198-
nullSafeEquals(col("a"), ofInt(2)), // a <=> 2
199196
notEquals(ofInt(1), col("a")) // 1 != a
200197
),
201198
misses = Seq(
@@ -210,7 +207,11 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
210207
lessThanOrEqual(ofInt(2), col("a")), // 2 <= a
211208
greaterThanOrEqual(ofInt(0), col("a")), // 0 >= a
212209
not(equals(col("a"), ofInt(1))), // NOT a = 1
213-
not(equals(ofInt(1), col("a"))) // NOT 1 = a
210+
not(equals(ofInt(1), col("a"))), // NOT 1 = a
211+
not(nullSafeEquals(col("a"), ofInt(1))), // NOT a <=> 1
212+
not(nullSafeEquals(ofInt(1), col("a"))), // NOT 1 <=> a
213+
nullSafeEquals(ofInt(2), col("a")), // 2 <=> a
214+
nullSafeEquals(col("a"), ofInt(2)) // a <=> 2
214215
)
215216
)
216217

@@ -762,15 +763,12 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
762763
lessThan(col("a"), ofInt(1)),
763764
greaterThan(col("a"), ofInt(1)),
764765
not(equals(col("a"), ofInt(1))),
765-
notEquals(col("a"), ofInt(1)),
766-
nullSafeEquals(col("a"), ofInt(1)),
767-
768-
// MOVE BELOW EXPRESSIONS TO MISSES ONCE SUPPORTED BY DATA SKIPPING
769-
// This can be optimized to `IsNotNull(a)` (done by NullPropagation in Spark)
770-
not(nullSafeEquals(col("a"), ofNull(INTEGER)))
766+
notEquals(col("a"), ofInt(1))
771767
),
772768
misses = Seq(
773769
AlwaysFalse.ALWAYS_FALSE,
770+
nullSafeEquals(col("a"), ofInt(1)),
771+
not(nullSafeEquals(col("a"), ofNull(INTEGER))),
774772
isNotNull(col("a"))
775773
)
776774
)
@@ -1054,10 +1052,9 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
10541052
expNumPartitions = 1,
10551053
expNumFiles = 3) // 3 files with key = null
10561054

1057-
/*
1058-
NOT YET SUPPORTED EXPRESSIONS
1055+
10591056
checkResults(
1060-
predicate = nullSafeEquals(col("key"), ofNull(string)),
1057+
predicate = nullSafeEquals(col("key"), ofNull(STRING)),
10611058
expNumPartitions = 1,
10621059
expNumFiles = 3) // 3 files with key = null
10631060

@@ -1070,7 +1067,6 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
10701067
predicate = nullSafeEquals(col("key"), ofString("b")),
10711068
expNumPartitions = 1,
10721069
expNumFiles = 1) // 1 files with key <=> 'b'
1073-
*/
10741070

10751071
// Conditions on partitions keys and values
10761072
checkResults(
@@ -1086,7 +1082,12 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
10861082
checkResults(
10871083
predicate = nullSafeEquals(col("value"), ofNull(STRING)),
10881084
expNumPartitions = 3,
1089-
expNumFiles = 5) // should be 3 once <=> is supported
1085+
expNumFiles = 3)
1086+
1087+
checkResults(
1088+
predicate = nullSafeEquals(ofNull(STRING), col("value")),
1089+
expNumPartitions = 3,
1090+
expNumFiles = 3)
10901091

10911092
checkResults(
10921093
predicate = equals(col("value"), ofString("a")),
@@ -1095,8 +1096,13 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
10951096

10961097
checkResults(
10971098
predicate = nullSafeEquals(col("value"), ofString("a")),
1098-
expNumPartitions = 3, // should be 2 once <=> is supported
1099-
expNumFiles = 5) // should be 2 once <=> is supported
1099+
expNumPartitions = 2,
1100+
expNumFiles = 2)
1101+
1102+
checkResults(
1103+
predicate = nullSafeEquals(ofString("a"), col("value")),
1104+
expNumPartitions = 2,
1105+
expNumFiles = 2)
11001106

11011107
checkResults(
11021108
predicate = notEquals(col("value"), ofString("a")),
@@ -1110,8 +1116,8 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
11101116

11111117
checkResults(
11121118
predicate = nullSafeEquals(col("value"), ofString("b")),
1113-
expNumPartitions = 3, // should be 1 once <=> is supported
1114-
expNumFiles = 5) // should be 1 once <=> is supported
1119+
expNumPartitions = 1,
1120+
expNumFiles = 1)
11151121

11161122
// Conditions on both, partition keys and values
11171123
/*

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/ExpressionTestUtils.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ trait ExpressionTestUtils {
5959

6060
def str(value: String): Literal = Literal.ofString(value)
6161

62+
def nullSafeEquals(e1: Expression, e2: Expression): Predicate = {
63+
new Predicate("IS NOT DISTINCT FROM", e1, e2)
64+
}
65+
6266
def unsupported(colName: String): Predicate = predicate("UNSUPPORTED", col(colName));
6367

6468
/* ---------- NOT-YET SUPPORTED EXPRESSIONS ----------- */
@@ -70,8 +74,6 @@ trait ExpressionTestUtils {
7074
them to expect skipped files. If they are ever actually evaluated they will throw an exception.
7175
*/
7276

73-
def nullSafeEquals(e1: Expression, e2: Expression): Predicate = new Predicate("<=>", e1, e2)
74-
7577
def notEquals(e1: Expression, e2: Expression): Predicate = new Predicate("<>", e1, e2)
7678

7779
def startsWith(e1: Expression, e2: Expression): Predicate = new Predicate("STARTS_WITH", e1, e2)

0 commit comments

Comments
 (0)