Skip to content

Commit 6859c86

Browse files
prakharjain09 vkorukanti
authored and committed
[Spark] Read side changes for v2 checkpoints
This PR adds read side changes for v2 checkpoints. Closes #2056 GitOrigin-RevId: 3673bb576aed5e1b572f2dfc4b69e829ae9555a6
1 parent 4622db6 commit 6859c86

File tree

11 files changed

+803
-24
lines changed

11 files changed

+803
-24
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/CheckpointProvider.scala

Lines changed: 370 additions & 4 deletions
Large diffs are not rendered by default.

spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,28 @@ import org.apache.spark.util.Utils
5454
/**
5555
* A class to help with comparing checkpoints with each other, where we may have had concurrent
5656
* writers that checkpoint with different number of parts.
57+
* The `numParts` field will be present only for multipart checkpoints (represented by
58+
* Format.WITH_PARTS).
59+
* The `fileName` field is present only for V2 Checkpoints (represented by Format.V2).
60+
* These additional fields are used as a tie breaker when comparing multiple checkpoint
61+
* instances of the same Format for the same `version`.
5762
*/
5863
case class CheckpointInstance(
5964
version: Long,
6065
format: CheckpointInstance.Format,
66+
fileName: Option[String] = None,
6167
numParts: Option[Int] = None) extends Ordered[CheckpointInstance] {
6268

6369
// Assert that numParts are present when checkpoint format is Format.WITH_PARTS.
6470
// For other formats, numParts must be None.
6571
require((format == CheckpointInstance.Format.WITH_PARTS) == numParts.isDefined,
6672
s"numParts ($numParts) must be present for checkpoint format" +
6773
s" ${CheckpointInstance.Format.WITH_PARTS.name}")
74+
// Assert that fileName is present only when checkpoint format is Format.V2.
75+
// For other formats, fileName must be None.
76+
require((format == CheckpointInstance.Format.V2) == fileName.isDefined,
77+
s"fileName ($fileName) must be present for checkpoint format" +
78+
s" ${CheckpointInstance.Format.V2.name}")
6879

6980
/**
7081
* Returns a [[CheckpointProvider]] which can tell the files corresponding to this
@@ -81,7 +92,26 @@ case class CheckpointInstance(
8192
val lastCheckpointInfo = lastCheckpointInfoHint.filter(cm => CheckpointInstance(cm) == this)
8293
val cpFiles = filterFiles(deltaLog, filesForCheckpointConstruction)
8394
format match {
84-
case CheckpointInstance.Format.WITH_PARTS | CheckpointInstance.Format.SINGLE =>
95+
// Treat single file checkpoints also as V2 Checkpoints because we don't know if it is
96+
// actually a V2 checkpoint until we read it.
97+
case CheckpointInstance.Format.V2 | CheckpointInstance.Format.SINGLE =>
98+
assert(cpFiles.size == 1)
99+
val fileStatus = cpFiles.head
100+
if (format == CheckpointInstance.Format.V2) {
101+
val hadoopConf = deltaLog.newDeltaHadoopConf()
102+
UninitializedV2CheckpointProvider(
103+
version,
104+
fileStatus,
105+
logPath,
106+
hadoopConf,
107+
deltaLog.options,
108+
deltaLog.store,
109+
lastCheckpointInfo)
110+
} else {
111+
UninitializedV1OrV2ParquetCheckpointProvider(
112+
version, fileStatus, logPath, lastCheckpointInfo)
113+
}
114+
case CheckpointInstance.Format.WITH_PARTS =>
85115
PreloadedCheckpointProvider(cpFiles, lastCheckpointInfo)
86116
case CheckpointInstance.Format.SENTINEL =>
87117
throw DeltaErrors.assertionFailedError(
@@ -93,6 +123,23 @@ case class CheckpointInstance(
93123
filesForCheckpointConstruction: Seq[FileStatus]) : Seq[FileStatus] = {
94124
val logPath = deltaLog.logPath
95125
format match {
126+
// Treat Single File checkpoints also as V2 Checkpoints because we don't know if it is
127+
// actually a V2 checkpoint until we read it.
128+
case format if format.usesSidecars =>
  // Formats that may use sidecars are backed by exactly one top-level checkpoint file.
  // Work out the name of that file for this instance:
  //  - V2: the name is carried by the instance itself (`fileName` is required to be defined
  //    for Format.V2 by the constructor-level `require`).
  //  - SINGLE: the name is derived from the log path and the version.
  val checkpointFileName = format match {
    case CheckpointInstance.Format.V2 => fileName.get
    case CheckpointInstance.Format.SINGLE => checkpointFileSingular(logPath, version).getName
    case other =>
      throw new IllegalStateException(s"Unknown checkpoint format $other supporting sidecars")
  }
  // Pick the matching file out of the candidate listing, failing loudly when it is absent.
  val fileStatus = filesForCheckpointConstruction
    .find(_.getPath.getName == checkpointFileName)
    .getOrElse {
      // Report the resolved name rather than `fileName.get`: `fileName` is None for
      // Format.SINGLE, so dereferencing it here would throw NoSuchElementException and
      // hide the real problem (the checkpoint file is missing from the listing).
      throw new IllegalStateException("Failed in getting the file information for:\n" +
        checkpointFileName + "\namong\n" +
        filesForCheckpointConstruction.map(_.getPath.getName).mkString(" -", "\n -", ""))
    }
  Seq(fileStatus)
96143
case CheckpointInstance.Format.WITH_PARTS | CheckpointInstance.Format.SINGLE =>
97144
val filePaths = if (format == CheckpointInstance.Format.WITH_PARTS) {
98145
checkpointFileWithParts(logPath, version, numParts.get).toSet
@@ -119,28 +166,35 @@ case class CheckpointInstance(
119166
* Single part checkpoint.
120167
* 3. For Multi-part [[CheckpointInstance]]s corresponding to same version, the one with more
121168
* parts is greater than the one with less parts.
169+
* 4. For V2 Checkpoints corresponding to same version, we use the fileName as tie breaker.
122170
*/
123171
override def compare(other: CheckpointInstance): Int = {
124-
(version, format, numParts) compare (other.version, other.format, other.numParts)
172+
(version, format, numParts, fileName) compare
173+
(other.version, other.format, other.numParts, other.fileName)
125174
}
126175
}
127176

