
Commit 0ee9fd0

[Spark] Skip reading log entries beyond endOffset, if specified, while getting file changes for CDC in streaming queries (#3110)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Skip reading log entries beyond endOffset, if specified, while getting file changes for CDC in streaming queries. (A short illustrative sketch of this early-stop pattern appears after this description.)

## How was this patch tested?

Existing unit tests. Also verified using logs to ensure that additional Delta logs are not read:

```
24/05/16 01:21:01 INFO StateStore: StateStore stopped
Run completed in 54 seconds, 237 milliseconds.
Total number of tests run: 1
Suites: completed 1, aborted 0
Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Before:

```
10457:24/05/16 01:38:37 INFO DeltaSource: [queryId = 199ce] [batchId = 0] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=0, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":1,"index":-1,"isStartingVersion":false} took timeMs=52 ms
11114:24/05/16 01:38:39 INFO DeltaSource: [queryId = 199ce] [batchId = 1] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=1, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":2,"index":-1,"isStartingVersion":false} took timeMs=25 ms
11518:24/05/16 01:38:39 INFO DeltaSource: [queryId = 199ce] [batchId = 2] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=2, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":3,"index":-1,"isStartingVersion":false} took timeMs=24 ms
```

After:

```
10498:24/05/16 01:32:10 INFO DeltaSource: [queryId = ede3f] [batchId = 0] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=0, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":1,"index":-1,"isStartingVersion":false} took timeMs=39 ms
11155:24/05/16 01:32:11 INFO DeltaSource: [queryId = ede3f] [batchId = 1] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=1, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":2,"index":-1,"isStartingVersion":false} took timeMs=14 ms
11579:24/05/16 01:32:12 INFO DeltaSource: [queryId = ede3f] [batchId = 2] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=2, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":3,"index":-1,"isStartingVersion":false} took timeMs=13 ms
```

The difference is even larger when processing a large backlog of versions: in the Cx setup, batches were taking > 300 s before the change; after the change, batches complete in < 15 s.

## Does this PR introduce _any_ user-facing changes?

No
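For illustration only, not code from this commit: a minimal, self-contained Scala sketch of the early-stop pattern. `SourceOffset`, `versionLessThanEndOffset`, and the read counter below are simplified, hypothetical stand-ins; the point is that bounding a lazy iterator with `takeWhile` keeps versions past the end offset from being pulled (i.e. "read") at all.

```scala
// Minimal sketch, assuming nothing about Delta internals: a lazy iterator of
// versions where each pull stands in for reading one commit file from _delta_log.
object EndOffsetSkipSketch extends App {
  // Hypothetical, simplified offset carrying only the field needed for the bound.
  final case class SourceOffset(reservoirVersion: Long)

  // Same shape as the helper added in this commit: a version is admissible if it
  // does not exceed the end offset's version, or if no end offset was supplied.
  def versionLessThanEndOffset(version: Long, endOffset: Option[SourceOffset]): Boolean =
    endOffset.forall(eo => version <= eo.reservoirVersion)

  var logReads = 0
  val versions: Iterator[Long] = Iterator.from(0).map { v => logReads += 1; v.toLong }

  val endOffset = Some(SourceOffset(reservoirVersion = 2))

  // takeWhile stops pulling from the underlying iterator once the predicate fails,
  // so versions beyond the end offset are never materialized.
  val selected = versions.takeWhile(v => versionLessThanEndOffset(v, endOffset)).toList

  println(s"selected=$selected, logReads=$logReads") // selected=List(0, 1, 2), logReads=4
}
```

Note that `takeWhile` still probes one element past the bound (hence `logReads=4`), but everything after that is skipped, which is where the savings come from on a large backlog.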
1 parent 699df38 · commit 0ee9fd0

3 files changed: +23 -8 lines changed
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -328,7 +328,7 @@ trait DeltaSourceBase extends Source
       }
       logInfo(s"Getting dataFrame for delta_log_path=${deltaLog.logPath} with " +
         s"startVersion=$startVersion, startIndex=$startIndex, " +
-        s"isInitialSnapshot=$isInitialSnapshot took timeMs=$duration ms")
+        s"isInitialSnapshot=$isInitialSnapshot, endOffset=$endOffset took timeMs=$duration ms")
       result
     } finally {
       fileActionsIter.close()
```

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala

Lines changed: 17 additions & 5 deletions
```diff
@@ -216,8 +216,8 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
         cdcInfo.fileChangeDf
       }
       logInfo(s"Getting CDC dataFrame for delta_log_path=${deltaLog.logPath} with " +
-        s"startVersion=$startVersion, startIndex=$startIndex, isInitialSnapshot=$isInitialSnapshot " +
-        s"took timeMs=$duration ms")
+        s"startVersion=$startVersion, startIndex=$startIndex, " +
+        s"isInitialSnapshot=$isInitialSnapshot, endOffset=$endOffset took timeMs=$duration ms")
       result
     }

@@ -277,6 +277,16 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
       }
     }

+    /** Verifies that provided version is <= endOffset version, if defined. */
+    def versionLessThanEndOffset(version: Long, endOffset: Option[DeltaSourceOffset]): Boolean = {
+      endOffset match {
+        case Some(eo) =>
+          version <= eo.reservoirVersion
+        case None =>
+          true
+      }
+    }
+
     val (result, duration) = Utils.timeTakenMs {
       val iter: Iterator[(Long, IndexedChangeFileSeq)] = if (isInitialSnapshot) {
         // If we are reading change data from the start of the table we need to
@@ -301,9 +311,11 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
       }

       // In this case, filterFiles will consume the available capacity. We use takeWhile
-      // to stop the iteration when we reach the limit which will save us from reading
-      // unnecessary log files.
-      iter.takeWhile(_ => limits.forall(_.hasCapacity)).map { case (version, indexItr) =>
+      // to stop the iteration when we reach the limit or if endOffset is specified and the
+      // endVersion is reached which will save us from reading unnecessary log files.
+      iter.takeWhile { case (version, _) =>
+        limits.forall(_.hasCapacity) && versionLessThanEndOffset(version, endOffset)
+      }.map { case (version, indexItr) =>
         (version, indexItr.filterFiles(fromVersion, fromIndex, limits, endOffset))
       }
     }
```
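For illustration only, again with invented stand-ins (`Limits`, `SourceOffset`, and the per-version file counts are hypothetical): a self-contained sketch of the combined stop condition in the `takeWhile` above, showing that iteration halts on whichever bound is hit first, rate-limit capacity or the end offset's version.

```scala
// Standalone sketch; not Delta code, just the shape of the combined predicate.
object CombinedStopConditionSketch extends App {
  // Hypothetical admission-control limit (think maxFilesPerTrigger).
  final class Limits(private var capacity: Int) {
    def hasCapacity: Boolean = capacity > 0
    def take(files: Int): Unit = capacity -= files
  }

  final case class SourceOffset(reservoirVersion: Long)

  val limits: Option[Limits] = Some(new Limits(capacity = 100))
  val endOffset: Option[SourceOffset] = Some(SourceOffset(reservoirVersion = 5))

  // Each element pairs a version with the number of files that commit contributes.
  val changesByVersion: Iterator[(Long, Int)] = Iterator.from(0).map(v => (v.toLong, 10))

  val admitted = changesByVersion
    .takeWhile { case (version, _) =>
      // Stop on whichever comes first: capacity exhausted or version past endOffset.
      limits.forall(_.hasCapacity) && endOffset.forall(eo => version <= eo.reservoirVersion)
    }
    .map { case (version, fileCount) =>
      limits.foreach(_.take(fileCount)) // consume capacity, as filterFiles would
      version
    }
    .toList

  println(admitted) // List(0, 1, 2, 3, 4, 5): the endOffset bound stopped iteration first
}
```

With a smaller capacity or a later end offset, the same predicate would stop on the rate limit instead, which matches the pre-existing behavior.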

spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala

Lines changed: 5 additions & 2 deletions
```diff
@@ -500,8 +500,11 @@ trait ColumnMappingStreamingBlockedWorkflowSuiteBase extends ColumnMappingStream
         val dfStartAtZero = dropCDCFields(dsr
           .option(DeltaOptions.STARTING_VERSION_OPTION, "0")
           .load(inputDir.getCanonicalPath))
-        checkStreamStartBlocked(
-          dfStartAtZero, checkpointDir2, ExpectGenericSchemaIncompatibleFailure)
+        testStream(dfStartAtZero)(
+          StartStream(checkpointLocation = checkpointDir2.getCanonicalPath),
+          ProcessAllAvailableIgnoreError,
+          ExpectGenericSchemaIncompatibleFailure
+        )
       } else {
         // In the trickier case when we rename a column and rename back, we could not
         // immediately detect the schema incompatibility at stream start, so we will move on.
```
