Skip to content

Commit 2330330

Browse files
[Spark] Add support for dropping Deletion Vectors (#4009)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> This PR adds support for dropping deletion vectors. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Added tests in `DeltaProtocolVersionSuite`. ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. 
If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> Yes. See description.
1 parent 1afa48e commit 2330330

File tree

4 files changed

+180
-1
lines changed

4 files changed

+180
-1
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,59 @@ case class TestLegacyReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
129129
}
130130
}
131131

132+
class DeletionVectorsRemovalMetrics(
133+
val numDeletionVectorsToRemove: Long,
134+
val numDeletionVectorRowsToRemove: Long,
135+
var downgradeTimeMs: Long = 0L)
136+
137+
case class DeletionVectorsPreDowngradeCommand(table: DeltaTableV2)
138+
extends PreDowngradeTableFeatureCommand
139+
with DeltaLogging {
140+
141+
/**
142+
* We first remove the table feature property to prevent any transactions from committing
143+
* new DVs. This will cause any concurrent transactions to fail. Then, we run PURGE
144+
* to remove existing DVs from the latest snapshot.
145+
* Note, during the protocol downgrade phase we validate whether all invariants still hold.
146+
* This should detect if any concurrent txns enabled the feature and/or added DVs again.
147+
*
148+
* @return True if it removed the DV metadata property and/or DVs, false otherwise.
149+
*/
150+
override def removeFeatureTracesIfNeeded(): Boolean = {
151+
// Latest snapshot looks clean. No action is required. We may proceed
152+
// to the protocol downgrade phase.
153+
if (DeletionVectorsTableFeature.validateRemoval(table.initialSnapshot)) return false
154+
155+
val startTimeNs = System.nanoTime()
156+
val properties = Seq(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key)
157+
AlterTableUnsetPropertiesDeltaCommand(
158+
table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark)
159+
160+
val snapshot = table.update()
161+
val metrics = new DeletionVectorsRemovalMetrics(
162+
numDeletionVectorsToRemove = snapshot.numDeletionVectorsOpt.getOrElse(0L),
163+
numDeletionVectorRowsToRemove = snapshot.numDeletedRecordsOpt.getOrElse(0L))
164+
165+
// Wrap `table` in a ResolvedTable that can be passed to DeltaReorgTableCommand. The catalog &
166+
// table ID won't be used by DeltaReorgTableCommand.
167+
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
168+
val catalog = table.spark.sessionState.catalogManager.currentCatalog.asTableCatalog
169+
val tableId = Seq(table.name()).asIdentifier
170+
171+
DeltaReorgTableCommand(target = ResolvedTable.create(catalog, tableId, table))(Nil)
172+
.run(table.spark)
173+
174+
metrics.downgradeTimeMs =
175+
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
176+
177+
recordDeltaEvent(
178+
table.deltaLog,
179+
opType = "delta.deletionVectorsFeatureRemovalMetrics",
180+
data = metrics)
181+
true
182+
}
183+
}
184+
132185
case class V2CheckpointPreDowngradeCommand(table: DeltaTableV2)
133186
extends PreDowngradeTableFeatureCommand
134187
with DeltaLogging {

spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import java.util.Locale
2020

2121
import org.apache.spark.sql.delta.actions._
2222
import org.apache.spark.sql.delta.catalog.DeltaTableV2
23+
import org.apache.spark.sql.delta.commands.DeletionVectorUtils
2324
import org.apache.spark.sql.delta.constraints.{Constraints, Invariants}
2425
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
2526
import org.apache.spark.sql.delta.redirect.{RedirectReaderWriter, RedirectWriterOnly}
@@ -641,6 +642,7 @@ object VariantTypeTableFeature extends ReaderWriterFeature(name = "variantType")
641642

642643
object DeletionVectorsTableFeature
643644
extends ReaderWriterFeature(name = "deletionVectors")
645+
with RemovableFeature
644646
with FeatureAutomaticallyEnabledByMetadata {
645647
override def automaticallyUpdateProtocolOfExistingTables: Boolean = true
646648

@@ -650,6 +652,27 @@ object DeletionVectorsTableFeature
650652
spark: SparkSession): Boolean = {
651653
DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(metadata)
652654
}
655+
656+
override def validateRemoval(snapshot: Snapshot): Boolean = {
657+
val dvsWritable = DeletionVectorUtils.deletionVectorsWritable(snapshot)
658+
val dvsExist = snapshot.numDeletionVectorsOpt.getOrElse(0L) > 0
659+
660+
!(dvsWritable || dvsExist)
661+
}
662+
663+
override def actionUsesFeature(action: Action): Boolean = {
664+
action match {
665+
case m: Metadata => DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(m)
666+
case a: AddFile => a.deletionVector != null
667+
case r: RemoveFile => r.deletionVector != null
668+
// In general, CDC actions do not contain DVs. We added this for safety.
669+
case cdc: AddCDCFile => cdc.deletionVector != null
670+
case _ => false
671+
}
672+
}
673+
674+
override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
675+
DeletionVectorsPreDowngradeCommand(table)
653676
}
654677

