Skip to content

Commit 57df2c0

Browse files
[Spark] Handle case when Checkpoints.findLastCompleteCheckpoint is passed MAX_VALUE (#3105)
#### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Fixes an issue where `Checkpoints.findLastCompleteCheckpoint` goes into an almost infinite loop if it is passed a Checkpoint.MAX_VALUE. ## How was this patch tested? UT ## Does this PR introduce _any_ user-facing changes? No
1 parent 3af4335 commit 57df2c0

File tree

2 files changed

+151
-26
lines changed

2 files changed

+151
-26
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,46 @@ trait Checkpoints extends DeltaLogging {
429429
*/
430430
private[delta] def findLastCompleteCheckpointBefore(
431431
checkpointInstance: Option[CheckpointInstance] = None): Option[CheckpointInstance] = {
432-
val upperBoundCv = checkpointInstance.filterNot(_.version < 0).getOrElse {
433-
logInfo(s"Try to find Delta last complete checkpoint")
434-
return findLastCompleteCheckpoint()
435-
}
432+
val eventData = mutable.Map[String, String]()
433+
val startTimeMs = System.currentTimeMillis()
434+
def sendUsageLog(): Unit = {
435+
eventData("totalTimeTakenMs") = (System.currentTimeMillis() - startTimeMs).toString
436+
recordDeltaEvent(
437+
self, opType = "delta.findLastCompleteCheckpointBefore", data = eventData.toMap)
438+
}
439+
try {
440+
val resultOpt = findLastCompleteCheckpointBeforeInternal(eventData, checkpointInstance)
441+
eventData("resultantCheckpointVersion") = resultOpt.map(_.version).getOrElse(-1L).toString
442+
sendUsageLog()
443+
resultOpt
444+
} catch {
445+
case e@(NonFatal(_) | _: InterruptedException | _: java.io.InterruptedIOException |
446+
_: java.nio.channels.ClosedByInterruptException) =>
447+
eventData("exception") = Utils.exceptionString(e)
448+
sendUsageLog()
449+
throw e
450+
}
451+
}
452+
453+
private def findLastCompleteCheckpointBeforeInternal(
454+
eventData: mutable.Map[String, String],
455+
checkpointInstance: Option[CheckpointInstance]): Option[CheckpointInstance] = {
456+
val upperBoundCv =
457+
checkpointInstance
458+
// If someone passes the upperBound as 0 or sentinel value, we should not do backward
459+
// listing. Instead we should list the entire directory from 0 and return the latest
460+
// available checkpoint.
461+
.filterNot(cv => cv.version < 0 || cv.version == CheckpointInstance.MaxValue.version)
462+
.getOrElse {
463+
logInfo(s"Try to find Delta last complete checkpoint")
464+
eventData("listingFromZero") = true.toString
465+
return findLastCompleteCheckpoint()
466+
}
467+
eventData("efficientBackwardListingEnabled") = true.toString
468+
eventData("upperBoundVersion") = upperBoundCv.version.toString
469+
eventData("upperBoundCheckpointType") = upperBoundCv.format.name
470+
var iterations: Long = 0L
471+
var numFilesScanned: Long = 0L
436472
logInfo(s"Try to find Delta last complete checkpoint before version ${upperBoundCv.version}")
437473
var listingEndVersion = upperBoundCv.version
438474

@@ -446,9 +482,12 @@ trait Checkpoints extends DeltaLogging {
446482
// |
447483
// latest checkpoint
448484
while (listingEndVersion >= 0) {
485+
iterations += 1
486+
eventData("iterations") = iterations.toString
449487
val listingStartVersion = math.max(0, listingEndVersion - 1000)
450488
val checkpoints = store
451489
.listFrom(listingPrefix(logPath, listingStartVersion), newDeltaHadoopConf())
490+
.map { file => numFilesScanned += 1 ; file }
452491
.collect {
453492
// Also collect delta files from the listing result so that the next takeWhile helps us
454493
// terminate iterator early if no checkpoint exists upto the `listingEndVersion`
@@ -471,6 +510,7 @@ trait Checkpoints extends DeltaLogging {
471510
.toArray
472511
val lastCheckpoint =
473512
getLatestCompleteCheckpointFromList(checkpoints, Some(upperBoundCv.version))
513+
eventData("numFilesScanned") = numFilesScanned.toString
474514
if (lastCheckpoint.isDefined) {
475515
logInfo(s"Delta checkpoint is found at version ${lastCheckpoint.get.version}")
476516
return lastCheckpoint
@@ -494,7 +534,6 @@ trait Checkpoints extends DeltaLogging {
494534
getLatestCompleteCheckpointFromList(files.map(f => CheckpointInstance(f.getPath)).toArray)
495535
}.foldLeft(Option.empty[CheckpointInstance])((_, right) => Some(right))
496536
// ^The foldLeft here emulates the non-existing Iterator.tailOption method.
497-
498537
}
499538

500539
/**

spark/src/test/scala/org/apache/spark/sql/delta/FindLastCompleteCheckpointSuite.scala

Lines changed: 107 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@
1616

1717
package org.apache.spark.sql.delta
1818

19+
import com.databricks.spark.util.Log4jUsageLogger
1920
import org.apache.spark.sql.delta.CheckpointInstance.Format
2021
import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
2122
import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite
2223
import org.apache.spark.sql.delta.storage.LocalLogStore
2324
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
24-
import org.apache.spark.sql.delta.util.FileNames
25+
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
2526
import org.apache.hadoop.conf.Configuration
2627
import org.apache.hadoop.fs.{FileStatus, Path}
2728

@@ -68,6 +69,18 @@ class FindLastCompleteCheckpointSuite
6869
versions.map { version => pathToFileStatus(FileNames.checksumFile(logPath, version)) }
6970
}
7071

72+
def getLastCompleteCheckpointUsageLog(f: => Unit): Map[String, String] = {
73+
val usageRecords = Log4jUsageLogger.track {
74+
f
75+
}
76+
val opType = "delta.findLastCompleteCheckpointBefore"
77+
val records = usageRecords.filter { r =>
78+
r.tags.get("opType").contains(opType) || r.opType.map(_.typeName).contains(opType)
79+
}
80+
assert(records.size === 1)
81+
JsonUtils.fromJson[Map[String, String]](records.head.blob)
82+
}
83+
7184
test("findLastCompleteCheckpoint without any argument") {
7285
withTempDir { dir =>
7386
val log = DeltaLog.forTable(spark, dir.getAbsolutePath)
@@ -79,14 +92,20 @@ class FindLastCompleteCheckpointSuite
7992
commitFiles(logPath, 0L to 3000) ++
8093
singleCheckpointFiles(logPath, Seq(100, 200, 1000, 2000))
8194
)
82-
assert(log.findLastCompleteCheckpointBefore().contains(CheckpointInstance(version = 2000)))
95+
val eventData1 = getLastCompleteCheckpointUsageLog {
96+
assert(log.findLastCompleteCheckpointBefore().contains(CheckpointInstance(version = 2000)))
97+
}
98+
assert(!eventData1.contains("iterations"))
8399
assert(logStore.listFromCount == 1)
84100
assert(logStore.elementsConsumedFromListFromIter == 3005)
85101
logStore.reset()
86102

87103
// Case-2: No checkpoint exists in table dir
88104
logStore.customListingResult = Some(commitFiles(logPath, 0L to 3000))
89-
assert(log.findLastCompleteCheckpointBefore().isEmpty)
105+
val eventData2 = getLastCompleteCheckpointUsageLog {
106+
assert(log.findLastCompleteCheckpointBefore().isEmpty)
107+
}
108+
assert(!eventData2.contains("iterations"))
90109
assert(logStore.listFromCount == 1)
91110
assert(logStore.elementsConsumedFromListFromIter == 3001)
92111
logStore.reset()
@@ -97,8 +116,11 @@ class FindLastCompleteCheckpointSuite
97116
singleCheckpointFiles(logPath, Seq(100, 200, 1000, 2000)) ++
98117
multipartCheckpointFiles(logPath, Seq(300, 2000), numParts = 4)
99118
)
100-
assert(log.findLastCompleteCheckpointBefore().contains(
101-
CheckpointInstance(version = 2000, Format.WITH_PARTS, numParts = Some(4))))
119+
val eventData3 = getLastCompleteCheckpointUsageLog {
120+
assert(log.findLastCompleteCheckpointBefore().contains(
121+
CheckpointInstance(version = 2000, Format.WITH_PARTS, numParts = Some(4))))
122+
}
123+
assert(!eventData2.contains("iterations"))
102124
assert(logStore.listFromCount == 1)
103125
assert(logStore.elementsConsumedFromListFromIter == 3013)
104126
logStore.reset()
@@ -117,11 +139,15 @@ class FindLastCompleteCheckpointSuite
117139
commitFiles(logPath, 0L to 3000) ++
118140
singleCheckpointFiles(logPath, Seq(100, 200, 1000, 2000))
119141
)
120-
assert(
121-
log.findLastCompleteCheckpointBefore(Some(CheckpointInstance(version = 2000)))
122-
.contains(CheckpointInstance(version = 1000)))
142+
val eventData1 = getLastCompleteCheckpointUsageLog {
143+
assert(
144+
log.findLastCompleteCheckpointBefore(Some(CheckpointInstance(version = 2000)))
145+
.contains(CheckpointInstance(version = 1000)))
146+
}
123147
assert(logStore.listFromCount == 1)
124148
assert(logStore.elementsConsumedFromListFromIter == 1002 + 2) // commits + checkpoint
149+
assert(eventData1("iterations") == "1")
150+
assert(eventData1("numFilesScanned") == "1004")
125151
logStore.reset()
126152

127153
// Case-2: The exact upperBound (a multi-part checkpoint) doesn't exist but another single
@@ -132,10 +158,14 @@ class FindLastCompleteCheckpointSuite
132158
)
133159
var sentinelCheckpoint =
134160
CheckpointInstance(version = 2000, Format.WITH_PARTS, numParts = Some(4))
135-
assert(log.findLastCompleteCheckpointBefore(Some(sentinelCheckpoint))
136-
.contains(CheckpointInstance(version = 2000)))
161+
val eventData2 = getLastCompleteCheckpointUsageLog {
162+
assert(log.findLastCompleteCheckpointBefore(Some(sentinelCheckpoint))
163+
.contains(CheckpointInstance(version = 2000)))
164+
}
137165
assert(logStore.listFromCount == 1)
138166
assert(logStore.elementsConsumedFromListFromIter == 1002 + 2) // commits + checkpoint
167+
assert(eventData2("iterations") == "1")
168+
assert(eventData2("numFilesScanned") == "1004")
139169
logStore.reset()
140170

141171
// Case-3: The last complete checkpoint doesn't exist in last 1000 elements and needs
@@ -144,14 +174,17 @@ class FindLastCompleteCheckpointSuite
144174
commitFiles(logPath, 0L to 2500) ++
145175
singleCheckpointFiles(logPath, Seq(100, 150))
146176
)
147-
assert(
148-
log.findLastCompleteCheckpointBefore(2200)
149-
.contains(CheckpointInstance(version = 150)))
177+
val eventData3 = getLastCompleteCheckpointUsageLog {
178+
assert(
179+
log.findLastCompleteCheckpointBefore(2200).contains(CheckpointInstance(version = 150)))
180+
}
150181
assert(logStore.listFromCount == 3)
151182
// the first listing will consume 1000 elements from 1200 to 2201 => 1002 commits
152183
// the second listing will consume 1000 elements from 200 to 1201 => 1002 commits
153184
// the third listing will consume 501 elements from 0 to 201 => 202 commits + 2 checkpoints
154185
assert(logStore.elementsConsumedFromListFromIter == 2208) // commits + checkpoint
186+
assert(eventData3("iterations") == "3")
187+
assert(eventData3("numFilesScanned") == "2208")
155188
logStore.reset()
156189
}
157190
}
@@ -186,12 +219,22 @@ class FindLastCompleteCheckpointSuite
186219
commitFiles(logPath, 0L to lastCommitVersion) ++
187220
singleCheckpointFiles(logPath, Seq(100), length = 20) ++
188221
singleCheckpointFiles(logPath, Seq(200), length = 0))
189-
assert(
190-
log.findLastCompleteCheckpointBefore(sentinelInstance)
191-
.contains(CheckpointInstance(version = 100)))
222+
val eventData1 = getLastCompleteCheckpointUsageLog {
223+
assert(
224+
log.findLastCompleteCheckpointBefore(sentinelInstance)
225+
.contains(CheckpointInstance(version = 100)))
226+
}
192227
assert(logStore.listFromCount == expectedListCount)
193228
assert(logStore.elementsConsumedFromListFromIter ===
194229
getExpectedFileCount(filesPerCheckpoint = 1))
230+
if (passSentinelInstance) {
231+
assert(eventData1("iterations") == expectedListCount.toString)
232+
assert(eventData1("numFilesScanned") ==
233+
getExpectedFileCount(filesPerCheckpoint = 1).toString)
234+
} else {
235+
assert(Seq("iterations", "numFilesScanned").forall(!eventData1.contains(_)))
236+
}
237+
195238
logStore.reset()
196239

197240
// Case-2: `findLastCompleteCheckpointBefore` invoked with upperBound, with a multi-part
@@ -206,8 +249,17 @@ class FindLastCompleteCheckpointSuite
206249
multipartCheckpointFiles(logPath, Seq(100), numParts = 4) ++
207250
badCheckpointV200
208251
)
209-
assert(log.findLastCompleteCheckpointBefore(sentinelInstance)
210-
.contains(CheckpointInstance(version = 100, Format.WITH_PARTS, numParts = Some(4))))
252+
val eventData2 = getLastCompleteCheckpointUsageLog {
253+
assert(log.findLastCompleteCheckpointBefore(sentinelInstance)
254+
.contains(CheckpointInstance(version = 100, Format.WITH_PARTS, numParts = Some(4))))
255+
}
256+
if (passSentinelInstance) {
257+
assert(eventData2("iterations") == expectedListCount.toString)
258+
assert(eventData2("numFilesScanned") ==
259+
getExpectedFileCount(filesPerCheckpoint = 4).toString)
260+
} else {
261+
assert(Seq("iterations", "numFilesScanned").forall(!eventData2.contains(_)))
262+
}
211263
assert(logStore.listFromCount == expectedListCount)
212264
assert(logStore.elementsConsumedFromListFromIter ===
213265
getExpectedFileCount(filesPerCheckpoint = 4))
@@ -245,16 +297,50 @@ class FindLastCompleteCheckpointSuite
245297
commitFiles(logPath, 0L to lastCommitVersion) ++
246298
multipartCheckpointFiles(logPath, Seq(100), numParts = 4, length = 20) ++
247299
multipartCheckpointFiles(logPath, Seq(200), numParts = 4, length = 20).take(3))
248-
assert(
249-
log.findLastCompleteCheckpointBefore(sentinelInstance)
250-
.contains(CheckpointInstance(100, Format.WITH_PARTS, numParts = Some(4))))
300+
val eventData1 = getLastCompleteCheckpointUsageLog {
301+
assert(
302+
log.findLastCompleteCheckpointBefore(sentinelInstance)
303+
.contains(CheckpointInstance(100, Format.WITH_PARTS, numParts = Some(4))))
304+
}
251305
assert(logStore.listFromCount == expectedListCount)
252306
assert(logStore.elementsConsumedFromListFromIter ===
253307
getExpectedFileCount(fileInCheckpointV200 = 3, filesInCheckpointV100 = 4))
308+
if (passSentinelInstance) {
309+
assert(eventData1("iterations") == expectedListCount.toString)
310+
assert(eventData1("numFilesScanned") ==
311+
getExpectedFileCount(fileInCheckpointV200 = 3, filesInCheckpointV100 = 4).toString)
312+
} else {
313+
assert(Seq("iterations", "numFilesScanned").forall(!eventData1.contains(_)))
314+
}
315+
254316
logStore.reset()
255317
}
256318
}
257319

320+
test("findLastCompleteCheckpoint with CheckpointInstance.MAX value") {
321+
withTempDir { dir =>
322+
val log = DeltaLog.forTable(spark, dir.getAbsolutePath)
323+
val logPath = log.logPath
324+
val logStore = log.store.asInstanceOf[CustomListingLogStore]
325+
logStore.reset()
326+
327+
logStore.customListingResult = Some(
328+
commitFiles(logPath, 0L to 3000) ++
329+
singleCheckpointFiles(logPath, Seq(100, 200, 1000, 1200))
330+
)
331+
val eventData = getLastCompleteCheckpointUsageLog {
332+
assert(
333+
log.findLastCompleteCheckpointBefore(Some(CheckpointInstance.MaxValue))
334+
.contains(CheckpointInstance(version = 1200)))
335+
}
336+
assert(!eventData.contains("iterations"))
337+
assert(!eventData.contains("upperBoundVersion"))
338+
assert(eventData("totalTimeTakenMs").toLong > 0)
339+
assert(logStore.listFromCount == 1)
340+
assert(logStore.elementsConsumedFromListFromIter == 3001 + 4) // commits + checkpoint
341+
logStore.reset()
342+
}
343+
}
258344
}
259345

260346
/**

0 commit comments

Comments
 (0)