128177
object CheckpointInstance {
129178
sealed abstract class Format(val ordinal: Int, val name: String) extends Ordered[Format] {
130179
override def compare(other: Format): Int = ordinal compare other.ordinal
180+
def usesSidecars: Boolean = this.isInstanceOf[FormatUsesSidecars]
131181
}
182+
trait FormatUsesSidecars
132183

133184
object Format {
134185
def unapply(name: String): Option[Format] = name match {
135186
case SINGLE.name => Some(SINGLE)
136187
case WITH_PARTS.name => Some(WITH_PARTS)
188+
case V2.name => Some(V2)
137189
case _ => None
138190
}
139191

140192
/** single-file checkpoint format */
141-
object SINGLE extends Format(0, "SINGLE")
193+
object SINGLE extends Format(0, "SINGLE") with FormatUsesSidecars
142194
/** multi-file checkpoint format */
143195
object WITH_PARTS extends Format(1, "WITH_PARTS")
196+
/** V2 Checkpoint format */
197+
object V2 extends Format(2, "V2") with FormatUsesSidecars
144198
/** Sentinel, for internal use only */
145199
object SENTINEL extends Format(Int.MaxValue, "SENTINEL")
146200
}
@@ -149,7 +203,14 @@ object CheckpointInstance {
149203
// Three formats to worry about:
150204
// * <version>.checkpoint.parquet
151205
// * <version>.checkpoint.<i>.<n>.parquet
206+
// * <version>.checkpoint.<u>.parquet where u is a unique string
152207
path.getName.split("\\.") match {
208+
case Array(v, "checkpoint", uniqueStr, format) if Seq("json", "parquet").contains(format) =>
209+
CheckpointInstance(
210+
version = v.toLong,
211+
format = Format.V2,
212+
numParts = None,
213+
fileName = Some(path.getName))
153214
case Array(v, "checkpoint", "parquet") =>
154215
CheckpointInstance(v.toLong, Format.SINGLE, numParts = None)
155216
case Array(v, "checkpoint", _, n, "parquet") =>
@@ -384,6 +445,8 @@ trait Checkpoints extends DeltaLogging {
384445
case CheckpointInstance.Format.WITH_PARTS =>
385446
assert(ci.numParts.nonEmpty, "Multi-Part Checkpoint must have non empty numParts")
386447
matchingCheckpointInstances.length == ci.numParts.get
448+
case CheckpointInstance.Format.V2 =>
449+
matchingCheckpointInstances.length == 1
387450
case CheckpointInstance.Format.SENTINEL =>
388451
false
389452
}

spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
3535
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
3636
import org.apache.spark.sql.delta.sources._
3737
import org.apache.spark.sql.delta.storage.LogStoreProvider
38+
import org.apache.spark.sql.delta.util.FileNames
3839
import com.google.common.cache.{CacheBuilder, RemovalNotification}
3940
import org.apache.hadoop.conf.Configuration
4041
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -86,6 +87,14 @@ class DeltaLog private(
8687
import org.apache.spark.sql.delta.files.TahoeFileIndex
8788
import org.apache.spark.sql.delta.util.FileNames._
8889

90+
/**
91+
* Path to sidecar directory.
92+
* This is intentionally kept as a `lazy val`; otherwise, other constructor code paths in DeltaLog
93+
* (e.g. SnapshotManagement) would see it as null because they are executed before this line is
94+
* reached.
95+
*/
96+
lazy val sidecarDirPath: Path = FileNames.sidecarDirPath(logPath)
97+
8998

9099
protected def spark = SparkSession.active
91100

spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,12 +207,22 @@ class Snapshot(
207207
* Pulls the protocol and metadata of the table from the files that are used to compute the
208208
* Snapshot directly--without triggering a full state reconstruction. This is important, because
209209
* state reconstruction depends on protocol and metadata for correctness.
210+
*
211+
* Also, this method should only access methods defined in [[UninitializedCheckpointProvider]]
212+
* which are not present in [[CheckpointProvider]]. This is because initialization of
213+
* [[Snapshot.checkpointProvider]] depends on [[Snapshot.protocolAndMetadataReconstruction()]]
214+
* and so if [[Snapshot.protocolAndMetadataReconstruction()]] starts depending on
215+
* [[Snapshot.checkpointProvider]] then there will be a cyclic dependency.
210216
*/
211217
protected def protocolAndMetadataReconstruction(): Array[(Protocol, Metadata)] = {
212218
import implicits._
213219

214220
val schemaToUse = Action.logSchema(Set("protocol", "metaData"))
215-
fileIndices.map(deltaLog.loadIndex(_, schemaToUse))
221+
val checkpointOpt = checkpointProvider.topLevelFileIndex.map { index =>
222+
deltaLog.loadIndex(index, schemaToUse)
223+
.withColumn(COMMIT_VERSION_COLUMN, lit(checkpointProvider.version))
224+
}
225+
(checkpointOpt ++ deltaFileIndexOpt.map(deltaLog.loadIndex(_, schemaToUse)).toSeq)
216226
.reduceOption(_.union(_)).getOrElse(emptyDF)
217227
.select("protocol", "metaData", COMMIT_VERSION_COLUMN)
218228
.where("protocol.minReaderVersion is not null or metaData.id is not null")
@@ -368,6 +378,8 @@ class Snapshot(
368378
/** The [[CheckpointProvider]] for the underlying checkpoint */
369379
lazy val checkpointProvider: CheckpointProvider = logSegment.checkpointProvider match {
370380
case cp: CheckpointProvider => cp
381+
case uninitializedProvider: UninitializedCheckpointProvider =>
382+
CheckpointProvider(spark, this, checksumOpt, uninitializedProvider)
371383
case o => throw new IllegalStateException(s"Unknown checkpoint provider: ${o.getClass.getName}")
372384
}
373385

spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -813,6 +813,14 @@ trait SnapshotManagement { self: DeltaLog =>
813813
}
814814

815815
object SnapshotManagement {
816+
// A thread pool for reading checkpoint files and collecting checkpoint v2 actions like
817+
// checkpointMetadata, sidecarFiles.
818+
private[delta] lazy val checkpointV2ThreadPool = {
819+
val numThreads = SparkSession.active.sessionState.conf.getConf(
820+
DeltaSQLConf.CHECKPOINT_V2_DRIVER_THREADPOOL_PARALLELISM)
821+
DeltaThreadPool("checkpointV2-threadpool", numThreads)
822+
}
823+
816824
protected[delta] lazy val deltaLogAsyncUpdateThreadPool = {
817825
val tpe = ThreadUtils.newDaemonCachedThreadPool("delta-state-update", 8)
818826
new DeltaThreadPool(tpe)

spark/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class InMemoryLogReplay(
5959
domainMetadatas.remove(a.domain)
6060
case a: DomainMetadata if !a.removed =>
6161
domainMetadatas(a.domain) = a
62+
case _: CheckpointOnlyAction => // Ignore this while doing LogReplay
6263
case a: Metadata =>
6364
currentMetaData = a
6465
case a: Protocol =>

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,14 @@ trait DeltaSQLConfBase {
576576
// Checkpoint V2 Specific Configs
577577
////////////////////////////////////
578578

579+
val CHECKPOINT_V2_DRIVER_THREADPOOL_PARALLELISM =
580+
buildStaticConf("checkpointV2.threadpool.size")
581+
.doc("The size of the threadpool for fetching CheckpointMetadata and SidecarFiles from a" +
582+
" checkpoint.")
583+
.internal()
584+
.intConf
585+
.createWithDefault(32)
586+
579587
val CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT =
580588
buildConf("checkpointV2.topLevelFileFormat")
581589
.internal()

spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaEncoders.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.delta.util
1919
import scala.reflect.runtime.universe.TypeTag
2020

2121
import org.apache.spark.sql.delta.{DeltaHistory, DeltaHistoryManager, SerializableFileStatus, SnapshotState}
22-
import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol, RemoveFile, SingleAction}
22+
import org.apache.spark.sql.delta.actions._
2323
import org.apache.spark.sql.delta.commands.convert.ConvertTargetFile
2424
import org.apache.spark.sql.delta.sources.IndexedFile
2525

@@ -77,6 +77,10 @@ private[delta] trait DeltaEncoders {
7777
private lazy val _pmvEncoder = new DeltaEncoder[(Protocol, Metadata, Long)]
7878
implicit def pmvEncoder: Encoder[(Protocol, Metadata, Long)] = _pmvEncoder.get
7979

80+
private lazy val _v2CheckpointActionsEncoder = new DeltaEncoder[(CheckpointMetadata, SidecarFile)]
81+
implicit def v2CheckpointActionsEncoder: Encoder[(CheckpointMetadata, SidecarFile)] =
82+
_v2CheckpointActionsEncoder.get
83+
8084
private lazy val _serializableFileStatusEncoder = new DeltaEncoder[SerializableFileStatus]
8185
implicit def serializableFileStatusEncoder: Encoder[SerializableFileStatus] =
8286
_serializableFileStatusEncoder.get

0 commit comments

Comments
 (0)