655678
object RowTrackingFeature extends WriterFeature(name = "rowTracking")

spark/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import java.util.UUID
2121

2222
import org.apache.spark.sql.delta.DeltaOperations.Truncate
2323
import org.apache.spark.sql.delta.actions.{Action, AddFile, DeletionVectorDescriptor, RemoveFile}
24+
import org.apache.spark.sql.delta.catalog.DeltaTableV2
25+
import org.apache.spark.sql.delta.commands.AlterTableDropFeatureDeltaCommand
2426
import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBitmapArrayFormat}
2527
import org.apache.spark.sql.delta.sources.DeltaSQLConf
2628
import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
@@ -72,6 +74,15 @@ trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession with De
7274
}
7375
}
7476

77+
def dropDVTableFeature(
78+
spark: SparkSession,
79+
log: DeltaLog,
80+
truncateHistory: Boolean): Unit =
81+
AlterTableDropFeatureDeltaCommand(
82+
DeltaTableV2(spark, log.dataPath),
83+
DeletionVectorsTableFeature.name,
84+
truncateHistory = truncateHistory).run(spark)
85+
7586
/** Helper to run 'fn' with a temporary Delta table. */
7687
def withTempDeltaTable(
7788
dataDF: DataFrame,

spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.spark.sql.delta.actions._
3030
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils._
3131
import org.apache.spark.sql.delta.catalog.DeltaTableV2
3232
import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand}
33+
import org.apache.spark.sql.delta.commands.DeletionVectorUtils
3334
import org.apache.spark.sql.delta.coordinatedcommits._
3435
import org.apache.spark.sql.delta.sources.DeltaSQLConf
3536
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
@@ -54,7 +55,8 @@ import org.apache.spark.util.ManualClock
5455

5556
trait DeltaProtocolVersionSuiteBase extends QueryTest
5657
with SharedSparkSession
57-
with DeltaSQLCommandTest {
58+
with DeltaSQLCommandTest
59+
with DeletionVectorsTestUtils {
5860

5961
// `.schema` generates NOT NULL columns which requires writer protocol 2. We convert all to
6062
// NULLable to avoid silent writer protocol version bump.
@@ -3408,6 +3410,96 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
34083410
expectedDowngradedProtocol = protocolWithReaderFeature(TestRemovableReaderWriterFeature))
34093411
}
34103412

