Commit 196fce0

[Spark][Version Checksum][3.3] Clean up stale checksum files during cleanup (#4511)
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Backport of d47c09a to 3.3.

Fixes metadata cleanup so that checksum files are also considered for removal. Resolves #4475.

## How was this patch tested?

Updated DeltaRetentionSuite.

## Does this PR introduce _any_ user-facing changes?
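The core of the change is that `.crc` checksum files now participate in the same version-based retention logic as `.json` commit files and checkpoint parquet files. As a rough illustration (a standalone sketch, not Delta's implementation; the real helpers live in `org.apache.spark.sql.delta.util.FileNames`), all three artifacts of a log version encode that version in their zero-padded file name, so they can be aged out together:

```scala
// Standalone sketch: mapping _delta_log file names to versions for the three
// file types metadata cleanup now considers. The regexes are illustrative;
// Delta's own FileNames helpers are the source of truth.
object LogFileVersions {
  private val DeltaFile      = raw"(\d+)\.json".r
  private val ChecksumFile   = raw"(\d+)\.crc".r
  private val CheckpointFile = raw"(\d+)\.checkpoint(\..+)?\.parquet".r

  /** Version encoded in a log file name, if it is a commit, checksum,
    * or checkpoint file; None for anything else. */
  def version(fileName: String): Option[Long] = fileName match {
    case DeltaFile(v)         => Some(v.toLong)
    case ChecksumFile(v)      => Some(v.toLong)
    case CheckpointFile(v, _) => Some(v.toLong)
    case _                    => None
  }
}

// All three artifacts of version 4 resolve to the same version:
LogFileVersions.version("00000000000000000004.json")               // Some(4)
LogFileVersions.version("00000000000000000004.crc")                // Some(4)
LogFileVersions.version("00000000000000000004.checkpoint.parquet") // Some(4)
```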
1 parent 6b7db36 commit 196fce0

File tree

3 files changed: +31 −10 lines changed

spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala

Lines changed: 5 additions & 5 deletions

@@ -141,8 +141,8 @@ trait MetadataCleanup extends DeltaLogging {
   }
 
   /** Helper function for getting the version of a checkpoint or a commit. */
-  def getDeltaFileOrCheckpointVersion(filePath: Path): Long = {
-    require(isCheckpointFile(filePath) || isDeltaFile(filePath))
+  def getDeltaFileChecksumOrCheckpointVersion(filePath: Path): Long = {
+    require(isCheckpointFile(filePath) || isDeltaFile(filePath) || isChecksumFile(filePath))
     getFileVersion(filePath)
   }
 
@@ -157,10 +157,10 @@ trait MetadataCleanup extends DeltaLogging {
     if (latestCheckpoint.isEmpty) return Iterator.empty
     val threshold = latestCheckpoint.get.version - 1L
     val files = store.listFrom(listingPrefix(logPath, 0), newDeltaHadoopConf())
-      .filter(f => isCheckpointFile(f) || isDeltaFile(f))
+      .filter(f => isCheckpointFile(f) || isDeltaFile(f) || isChecksumFile(f))
 
     new BufferingLogDeletionIterator(
-      files, fileCutOffTime, threshold, getDeltaFileOrCheckpointVersion)
+      files, fileCutOffTime, threshold, getDeltaFileChecksumOrCheckpointVersion)
   }
 
   /**
@@ -177,7 +177,7 @@ trait MetadataCleanup extends DeltaLogging {
     if (checkpointProtectionVersion <= 0) return true
 
     def versionGreaterOrEqualToThreshold(file: FileStatus): Boolean =
-      getDeltaFileOrCheckpointVersion(file.getPath) >= checkpointProtectionVersion - 1
+      getDeltaFileChecksumOrCheckpointVersion(file.getPath) >= checkpointProtectionVersion - 1
 
     val expiredDeltaLogs = listExpiredDeltaLogs(fileCutOffTime)
     expiredDeltaLogs.isEmpty || expiredDeltaLogs.exists(versionGreaterOrEqualToThreshold)
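For intuition, the deletion rule that `listExpiredDeltaLogs` feeds into `BufferingLogDeletionIterator` can be read as the conjunction of an age check and a version check. A minimal sketch under simplifying assumptions (a made-up `LogFile` model; the real iterator additionally buffers files to keep modification timestamps monotonic):

```scala
// Minimal sketch of the selection rule, assuming a simplified LogFile model.
// `threshold` mirrors the code above: latest checkpoint version minus one.
final case class LogFile(version: Long, modificationTime: Long)

def expiredLogFiles(
    files: Seq[LogFile],
    fileCutOffTime: Long,
    threshold: Long): Seq[LogFile] =
  files.filter { f =>
    // Old enough to fall outside the retention window...
    f.modificationTime <= fileCutOffTime &&
    // ...and not needed to reconstruct the latest checkpointed snapshot.
    f.version <= threshold
  }
```

With this fix, checksum files flow through the same rule instead of being skipped by the listing filter, which is what previously left stale `.crc` files behind.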

spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala

Lines changed: 7 additions & 3 deletions

@@ -44,7 +44,8 @@ class DeltaRetentionSuite extends QueryTest
   protected override def sparkConf: SparkConf = super.sparkConf
 
   override protected def getLogFiles(dir: File): Seq[File] =
-    getDeltaFiles(dir) ++ getUnbackfilledDeltaFiles(dir) ++ getCheckpointFiles(dir)
+    getDeltaFiles(dir) ++ getUnbackfilledDeltaFiles(dir) ++ getCheckpointFiles(dir) ++
+      getCrcFiles(dir)
 
   test("delete expired logs") {
     withTempDir { tempDir =>
@@ -80,7 +81,7 @@ class DeltaRetentionSuite extends QueryTest
 
       log.checkpoint()
 
-      val expectedFiles = Seq("04.json", "04.checkpoint.parquet")
+      val expectedFiles = Seq("04.json", "04.checkpoint.parquet", "04.crc")
       // after checkpointing, the files should be cleared
       log.cleanUpExpiredLogs(log.snapshot)
       val afterCleanup = getLogFiles(logPath)
@@ -262,7 +263,7 @@ class DeltaRetentionSuite extends QueryTest
     }
   }
 
-  test("the checkpoint file for version 0 should be cleaned") {
+  test("the checkpoint and checksum for version 0 should be cleaned") {
     withTempDir { tempDir =>
       val clock = new ManualClock(getStartTimeForRetentionTest)
       val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
@@ -289,6 +290,9 @@ class DeltaRetentionSuite extends QueryTest
       initialFiles.foreach { file =>
         assert(!afterCleanup.contains(file))
       }
+      compareVersions(getCrcVersions(logPath), "checksum", Set(1))
+      compareVersions(getFileVersions(getDeltaFiles(logPath)), "commit", Set(1))
+      compareVersions(getFileVersions(getCheckpointFiles(logPath)), "checkpoint", Set(1))
     }
   }
 
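`compareVersions` is an existing helper in the suite hierarchy; its definition is not part of this diff, but from the call sites it presumably behaves along these lines (hypothetical reconstruction, not the actual source):

```scala
// Hypothetical reconstruction inferred from the call sites above: fail with
// a readable message if the surviving versions differ from the expected set.
def compareVersions(actual: Set[Long], fileType: String, expected: Set[Long]): Unit =
  assert(actual == expected,
    s"expected $fileType files for versions $expected, found $actual")
```

The three new assertions pin down the post-cleanup state precisely: only the version-1 commit, checkpoint, and checksum survive, so a regression that leaves the version-0 `.crc` behind now fails loudly.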

spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuiteBase.scala

Lines changed: 19 additions & 2 deletions

@@ -89,6 +89,16 @@ trait DeltaRetentionSuiteBase extends QueryTest
     files.map(f => f.getName()).map(s => s.substring(0, s.indexOf(".")).toLong).toSet
   }
 
+  protected def getCrcFiles(dir: File): Seq[File] =
+    dir.listFiles().filter(f => FileNames.isChecksumFile(new Path(f.getCanonicalPath)))
+
+  protected def getCrcVersions(dir: File): Set[Long] =
+    getFileVersions(getCrcFiles(dir))
+
+  protected def getDeltaAndCrcFiles(dir: File): Seq[File] =
+    getDeltaFiles(dir) ++ getCrcFiles(dir)
+
+
   protected def getDeltaVersions(dir: File): Set[Long] = {
     val backfilledDeltaVersions = getFileVersions(getDeltaFiles(dir))
     val unbackfilledDeltaVersions = getUnbackfilledDeltaVersions(dir)
@@ -171,14 +181,21 @@ trait DeltaRetentionSuiteBase extends QueryTest
     }
     if (!checkpointOnly) {
       val deltaPath = new Path(log.logPath, new Path(f"$version%020d.json"))
+      val ts = day(startTime, dayNum) + version * 1000
       if (fs.exists(deltaPath)) {
         // Add some second offset so that we don't have files with same timestamps
-        fs.setTimes(deltaPath, day(startTime, dayNum) + version * 1000, 0)
+        fs.setTimes(deltaPath, ts, 0)
+      }
+      // Add the same timestamp for the crc file as well.
+      val crcPath = new Path(log.logPath, new Path(f"$version%020d.crc"))
+      if (fs.exists(crcPath)) {
+        // Add some second offset so that we don't have files with same timestamps
+        fs.setTimes(crcPath, ts, 0)
       }
      // Add the same timestamp for unbackfilled delta files as well
       fs.listStatus(FileNames.commitDirPath(log.logPath))
         .find(_.getPath.getName.startsWith(f"$version%020d"))
-        .foreach(f => fs.setTimes(f.getPath, day(startTime, dayNum) + version * 1000, 0))
+        .foreach(f => fs.setTimes(f.getPath, ts, 0))
     }
   }
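The shared `ts` matters because retention cleanup keys off file modification time: for the test to expire a version's artifacts together, its `.json`, `.crc`, and unbackfilled commit files must all carry the same artificially aged timestamp. Condensed, the pattern looks like this (a sketch; identifiers such as `fs`, `log`, `day`, `startTime`, `dayNum`, and `version` come from the surrounding test helper):

```scala
// Condensed form of the aging pattern above: one timestamp per version,
// applied to every backfilled artifact of that version that exists on disk.
val ts = day(startTime, dayNum) + version * 1000 // per-version second offset
Seq(f"$version%020d.json", f"$version%020d.crc").foreach { name =>
  val p = new Path(log.logPath, new Path(name))
  if (fs.exists(p)) fs.setTimes(p, ts, 0) // mtime = ts; atime set to 0
}
```

Without the `.crc` branch, checksum files kept their real (recent) modification times, so cleanup correctly refused to delete them in tests while still leaving them stale in practice.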
