Skip to content

Commit 4ee7f4d

Browse files
[Spark] Protocol version downgrade in the presence of table features (#2841)
## Description This PR adds support for protocol versions downgrade when table features exist in the protocol. The downgraded protocol versions should be the minimum required to support all available table features. For example, `Protocol(3, 7, DeletionVectors, RowTracking)` can be downgraded to `Protocol(1, 7, RowTracking)` after removing the DV feature. ## How was this patch tested? Added new UTs in DeltaProtocolVersionSuite. Furthermore, existing UTs cover a significant part of the functionality. These these are the following: - Downgrade protocol version on table created with (3, 7). - Downgrade protocol version on table created with (1, 7). - Protocol version downgrade on a table with table features and added legacy feature. - Protocol version is not downgraded when writer features exist. - Protocol version is not downgraded when reader+writer features exist. - Protocol version is not downgraded when multiple reader+writer features exist. ## Does this PR introduce _any_ user-facing changes? Yes. Dropping a table feature from a table with multiple features may now result to a Protocol versions downgrade. For example, `Protocol(3, 7, DeletionVectors, RowTracking)` can now be downgraded to `Protocol(1, 7, RowTracking)`.
1 parent f6ebe24 commit 4ee7f4d

File tree

3 files changed

+88
-75
lines changed

3 files changed

+88
-75
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala

Lines changed: 30 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,14 @@ trait TableFeatureSupport { this: Protocol =>
188188
lazy val readerAndWriterFeatures: Seq[TableFeature] =
189189
readerAndWriterFeatureNames.toSeq.flatMap(TableFeature.featureNameToFeature)
190190

191+
/**
192+
* A sequence of native [[TableFeature]]s. This is derived by filtering out all explicitly
193+
* supported legacy features.
194+
*/
195+
@JsonIgnore
196+
lazy val nativeReaderAndWriterFeatures: Seq[TableFeature] =
197+
readerAndWriterFeatures.filterNot(_.isLegacyFeature)
198+
191199
/**
192200
* Get all features that are implicitly supported by this protocol, for example, `Protocol(1,2)`
193201
* implicitly supports `appendOnly` and `invariants`. When this protocol is capable of requiring
@@ -242,43 +250,16 @@ trait TableFeatureSupport { this: Protocol =>
242250
}
243251

244252
/**
245-
* Determine whether this protocol can be safely downgraded to a new protocol `to`. This
246-
* includes the following:
247-
* - The current protocol needs to support at least writer features. This is because protocol
248-
* downgrade is only supported with table features.
249-
* - The protocol version can only be downgraded when there are no non-legacy table features.
250-
* - We can only remove one feature at a time.
251-
* - When downgrading protocol versions, the resulting versions must support exactly the same
252-
* set of legacy features supported by the current protocol.
253-
*
254-
* Note, this not an exhaustive list of downgrade rules. Rather, we check the most important
255-
* downgrade invariants. We also perform checks during feature removal at
256-
* [[AlterTableDropFeatureDeltaCommand]].
253+
* Determine whether this protocol can be safely downgraded to a new protocol `to`.
254+
* All we need is the implicit and explicit features between the two protocols to match,
255+
* excluding the dropped feature. Note, this accounts for cases where we downgrade
256+
* from table features to legacy protocol versions.
257257
*/
258258
def canDowngradeTo(to: Protocol, droppedFeatureName: String): Boolean = {
259-
if (!supportsWriterFeatures) return false
260-
261-
// When `to` protocol does not have any features version downgrades are possible. However,
262-
// the current protocol needs to contain one non-legacy feature. We also allow downgrade when
263-
// there are only legacy features. This is to accommodate the case when the user attempts to
264-
// remove a legacy feature in a table that only contains legacy features.
265-
if (to.readerAndWriterFeatureNames.isEmpty) {
266-
val featureNames = readerAndWriterFeatureNames - droppedFeatureName
267-
val sameLegacyFeaturesSupported = featureNames == to.implicitlySupportedFeatures.map(_.name)
268-
val minRequiredVersions = TableFeatureProtocolUtils.minimumRequiredVersions(
269-
featureNames.flatMap(TableFeature.featureNameToFeature).toSeq)
270-
271-
return sameLegacyFeaturesSupported &&
272-
(to.minReaderVersion, to.minWriterVersion) == minRequiredVersions &&
273-
readerAndWriterFeatures.filterNot(_.isLegacyFeature).size <= 1
274-
}
275-
276-
// When `to` protocol contains table features we cannot downgrade the protocol version.
277-
if (to.minReaderVersion != this.minReaderVersion) return false
278-
if (to.minWriterVersion != this.minWriterVersion) return false
279-
280-
// Can only remove a maximum of one feature at a time.
281-
(this.readerAndWriterFeatureNames -- to.readerAndWriterFeatureNames).size == 1
259+
val thisFeatures = this.implicitlyAndExplicitlySupportedFeatures
260+
val toFeatures = to.implicitlyAndExplicitlySupportedFeatures
261+
val droppedFeature = Seq(droppedFeatureName).flatMap(TableFeature.featureNameToFeature)
262+
(thisFeatures -- droppedFeature) == toFeatures
282263
}
283264

284265
/**
@@ -368,13 +349,25 @@ trait TableFeatureSupport { this: Protocol =>
368349
* features. After we remove the last native feature we downgrade the protocol to (1, 1).
369350
*/
370351
def downgradeProtocolVersionsIfNeeded: Protocol = {
371-
if (!readerAndWriterFeatures.forall(_.isLegacyFeature)) return this
352+
if (nativeReaderAndWriterFeatures.nonEmpty) {
353+
val (minReaderVersion, minWriterVersion) =
354+
TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures)
355+
// It is guaranteed by the definitions of WriterFeature and ReaderFeature, that we cannot
356+
// end up with invalid protocol versions such as (3, 3). Nevertheless,
357+
// we double check it here.
358+
val newProtocol =
359+
Protocol(minReaderVersion, minWriterVersion).withFeatures(readerAndWriterFeatures)
360+
assert(
361+
newProtocol.supportsWriterFeatures,
362+
s"Downgraded protocol should at least support writer features, but got $newProtocol.")
363+
return newProtocol
364+
}
372365

373366
val (minReaderVersion, minWriterVersion) =
374367
TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures)
375368
val newProtocol = Protocol(minReaderVersion, minWriterVersion)
376369

377-
require(
370+
assert(
378371
!newProtocol.supportsReaderFeatures && !newProtocol.supportsWriterFeatures,
379372
s"Downgraded protocol should not support table features, but got $newProtocol.")
380373

spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3276,7 +3276,8 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
32763276
initialMinWriterVersion: Int,
32773277
featuresToAdd: Seq[TableFeature],
32783278
featuresToRemove: Seq[TableFeature],
3279-
expectedDowngradedProtocol: Protocol): Unit = {
3279+
expectedDowngradedProtocol: Protocol,
3280+
truncateHistory: Boolean = false): Unit = {
32803281
withTempDir { dir =>
32813282
val deltaLog = DeltaLog.forTable(spark, dir)
32823283

@@ -3297,8 +3298,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
32973298
|)""".stripMargin)
32983299

32993300
for (feature <- featuresToRemove) {
3300-
AlterTableDropFeatureDeltaCommand(DeltaTableV2(spark, deltaLog.dataPath), feature.name)
3301-
.run(spark)
3301+
AlterTableDropFeatureDeltaCommand(
3302+
table = DeltaTableV2(spark, deltaLog.dataPath),
3303+
featureName = feature.name,
3304+
truncateHistory = truncateHistory).run(spark)
33023305
}
33033306
assert(deltaLog.update().protocol === expectedDowngradedProtocol)
33043307
}
@@ -3345,7 +3348,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
33453348
expectedDowngradedProtocol = Protocol(1, 1))
33463349
}
33473350

3348-
test("Downgrade protocol version on table created with table features") {
3351+
test("Downgrade protocol version on table created with (3, 7)") {
33493352
// When the table is initialized with table features there are no active (implicit) legacy
33503353
// features. After removing the last table feature we downgrade back to (1, 1).
33513354
testProtocolVersionDowngrade(
@@ -3356,7 +3359,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
33563359
expectedDowngradedProtocol = Protocol(1, 1))
33573360
}
33583361

3359-
test("Downgrade protocol version on table created with writer features") {
3362+
test("Downgrade protocol version on table created with (1, 7)") {
33603363
testProtocolVersionDowngrade(
33613364
initialMinReaderVersion = 1,
33623365
initialMinWriterVersion = 7,
@@ -3418,7 +3421,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
34183421
expectedDowngradedProtocol = protocolWithWriterFeature(DomainMetadataTableFeature))
34193422
}
34203423

3421-
test("Protocol version is not downgraded when reader+writer features exist") {
3424+
test("Protocol version is not downgraded when multiple reader+writer features exist") {
34223425
testProtocolVersionDowngrade(
34233426
initialMinReaderVersion = 3,
34243427
initialMinWriterVersion = 7,
@@ -3427,15 +3430,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
34273430
expectedDowngradedProtocol = protocolWithReaderFeature(DeletionVectorsTableFeature))
34283431
}
34293432

3430-
test("Protocol version is not downgraded when both reader+writer and writer features exist") {
3431-
testProtocolVersionDowngrade(
3432-
initialMinReaderVersion = 3,
3433-
initialMinWriterVersion = 7,
3434-
featuresToAdd = Seq(TestRemovableReaderWriterFeature, TestRemovableWriterFeature),
3435-
featuresToRemove = Seq(TestRemovableReaderWriterFeature),
3436-
expectedDowngradedProtocol =
3437-
Protocol(3, 7, Some(Set.empty), Some(Set(TestRemovableWriterFeature.name))))
3438-
3433+
test("Protocol version is not downgraded when reader+writer features exist") {
34393434
testProtocolVersionDowngrade(
34403435
initialMinReaderVersion = 3,
34413436
initialMinWriterVersion = 7,
@@ -3473,6 +3468,50 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
34733468
}
34743469
}
34753470

3471+
for (truncateHistory <- BOOLEAN_DOMAIN)
3472+
test(s"Protocol version downgrade with Table Features - Basic test " +
3473+
s"truncateHistory: ${truncateHistory}") {
3474+
val expectedFeatures = Seq(RowTrackingFeature, DomainMetadataTableFeature)
3475+
3476+
testProtocolVersionDowngrade(
3477+
initialMinReaderVersion = 3,
3478+
initialMinWriterVersion = 7,
3479+
featuresToAdd = expectedFeatures :+ TestRemovableReaderWriterFeature,
3480+
featuresToRemove = Seq(TestRemovableReaderWriterFeature),
3481+
expectedDowngradedProtocol = Protocol(1, 7).withFeatures(expectedFeatures),
3482+
truncateHistory = truncateHistory)
3483+
}
3484+
3485+
for (truncateHistory <- BOOLEAN_DOMAIN)
3486+
test(s"Protocol version downgrade with Table Features - include legacy writer features: " +
3487+
s"truncateHistory: ${truncateHistory}") {
3488+
val expectedFeatures =
3489+
Seq(DomainMetadataTableFeature, ChangeDataFeedTableFeature, AppendOnlyTableFeature)
3490+
3491+
testProtocolVersionDowngrade(
3492+
initialMinReaderVersion = 3,
3493+
initialMinWriterVersion = 7,
3494+
featuresToAdd = expectedFeatures :+ TestRemovableReaderWriterFeature,
3495+
featuresToRemove = Seq(TestRemovableReaderWriterFeature),
3496+
expectedDowngradedProtocol = Protocol(1, 7).withFeatures(expectedFeatures),
3497+
truncateHistory = truncateHistory)
3498+
}
3499+
3500+
for (truncateHistory <- BOOLEAN_DOMAIN)
3501+
test(s"Protocol version downgrade with Table Features - include legacy reader features: " +
3502+
s"truncateHistory: ${truncateHistory}") {
3503+
val expectedFeatures =
3504+
Seq(DomainMetadataTableFeature, ChangeDataFeedTableFeature, ColumnMappingTableFeature)
3505+
3506+
testProtocolVersionDowngrade(
3507+
initialMinReaderVersion = 3,
3508+
initialMinWriterVersion = 7,
3509+
featuresToAdd = expectedFeatures :+ TestRemovableReaderWriterFeature,
3510+
featuresToRemove = Seq(TestRemovableReaderWriterFeature),
3511+
expectedDowngradedProtocol = Protocol(2, 7).withFeatures(expectedFeatures),
3512+
truncateHistory = truncateHistory)
3513+
}
3514+
34763515
private def dropV2CheckpointsTableFeature(spark: SparkSession, log: DeltaLog): Unit = {
34773516
spark.sql(s"ALTER TABLE delta.`${log.dataPath}` DROP FEATURE " +
34783517
s"`${V2CheckpointTableFeature.name}`")

spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -276,12 +276,11 @@ class DeltaTableFeatureSuite
276276
test("protocol downgrade compatibility") {
277277
val tableFeatureProtocol =
278278
Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)
279-
// Cannot downgrade when the original protocol does not support at a minimum writer features.
280-
assert(!Protocol(1, 6).canDowngradeTo(Protocol(1, 6), droppedFeatureName = ""))
281-
assert(tableFeatureProtocol.withFeature(TestWriterFeature)
282-
.canDowngradeTo(Protocol(1, 1), droppedFeatureName = TestWriterFeature.name))
283279
assert(Protocol(1, 7).withFeature(TestWriterFeature)
284-
.canDowngradeTo(Protocol(1, 1), droppedFeatureName = TestWriterFeature.name))
280+
.canDowngradeTo(Protocol(1, 7), droppedFeatureName = TestWriterFeature.name))
281+
// When there are no explicit features the protocol versions need to be downgraded
282+
// below table features. The new protocol versions need to match exactly the supported
283+
// legacy features.
285284
for (n <- 1 to 3) {
286285
assert(
287286
!Protocol(n, 7)
@@ -292,31 +291,13 @@ class DeltaTableFeatureSuite
292291
.withFeatures(Seq(TestWriterFeature, AppendOnlyTableFeature, InvariantsTableFeature))
293292
.canDowngradeTo(Protocol(1, 2), droppedFeatureName = TestWriterFeature.name))
294293
}
295-
// When there are no explicit features the protocol versions need to be downgraded
296-
// below table features.
297-
assert(!tableFeatureProtocol.withFeature(TestWriterFeature)
298-
.canDowngradeTo(tableFeatureProtocol, droppedFeatureName = TestWriterFeature.name))
299-
assert(!tableFeatureProtocol.withFeature(TestWriterFeature)
300-
.canDowngradeTo(Protocol(2, 7), droppedFeatureName = TestWriterFeature.name))
301-
// Only one non-legacy writer feature per time.
302-
assert(!tableFeatureProtocol.withFeatures(Seq(TestWriterFeature, TestRemovableWriterFeature))
303-
.canDowngradeTo(tableFeatureProtocol, droppedFeatureName = TestWriterFeature.name))
304-
// Remove reader+writer feature.
305294
assert(tableFeatureProtocol.withFeatures(Seq(TestReaderWriterFeature))
306295
.canDowngradeTo(Protocol(1, 1), droppedFeatureName = TestReaderWriterFeature.name))
307-
// Only one non-legacy feature at a time - multiple reader+writer features.
308-
assert(
309-
!tableFeatureProtocol
310-
.withFeatures(Seq(TestReaderWriterFeature, TestReaderWriterMetadataAutoUpdateFeature))
311-
.canDowngradeTo(tableFeatureProtocol, droppedFeatureName = ""))
312296
assert(
313297
tableFeatureProtocol
314298
.merge(Protocol(2, 5))
315299
.withFeatures(Seq(TestReaderWriterFeature, TestRemovableLegacyReaderWriterFeature))
316300
.canDowngradeTo(Protocol(2, 5), droppedFeatureName = TestReaderWriterFeature.name))
317-
// Only one feature at a time - mix of reader+writer and writer features.
318-
assert(!tableFeatureProtocol.withFeatures(Seq(TestWriterFeature, TestReaderWriterFeature))
319-
.canDowngradeTo(tableFeatureProtocol, droppedFeatureName = TestWriterFeature.name))
320301
// Downgraded protocol must be able to support all legacy table features.
321302
assert(
322303
!tableFeatureProtocol

0 commit comments

Comments
 (0)