3413+
for {
3414+
truncateHistory <- BOOLEAN_DOMAIN
3415+
enableCDF <- if (truncateHistory) Seq(false) else BOOLEAN_DOMAIN
3416+
} test(s"Remove Deletion Vectors feature " +
3417+
s"truncateHistory: $truncateHistory, enableCDF: $enableCDF") {
3418+
val targetDF = spark.range(start = 0, end = 100, step = 1, numPartitions = 2)
3419+
withSQLConf(
3420+
DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> "true",
3421+
DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> enableCDF.toString) {
3422+
withTempPath { dir =>
3423+
val clock = new ManualClock(System.currentTimeMillis())
3424+
val targetLog = DeltaLog.forTable(spark, dir, clock)
3425+
val defaultRetentionPeriod =
3426+
DeltaConfigs.LOG_RETENTION.fromMetaData(targetLog.update().metadata).toString
3427+
3428+
targetDF.write.format("delta").save(dir.toString)
3429+
3430+
val targetTable = io.delta.tables.DeltaTable.forPath(dir.toString)
3431+
3432+
// Add some DVs.
3433+
targetTable.delete("id >= 90")
3434+
3435+
// Assert that DVs exist.
3436+
val preDowngradeSnapshot = targetLog.update()
3437+
assert(DeletionVectorUtils.deletionVectorsWritable(preDowngradeSnapshot))
3438+
assert(preDowngradeSnapshot.numDeletionVectorsOpt === Some(1L))
3439+
3440+
// Attempting to drop Deletion Vectors feature will prohibit adding new DVs and remove
3441+
// all DVs from the latest snapshot, but ultimately fail, because history will still
3442+
// contain traces of the feature. For this reason, we have to wait for the retention period
3443+
// to be over before we can downgrade the protocol.
3444+
val e1 = intercept[DeltaTableFeatureException] {
3445+
dropDVTableFeature(spark, targetLog, truncateHistory = false)
3446+
}
3447+
checkError(
3448+
e1,
3449+
"DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD",
3450+
parameters = Map(
3451+
"feature" -> DeletionVectorsTableFeature.name,
3452+
"logRetentionPeriodKey" -> "delta.logRetentionDuration",
3453+
"logRetentionPeriod" -> defaultRetentionPeriod,
3454+
"truncateHistoryLogRetentionPeriod" -> truncateHistoryDefaultLogRetention.toString))
3455+
3456+
val postCleanupSnapshot = targetLog.update()
3457+
assert(!DeletionVectorUtils.deletionVectorsWritable(postCleanupSnapshot))
3458+
assert(postCleanupSnapshot.numDeletionVectorsOpt.getOrElse(0L) === 0)
3459+
assert(postCleanupSnapshot.numDeletedRecordsOpt.getOrElse(0L) === 0)
3460+
3461+
spark.range(100, 120).write.format("delta").mode("append").save(dir.getCanonicalPath)
3462+
spark.range(120, 140).write.format("delta").mode("append").save(dir.getCanonicalPath)
3463+
3464+
// Table still contains historical data with DVs. Attempt should fail.
3465+
val e2 = intercept[DeltaTableFeatureException] {
3466+
dropDVTableFeature(spark, targetLog, truncateHistory = false)
3467+
}
3468+
checkError(
3469+
e2,
3470+
"DELTA_FEATURE_DROP_HISTORICAL_VERSIONS_EXIST",
3471+
parameters = Map(
3472+
"feature" -> DeletionVectorsTableFeature.name,
3473+
"logRetentionPeriodKey" -> "delta.logRetentionDuration",
3474+
"logRetentionPeriod" -> defaultRetentionPeriod,
3475+
"truncateHistoryLogRetentionPeriod" -> truncateHistoryDefaultLogRetention.toString))
3476+
3477+
// Pretend retention period has passed.
3478+
val clockAdvanceMillis = if (truncateHistory) {
3479+
DeltaConfigs.getMilliSeconds(truncateHistoryDefaultLogRetention) +
3480+
TimeUnit.HOURS.toMillis(24)
3481+
} else {
3482+
targetLog.deltaRetentionMillis(targetLog.update().metadata) + TimeUnit.DAYS.toMillis(3)
3483+
}
3484+
clock.advance(clockAdvanceMillis)
3485+
3486+
// Cleanup logs.
3487+
targetLog.cleanUpExpiredLogs(targetLog.update())
3488+
3489+
// History is now clean. We should be able to remove the feature.
3490+
dropDVTableFeature(spark, targetLog, truncateHistory)
3491+
3492+
val postDowngradeSnapshot = targetLog.update()
3493+
val protocol = postDowngradeSnapshot.protocol
3494+
assert(!DeletionVectorUtils.deletionVectorsWritable(postDowngradeSnapshot))
3495+
assert(postDowngradeSnapshot.numDeletionVectorsOpt.getOrElse(0L) === 0)
3496+
assert(postDowngradeSnapshot.numDeletedRecordsOpt.getOrElse(0L) === 0)
3497+
assert(!protocol.readerFeatureNames.contains(DeletionVectorsTableFeature.name))
3498+
}
3499+
}
3500+
}
3501+
3502+
34113503
test("Can drop reader+writer feature when there is nothing to clean") {
34123504
withTempPath { dir =>
34133505
val clock = new ManualClock(System.currentTimeMillis())

0 commit comments

Comments
 (0)