Commit 86460aa

[Spark] Enable Fast Drop feature by default (#4212)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Enable the Fast Drop feature by default.

## How was this patch tested?

Existing tests.

## Does this PR introduce _any_ user-facing changes?

Yes. The current drop feature command must be executed twice, with a 24-hour waiting time in between. It also truncates the history of the Delta table to the last 24 hours.

We introduce a new DROP FEATURE implementation that drops features instantly without truncating history. Dropping a feature introduces a new writer feature to the table, the `CheckpointProtectionTableFeature`.

Dropping a feature with the new behaviour can be achieved as follows: `ALTER TABLE x DROP FEATURE y`

We can still drop a feature with the old behaviour as follows: `ALTER TABLE x DROP FEATURE y TRUNCATE HISTORY`.

Finally, the `CheckpointProtectionTableFeature` can be dropped similarly to any other feature: `ALTER TABLE x DROP FEATURE CheckpointProtectionTableFeature TRUNCATE HISTORY`.
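The two statement forms described above differ only in the trailing `TRUNCATE HISTORY` clause. A minimal sketch of a helper that builds either form is shown here; `drop_feature_sql` is a hypothetical name, not part of Delta, and it does no identifier quoting or validation.

```python
def drop_feature_sql(table: str, feature: str, truncate_history: bool = False) -> str:
    """Build an ALTER TABLE ... DROP FEATURE statement.

    `table` and `feature` are inserted verbatim (no quoting or validation),
    so this sketch should only be given trusted identifiers.
    """
    stmt = f"ALTER TABLE {table} DROP FEATURE {feature}"
    if truncate_history:
        # Old behaviour: two executions with a waiting period, history truncated.
        stmt += " TRUNCATE HISTORY"
    return stmt


fast_drop = drop_feature_sql("x", "y")
legacy_drop = drop_feature_sql("x", "y", truncate_history=True)
print(fast_drop)
print(legacy_drop)
```

With an active `SparkSession` named `spark` (assumed here), the resulting string would be passed to `spark.sql(...)`.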
1 parent 59e3ccf commit 86460aa

8 files changed: +64 -20 lines changed


python/delta/tests/test_deltatable.py

Lines changed: 4 additions & 0 deletions

@@ -1221,6 +1221,10 @@ def test_addFeatureSupport(self) -> None:
                          ["appendOnly", "deletionVectors", "invariants"])

     def test_dropFeatureSupport(self) -> None:
+        # The expected results below are based on drop feature with history truncation.
+        # Fast drop feature, adds a writer feature when dropped. The relevant behavior is tested
+        # in the DeltaFastDropFeatureSuite.
+        self.spark.conf.set('spark.databricks.delta.tableFeatures.fastDropFeature.enabled', 'false')
         dt = self.__create_df_for_feature_tests()

         dt.addFeatureSupport("testRemovableWriter")
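The Python test above sets the flag on the shared session and leaves it set. Where that matters, a scoped override in the spirit of Scala's `withSQLConf` could look like the sketch below. `temporary_conf` and `DictConf` are hypothetical names introduced for illustration; `DictConf` is an in-memory stand-in so the example runs without Spark, and the sketch assumes the real `spark.conf` is used through its `get(key, default)`, `set(key, value)` and `unset(key)` methods.

```python
from contextlib import contextmanager

FAST_DROP_KEY = "spark.databricks.delta.tableFeatures.fastDropFeature.enabled"


@contextmanager
def temporary_conf(conf, key, value):
    """Set `key` on `conf` inside a `with` block, then restore the previous state."""
    previous = conf.get(key, None)  # None is treated as "not explicitly set"
    conf.set(key, value)
    try:
        yield conf
    finally:
        if previous is None:
            conf.unset(key)
        else:
            conf.set(key, previous)


class DictConf:
    """Hypothetical in-memory stand-in for `spark.conf`, used only for this demo."""

    def __init__(self):
        self._values = {}

    def get(self, key, default=None):
        return self._values.get(key, default)

    def set(self, key, value):
        self._values[key] = value

    def unset(self, key):
        self._values.pop(key, None)


conf = DictConf()
with temporary_conf(conf, FAST_DROP_KEY, "false"):
    inside = conf.get(FAST_DROP_KEY)   # flag is overridden here
after = conf.get(FAST_DROP_KEY)        # override is gone again
```

The `try`/`finally` ensures the previous value is restored even when the body of the `with` block raises, which is the usual failure path in a test.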

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 7 additions & 8 deletions

@@ -460,14 +460,13 @@ trait DeltaSQLConfBase {
       .createWithDefault(false)

   val FAST_DROP_FEATURE_ENABLED =
-    buildConf("tableFeatures.dev.fastDropFeature.enabled")
+    buildConf("tableFeatures.fastDropFeature.enabled")
       .internal()
       .doc(
-        """Whether to enable the fast drop feature feature functionality.
-          |This feature is currently in development and this config is only intended to be enabled
-          |for testing purposes.""".stripMargin)
+        """Whether to allow dropping features with the fast drop feature feature
+          |functionality.""".stripMargin)
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)

   val FAST_DROP_FEATURE_DV_DISCOVERY_IN_VACUUM_DISABLED =
     buildConf("tableFeatures.dev.fastDropFeature.DVDiscoveryInVacuum.disabled")
@@ -479,7 +478,7 @@ trait DeltaSQLConfBase {
       .createWithDefault(false)

   val FAST_DROP_FEATURE_GENERATE_DV_TOMBSTONES =
-    buildConf("tableFeatures.dev.fastDropFeature.generateDVTombstones.enabled")
+    buildConf("tableFeatures.fastDropFeature.generateDVTombstones.enabled")
       .internal()
       .doc(
         """Whether to generate DV tombstones when dropping deletion vectors.
@@ -489,7 +488,7 @@ trait DeltaSQLConfBase {
       .createWithDefaultFunction(() => SQLConf.get.getConf(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED))

   val FAST_DROP_FEATURE_DV_TOMBSTONE_COUNT_THRESHOLD =
-    buildConf("tableFeatures.dev.fastDropFeature.dvTombstoneCountThreshold")
+    buildConf("tableFeatures.fastDropFeature.dvTombstoneCountThreshold")
       .doc(
         """The maximum number of DV tombstones we are allowed store to memory when dropping
           |deletion vectors. When the resulting number of DV tombstones is higher, we use
@@ -500,7 +499,7 @@ trait DeltaSQLConfBase {
       .createWithDefault(10000)

   val FAST_DROP_FEATURE_STREAMING_ALWAYS_VALIDATE_PROTOCOL =
-    buildConf("tableFeatures.dev.fastDropFeature.alwaysValidateProtocolInStreaming.enabled")
+    buildConf("tableFeatures.fastDropFeature.alwaysValidateProtocolInStreaming.enabled")
       .internal()
       .doc(
         """Whether to validate the protocol when starting a stream from arbitrary

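The `DeltaSQLConf.scala` change renames four keys by dropping the `.dev` segment, while `DVDiscoveryInVacuum.disabled` keeps it. For a job that still sets the old names, the rename could be applied to a settings dictionary roughly as follows. This is a sketch under assumptions: `migrate_conf` is a hypothetical helper, the `spark.databricks.delta.` prefix is taken from the key used in `test_deltatable.py`, and the diff does not say whether the old `.dev` keys are still honoured.

```python
PREFIX = "spark.databricks.delta."

# Old (development) key suffix -> new key suffix, as renamed in this diff.
RENAMED_KEYS = {
    "tableFeatures.dev.fastDropFeature.enabled":
        "tableFeatures.fastDropFeature.enabled",
    "tableFeatures.dev.fastDropFeature.generateDVTombstones.enabled":
        "tableFeatures.fastDropFeature.generateDVTombstones.enabled",
    "tableFeatures.dev.fastDropFeature.dvTombstoneCountThreshold":
        "tableFeatures.fastDropFeature.dvTombstoneCountThreshold",
    "tableFeatures.dev.fastDropFeature.alwaysValidateProtocolInStreaming.enabled":
        "tableFeatures.fastDropFeature.alwaysValidateProtocolInStreaming.enabled",
}


def migrate_conf(settings: dict) -> dict:
    """Return a copy of `settings` with the renamed fast-drop keys rewritten.

    Keys that were not renamed (including DVDiscoveryInVacuum.disabled, which
    keeps its `.dev` segment) are copied unchanged.
    """
    migrated = {}
    for key, value in settings.items():
        suffix = key[len(PREFIX):] if key.startswith(PREFIX) else None
        new_suffix = RENAMED_KEYS.get(suffix)
        new_key = (PREFIX + new_suffix) if new_suffix else key
        migrated[new_key] = value
    return migrated


old_settings = {
    PREFIX + "tableFeatures.dev.fastDropFeature.enabled": "true",
    "spark.sql.shuffle.partitions": "8",
}
new_settings = migrate_conf(old_settings)
```

Since the default of `fastDropFeature.enabled` is now `true`, a job that wants the previous behaviour has to set the new key to `false` explicitly, as the test suites in this commit do.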
spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala

Lines changed: 10 additions & 2 deletions

@@ -24,6 +24,7 @@ import scala.language.postfixOps
 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestRemovableReaderWriterFeature, TestRemovableWriterFeature, TestWriterFeature}
 import org.apache.spark.sql.delta.actions.{ Metadata, Protocol }
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.storage.LocalLogStore
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
@@ -208,8 +209,15 @@ class DeltaTableHadoopOptionsSuite extends QueryTest

   import testImplicits._

-  protected override def sparkConf =
-    super.sparkConf.set("spark.delta.logStore.fake.impl", classOf[LocalLogStore].getName)
+  protected override def sparkConf = {
+    // The drop feature test below is targeting the drop feature with history truncation
+    // implementation. The fast drop feature implementation adds a new writer feature when dropping
+    // a feature and also does not require any waiting time. The fast drop feature implementation
+    // is tested extensively in the DeltaFastDropFeatureSuite.
+    super.sparkConf
+      .set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, "false")
+      .set("spark.delta.logStore.fake.impl", classOf[LocalLogStore].getName)
+  }

   /**
    * Create Hadoop file system options for `FakeFileSystem`. If Delta doesn't pick up them,

spark/src/test/scala/org/apache/spark/sql/delta/DeltaFastDropFeatureSuite.scala

Lines changed: 15 additions & 10 deletions

@@ -312,7 +312,9 @@ class DeltaFastDropFeatureSuite
     }
   }

-  test("Drop CheckpointProtectionTableFeature") {
+  for (withFastDropFeatureEnabled <- BOOLEAN_DOMAIN)
+  test("Drop CheckpointProtectionTableFeature " +
+      s"withFastDropFeatureEnabled: $withFastDropFeatureEnabled") {
     withTempDir { dir =>
       val clock = new ManualClock(System.currentTimeMillis())
       val deltaLog = DeltaLog.forTable(spark, new Path(dir.getAbsolutePath), clock)
@@ -334,17 +336,20 @@ class DeltaFastDropFeatureSuite
       val checkpointProtectionVersion =
         CheckpointProtectionTableFeature.getCheckpointProtectionVersion(deltaLog.update())

-      val e = intercept[DeltaTableFeatureException] {
-        dropTableFeature(deltaLog, CheckpointProtectionTableFeature, truncateHistory = true)
-      }
-      checkError(
-        e,
-        "DELTA_FEATURE_DROP_CHECKPOINT_PROTECTION_WAIT_FOR_RETENTION_PERIOD",
-        parameters = Map("truncateHistoryLogRetentionPeriod" -> "24 hours"))
+      withSQLConf(
+          DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key -> withFastDropFeatureEnabled.toString) {
+        val e = intercept[DeltaTableFeatureException] {
+          dropTableFeature(deltaLog, CheckpointProtectionTableFeature, truncateHistory = true)
+        }
+        checkError(
+          e,
+          "DELTA_FEATURE_DROP_CHECKPOINT_PROTECTION_WAIT_FOR_RETENTION_PERIOD",
+          parameters = Map("truncateHistoryLogRetentionPeriod" -> "24 hours"))

-      clock.advance(TimeUnit.HOURS.toMillis(48))
+        clock.advance(TimeUnit.HOURS.toMillis(48))

-      dropTableFeature(deltaLog, CheckpointProtectionTableFeature, truncateHistory = true)
+        dropTableFeature(deltaLog, CheckpointProtectionTableFeature, truncateHistory = true)
+      }

       val snapshot = deltaLog.update()
       val protocol = snapshot.protocol
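The test in this hunk follows a fail, advance clock, succeed pattern: the first drop with `truncateHistory = true` is rejected with a "24 hours" retention error, and the same call succeeds after the manual clock moves forward 48 hours. The toy model below illustrates only that pattern. It is not Delta's implementation: every name in it is hypothetical, and measuring the wait from a single `protected_since_ms` timestamp is an assumption made for the sketch.

```python
RETENTION_MS = 24 * 60 * 60 * 1000  # "24 hours", as in the test's error parameters


class ManualClock:
    """Minimal stand-in for a manual test clock: time moves only when advanced."""

    def __init__(self, now_ms: int = 0):
        self._now_ms = now_ms

    def now_ms(self) -> int:
        return self._now_ms

    def advance(self, delta_ms: int) -> None:
        self._now_ms += delta_ms


class RetentionNotElapsed(Exception):
    """Hypothetical error raised while the retention period is still running."""


def drop_with_truncate_history(clock: ManualClock, protected_since_ms: int) -> str:
    """Toy check: allow the drop only once the retention period has elapsed."""
    if clock.now_ms() - protected_since_ms < RETENTION_MS:
        raise RetentionNotElapsed("wait for the 24 hours retention period")
    return "dropped"


clock = ManualClock()
start = clock.now_ms()

try:
    drop_with_truncate_history(clock, start)
    first_attempt = "dropped"
except RetentionNotElapsed:
    first_attempt = "rejected"

clock.advance(48 * 60 * 60 * 1000)  # mirror clock.advance(TimeUnit.HOURS.toMillis(48))
second_attempt = drop_with_truncate_history(clock, start)
```

Injecting the clock, as the Scala test does through `DeltaLog.forTable(spark, path, clock)`, lets the waiting period be exercised without any real delay.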

spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala

Lines changed: 6 additions & 0 deletions

@@ -61,6 +61,12 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
   // `.schema` generates NOT NULL columns which requires writer protocol 2. We convert all to
   // NULLable to avoid silent writer protocol version bump.
   private lazy val testTableSchema = spark.range(1).schema.asNullable
+  override protected def sparkConf: SparkConf = {
+    // All the drop feature tests below are targeting the drop feature with history truncation
+    // implementation. The fast drop feature implementation is tested extensively in
+    // DeltaFastDropFeatureSuite.
+    super.sparkConf.set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, "false")
+  }

   // This is solely a test hook. Users cannot create new Delta tables with protocol lower than
   // that of their current version.

spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala

Lines changed: 7 additions & 0 deletions

@@ -40,6 +40,13 @@ class DeltaTableFeatureSuite
   with DeltaSQLCommandTest {

   private lazy val testTableSchema = spark.range(1).schema
+  override protected def sparkConf: SparkConf = {
+    // All the drop feature tests below are targeting the drop feature with history truncation
+    // implementation. The fast drop feature implementation adds a new writer feature when dropping
+    // a feature and also does not require any waiting time. The fast drop feature implementation
+    // is tested extensively in the DeltaFastDropFeatureSuite.
+    super.sparkConf.set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, "false")
+  }

   // This is solely a test hook. Users cannot create new Delta tables with protocol lower than
   // that of their current version.

spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/DropColumnMappingFeatureSuite.scala

Lines changed: 9 additions & 0 deletions

@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.DeltaConfigs._
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.sources.DeltaSQLConf._

 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -31,6 +32,14 @@ import org.apache.spark.util.ManualClock
  */
 class DropColumnMappingFeatureSuite extends RemoveColumnMappingSuiteUtils {

+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    // All the drop feature tests below are based on the drop feature with history truncation
+    // implementation. The fast drop feature implementation does not require any waiting time.
+    // The fast drop feature implementation is tested extensively in the DeltaFastDropFeatureSuite.
+    spark.conf.set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, false.toString)
+  }
+
   val clock = new ManualClock(System.currentTimeMillis())
   test("column mapping cannot be dropped without the feature flag") {
     withSQLConf(ALLOW_COLUMN_MAPPING_REMOVAL.key -> "false") {

spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTestMixin.scala

Lines changed: 6 additions & 0 deletions

@@ -20,6 +20,7 @@ import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{RemoveFile, TableFeatureProtocolUtils}
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.commands.AlterTableDropFeatureDeltaCommand
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.util.DeltaFileOperations
 import com.google.common.math.DoubleMath
@@ -46,6 +47,11 @@ trait TypeWideningTestMixin extends DeltaSQLCommandTest with DeltaDMLTestUtils {
       .set(SQLConf.ANSI_ENABLED.key, "true")
       // Rebase mode must be set explicitly to allow writing dates before 1582-10-15.
       .set(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, LegacyBehaviorPolicy.CORRECTED.toString)
+      // All the drop feature tests below are based on the drop feature with history truncation
+      // implementation. The fast drop feature implementation does not require any waiting time.
+      // The fast drop feature implementation is tested extensively in the
+      // DeltaFastDropFeatureSuite.
+      .set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, false.toString)
   }

   /** Enable (or disable) type widening for the table under the given path. */
