Skip to content

Commit 1cd6fed

Browse files
[Spark] Drop feature support in DeltaTable Scala/Python APIs (#3952)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> This PR adds drop feature support in the DeltaTable API for both scala and python APIs. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Added UTs. ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. 
If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> Yes. See description.
1 parent baa5518 commit 1cd6fed

File tree

6 files changed

+248
-24
lines changed

6 files changed

+248
-24
lines changed

python/delta/tables.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -594,14 +594,43 @@ def addFeatureSupport(self, featureName: str) -> None:
594594
DeltaTable._verify_type_str(featureName, "featureName")
595595
self._jdt.addFeatureSupport(featureName)
596596

597+
@since(3.4) # type: ignore[arg-type]
598+
def dropFeatureSupport(self, featureName: str, truncateHistory: Optional[bool] = None) -> None:
599+
"""
600+
Modify the protocol to drop a supported feature. The operation always normalizes the
601+
resulting protocol. Protocol normalization is the process of converting a table features
602+
protocol to the weakest possible form. This primarily refers to converting a table features
603+
protocol to a legacy protocol. A table features protocol can be represented with the legacy
604+
representation only when the feature set of the former exactly matches a legacy protocol.
605+
Normalization can also decrease the reader version of a table features protocol when it is
606+
higher than necessary. For example:
607+
608+
(1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3)
609+
(3, 7, None, {RowTracking}) -> (1, 7, RowTracking)
610+
611+
The dropFeatureSupport method can be used as follows:
612+
delta.tables.DeltaTable.dropFeatureSupport("rowTracking")
613+
614+
:param featureName: The name of the feature to drop.
615+
:param truncateHistory: Optional flag indicating whether to truncate history. If not specified,
616+
the history is not truncated.
617+
:return: None.
618+
"""
619+
DeltaTable._verify_type_str(featureName, "featureName")
620+
if truncateHistory is None:
621+
self._jdt.dropFeatureSupport(featureName)
622+
else:
623+
DeltaTable._verify_type_bool(truncateHistory, "truncateHistory")
624+
self._jdt.dropFeatureSupport(featureName, truncateHistory)
625+
597626
@since(1.2) # type: ignore[arg-type]
598627
def restoreToVersion(self, version: int) -> DataFrame:
599628
"""
600629
Restore the DeltaTable to an older version of the table specified by version number.
601630
602631
Example::
603632
604-
io.delta.tables.DeltaTable.restoreToVersion(1)
633+
delta.tables.DeltaTable.restoreToVersion(1)
605634
606635
:param version: target version of restored table
607636
:return: Dataframe with metrics of restore operation.
@@ -622,8 +651,8 @@ def restoreToTimestamp(self, timestamp: str) -> DataFrame:
622651
623652
Example::
624653
625-
io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01')
626-
io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01')
654+
delta.tables.DeltaTable.restoreToTimestamp('2021-01-01')
655+
delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01')
627656
628657
:param timestamp: target timestamp of restored table
629658
:return: Dataframe with metrics of restore operation.
@@ -658,6 +687,11 @@ def optimize(self) -> "DeltaOptimizeBuilder":
658687
jbuilder = self._jdt.optimize()
659688
return DeltaOptimizeBuilder(self._spark, jbuilder)
660689

690+
@classmethod # type: ignore[arg-type]
691+
def _verify_type_bool(self, variable: bool, name: str) -> None:
692+
if not isinstance(variable, bool) or variable is None:
693+
raise ValueError("%s needs to be a boolean but got '%s'." % (name, type(variable)))
694+
661695
@staticmethod # type: ignore[arg-type]
662696
def _verify_type_str(variable: str, name: str) -> None:
663697
if not isinstance(variable, str) or variable is None:

python/delta/tests/test_deltatable.py

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,16 +1151,19 @@ def test_delta_table_builder_with_bad_args(self) -> None:
11511151
with self.assertRaises(TypeError):
11521152
builder.property("1", 1) # type: ignore[arg-type]
11531153

1154-
def test_protocolUpgrade(self) -> None:
1154+
def __create_df_for_feature_tests(self) -> DeltaTable:
11551155
try:
1156-
self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2')
11571156
self.spark.conf.set('spark.databricks.delta.minReaderVersion', '1')
1157+
self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2')
11581158
self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
1159-
dt = DeltaTable.forPath(self.spark, self.tempFile)
1160-
dt.upgradeTableProtocol(1, 3)
1159+
return DeltaTable.forPath(self.spark, self.tempFile)
11611160
finally:
1162-
self.spark.conf.unset('spark.databricks.delta.minWriterVersion')
11631161
self.spark.conf.unset('spark.databricks.delta.minReaderVersion')
1162+
self.spark.conf.unset('spark.databricks.delta.minWriterVersion')
1163+
1164+
def test_protocolUpgrade(self) -> None:
1165+
dt = self.__create_df_for_feature_tests()
1166+
dt.upgradeTableProtocol(1, 3)
11641167

11651168
# cannot downgrade once upgraded
11661169
dt.upgradeTableProtocol(1, 2)
@@ -1189,14 +1192,7 @@ def test_protocolUpgrade(self) -> None:
11891192
dt.upgradeTableProtocol(1, {}) # type: ignore[arg-type]
11901193

11911194
def test_addFeatureSupport(self) -> None:
1192-
try:
1193-
self.spark.conf.set('spark.databricks.delta.minReaderVersion', '1')
1194-
self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2')
1195-
self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
1196-
dt = DeltaTable.forPath(self.spark, self.tempFile)
1197-
finally:
1198-
self.spark.conf.unset('spark.databricks.delta.minReaderVersion')
1199-
self.spark.conf.unset('spark.databricks.delta.minWriterVersion')
1195+
dt = self.__create_df_for_feature_tests()
12001196

12011197
# bad args
12021198
with self.assertRaisesRegex(Py4JJavaError, "DELTA_UNSUPPORTED_FEATURES_IN_CONFIG"):
@@ -1224,6 +1220,69 @@ def test_addFeatureSupport(self) -> None:
12241220
self.assertEqual(sorted(dt_details["tableFeatures"]),
12251221
["appendOnly", "deletionVectors", "invariants"])
12261222

1223+
def test_dropFeatureSupport(self) -> None:
1224+
dt = self.__create_df_for_feature_tests()
1225+
1226+
dt.addFeatureSupport("testRemovableWriter")
1227+
dt_details = dt.detail().collect()[0].asDict()
1228+
self.assertTrue(dt_details["minReaderVersion"] == 1)
1229+
self.assertTrue(dt_details["minWriterVersion"] == 7, "Should upgrade to table features")
1230+
self.assertEqual(sorted(dt_details["tableFeatures"]),
1231+
["appendOnly", "invariants", "testRemovableWriter"])
1232+
1233+
# Attempt truncating the history when dropping a feature that is not required.
1234+
# This verifies the truncateHistory option was correctly passed.
1235+
with self.assertRaisesRegex(Exception,
1236+
"DELTA_FEATURE_DROP_HISTORY_TRUNCATION_NOT_ALLOWED"):
1237+
dt.dropFeatureSupport("testRemovableWriter", True)
1238+
1239+
dt.dropFeatureSupport("testRemovableWriter")
1240+
dt_details = dt.detail().collect()[0].asDict()
1241+
self.assertTrue(dt_details["minReaderVersion"] == 1)
1242+
self.assertTrue(dt_details["minWriterVersion"] == 2, "Should return to legacy protocol")
1243+
1244+
dt.addFeatureSupport("testRemovableReaderWriter")
1245+
dt_details = dt.detail().collect()[0].asDict()
1246+
self.assertTrue(dt_details["minReaderVersion"] == 3, "Should upgrade to table features")
1247+
self.assertTrue(dt_details["minWriterVersion"] == 7, "Should upgrade to table features")
1248+
self.assertEqual(sorted(dt_details["tableFeatures"]),
1249+
["appendOnly", "invariants", "testRemovableReaderWriter"])
1250+
1251+
dt.dropFeatureSupport("testRemovableReaderWriter")
1252+
dt_details = dt.detail().collect()[0].asDict()
1253+
self.assertTrue(dt_details["minReaderVersion"] == 1, "Should return to legacy protocol")
1254+
self.assertTrue(dt_details["minWriterVersion"] == 2, "Should return to legacy protocol")
1255+
1256+
# Try to drop an unsupported feature.
1257+
with self.assertRaisesRegex(Exception, "DELTA_FEATURE_DROP_UNSUPPORTED_CLIENT_FEATURE"):
1258+
dt.dropFeatureSupport("__invalid_feature__")
1259+
1260+
# Try to drop a feature that is not present in the protocol.
1261+
with self.assertRaisesRegex(Exception, "DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT"):
1262+
dt.dropFeatureSupport("testRemovableReaderWriter")
1263+
1264+
# Try to drop a non-removable feature.
1265+
with self.assertRaisesRegex(Exception, "DELTA_FEATURE_DROP_NONREMOVABLE_FEATURE"):
1266+
dt.dropFeatureSupport("testReaderWriter")
1267+
1268+
with self.assertRaisesRegex(ValueError, "featureName needs to be a string"):
1269+
dt.dropFeatureSupport(12345) # type: ignore[arg-type]
1270+
with self.assertRaisesRegex(ValueError, "featureName needs to be a string"):
1271+
dt.dropFeatureSupport([12345]) # type: ignore[arg-type]
1272+
with self.assertRaisesRegex(ValueError, "featureName needs to be a string"):
1273+
dt.dropFeatureSupport({}) # type: ignore[arg-type]
1274+
with self.assertRaisesRegex(ValueError, "featureName needs to be a string"):
1275+
dt.dropFeatureSupport([]) # type: ignore[arg-type]
1276+
1277+
with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"):
1278+
dt.dropFeatureSupport("testRemovableWriter", 12345) # type: ignore[arg-type]
1279+
with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"):
1280+
dt.dropFeatureSupport("testRemovableWriter", [12345]) # type: ignore[arg-type]
1281+
with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"):
1282+
dt.dropFeatureSupport("testRemovableWriter", {}) # type: ignore[arg-type]
1283+
with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"):
1284+
dt.dropFeatureSupport("testRemovableWriter", []) # type: ignore[arg-type]
1285+
12271286
def test_restore_to_version(self) -> None:
12281287
self.__writeDeltaTable([('a', 1), ('b', 2)])
12291288
self.__overwriteDeltaTable([('a', 3), ('b', 2)],

spark/src/main/scala/io/delta/tables/DeltaTable.scala

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ import scala.collection.JavaConverters._
2020

2121
import org.apache.spark.sql.delta._
2222
import org.apache.spark.sql.delta.DeltaTableUtils.withActiveSession
23-
import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils}
23+
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
2424
import org.apache.spark.sql.delta.catalog.DeltaTableV2
25-
import org.apache.spark.sql.delta.commands.AlterTableSetPropertiesDeltaCommand
25+
import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, AlterTableSetPropertiesDeltaCommand}
2626
import org.apache.spark.sql.delta.sources.DeltaSQLConf
2727
import io.delta.tables.execution._
2828
import org.apache.hadoop.fs.Path
@@ -562,6 +562,73 @@ class DeltaTable private[tables](
562562
toDataset(sparkSession, alterTableCmd)
563563
}
564564

565+
private def executeDropFeature(featureName: String, truncateHistory: Option[Boolean]): Unit = {
566+
val alterTableCmd = AlterTableDropFeatureDeltaCommand(
567+
table = table,
568+
featureName = featureName,
569+
truncateHistory = truncateHistory.getOrElse(false))
570+
toDataset(sparkSession, alterTableCmd)
571+
}
572+
573+
/**
574+
* Modify the protocol to drop a supported feature. The operation always normalizes the
575+
* resulting protocol. Protocol normalization is the process of converting a table features
576+
* protocol to the weakest possible form. This primarily refers to converting a table features
577+
* protocol to a legacy protocol. A table features protocol can be represented with the legacy
578+
* representation only when the feature set of the former exactly matches a legacy protocol.
579+
* Normalization can also decrease the reader version of a table features protocol when it is
580+
* higher than necessary. For example:
581+
*
582+
* (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3)
583+
* (3, 7, None, {RowTracking}) -> (1, 7, RowTracking)
584+
*
585+
* The dropFeatureSupport method can be used as follows:
586+
* {{{
587+
* io.delta.tables.DeltaTable.dropFeatureSupport("rowTracking")
588+
* }}}
589+
*
590+
* See online documentation for more details.
591+
*
592+
* @param featureName The name of the feature to drop.
593+
* @param truncateHistory Whether to truncate history before downgrading the protocol.
594+
* @return None.
595+
* @since 3.4.0
596+
*/
597+
def dropFeatureSupport(
598+
featureName: String,
599+
truncateHistory: Boolean): Unit = withActiveSession(sparkSession) {
600+
executeDropFeature(featureName, Some(truncateHistory))
601+
}
602+
603+
/**
604+
* Modify the protocol to drop a supported feature. The operation always normalizes the
605+
* resulting protocol. Protocol normalization is the process of converting a table features
606+
* protocol to the weakest possible form. This primarily refers to converting a table features
607+
* protocol to a legacy protocol. A table features protocol can be represented with the legacy
608+
* representation only when the feature set of the former exactly matches a legacy protocol.
609+
* Normalization can also decrease the reader version of a table features protocol when it is
610+
* higher than necessary. For example:
611+
*
612+
* (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3)
613+
* (3, 7, None, {RowTracking}) -> (1, 7, RowTracking)
614+
*
615+
* The dropFeatureSupport method can be used as follows:
616+
* {{{
617+
* io.delta.tables.DeltaTable.dropFeatureSupport("rowTracking")
618+
* }}}
619+
*
620+
* Note, this command will not truncate history.
621+
*
622+
* See online documentation for more details.
623+
*
624+
* @param featureName The name of the feature to drop.
625+
* @return None.
626+
* @since 3.4.0
627+
*/
628+
def dropFeatureSupport(featureName: String): Unit = withActiveSession(sparkSession) {
629+
executeDropFeature(featureName, None)
630+
}
631+
565632
/**
566633
* Clone a DeltaTable to a given destination to mirror the existing table's data and metadata.
567634
*

spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,7 +1044,7 @@ object TestReaderWriterMetadataAutoUpdateFeature
10441044
}
10451045
}
10461046

1047-
private[sql] object TestRemovableWriterFeature
1047+
object TestRemovableWriterFeature
10481048
extends WriterFeature(name = "testRemovableWriter")
10491049
with FeatureAutomaticallyEnabledByMetadata
10501050
with RemovableFeature {
@@ -1093,7 +1093,7 @@ private[sql] object TestRemovableWriterFeatureWithDependency
10931093
Set(TestRemovableReaderWriterFeature, TestRemovableWriterFeature)
10941094
}
10951095

1096-
private[sql] object TestRemovableReaderWriterFeature
1096+
object TestRemovableReaderWriterFeature
10971097
extends ReaderWriterFeature(name = "testRemovableReaderWriter")
10981098
with FeatureAutomaticallyEnabledByMetadata
10991099
with RemovableFeature {

spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ case class AlterTableDropFeatureDeltaCommand(
322322
// Check whether the protocol contains the feature in either the writer features list or
323323
// the reader+writer features list. Note, protocol needs to be denormalized to allow dropping
324324
// features from legacy protocols.
325-
val protocol = table.initialSnapshot.protocol
325+
val protocol = table.deltaLog.update().protocol
326326
val protocolContainsFeatureName =
327327
protocol.implicitlyAndExplicitlySupportedFeatures.map(_.name).contains(featureName)
328328
if (!protocolContainsFeatureName) {

spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.util.Locale
2222
import scala.language.postfixOps
2323

2424
// scalastyle:off import.ordering.noEmptyLine
25-
import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestWriterFeature}
25+
import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestRemovableReaderWriterFeature, TestRemovableWriterFeature, TestWriterFeature}
2626
import org.apache.spark.sql.delta.actions.{ Metadata, Protocol }
2727
import org.apache.spark.sql.delta.storage.LocalLogStore
2828
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
@@ -247,6 +247,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
247247
"cloneAtVersion",
248248
"delete",
249249
"detail",
250+
"dropFeatureSupport",
250251
"generate",
251252
"history",
252253
"merge",
@@ -631,8 +632,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
631632
}
632633
}
633634

634-
test(
635-
"addFeatureSupport - with filesystem options.") {
635+
test("addFeatureSupport - with filesystem options.") {
636636
withTempDir { dir =>
637637
val path = fakeFileSystemPath(dir)
638638
val fsOptions = fakeFileSystemOptions
@@ -670,6 +670,70 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
670670
}
671671
}
672672

673+
test("dropFeatureSupport - with filesystem options.") {
674+
withTempDir { dir =>
675+
val path = fakeFileSystemPath(dir)
676+
val fsOptions = fakeFileSystemOptions
677+
678+
// create a table with a default Protocol.
679+
val testSchema = spark.range(1).schema
680+
val log = DeltaLog.forTable(spark, new Path(path), fsOptions)
681+
log.createLogDirectoriesIfNotExists()
682+
log.store.write(
683+
FileNames.unsafeDeltaFile(log.logPath, 0),
684+
Iterator(Metadata(schemaString = testSchema.json).json, Protocol(1, 2).json),
685+
overwrite = false,
686+
log.newDeltaHadoopConf())
687+
log.update()
688+
689+
// update the protocol to support a writer feature.
690+
val table = DeltaTable.forPath(spark, path, fsOptions)
691+
table.addFeatureSupport(TestRemovableWriterFeature.name)
692+
assert(log.update().protocol === Protocol(1, 7).withFeatures(Seq(
693+
AppendOnlyTableFeature,
694+
InvariantsTableFeature,
695+
TestRemovableWriterFeature)))
696+
697+
// Attempt truncating the history when dropping a feature that is not required.
698+
// This verifies the truncateHistory option was correctly passed.
699+
assert(intercept[DeltaTableFeatureException] {
700+
table.dropFeatureSupport("testRemovableWriter", truncateHistory = true)
701+
}.getErrorClass === "DELTA_FEATURE_DROP_HISTORY_TRUNCATION_NOT_ALLOWED")
702+
703+
// Drop feature.
704+
table.dropFeatureSupport(TestRemovableWriterFeature.name)
705+
// After dropping the feature we should return back to the original protocol.
706+
assert(log.update().protocol === Protocol(1, 2))
707+
708+
table.addFeatureSupport(TestRemovableReaderWriterFeature.name)
709+
assert(
710+
log.update().protocol === Protocol(3, 7).withFeatures(Seq(
711+
AppendOnlyTableFeature,
712+
InvariantsTableFeature,
713+
TestRemovableReaderWriterFeature)))
714+
715+
// Drop feature.
716+
table.dropFeatureSupport(TestRemovableReaderWriterFeature.name)
717+
// After dropping the feature we should return back to the original protocol.
718+
assert(log.update().protocol === Protocol(1, 2))
719+
720+
// Try to drop an unsupported feature.
721+
assert(intercept[DeltaTableFeatureException] {
722+
table.dropFeatureSupport("__invalid_feature__")
723+
}.getErrorClass === "DELTA_FEATURE_DROP_UNSUPPORTED_CLIENT_FEATURE")
724+
725+
// Try to drop a feature that is not present in the protocol.
726+
assert(intercept[DeltaTableFeatureException] {
727+
table.dropFeatureSupport(TestRemovableReaderWriterFeature.name)
728+
}.getErrorClass === "DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT")
729+
730+
// Try to drop a non-removable feature.
731+
assert(intercept[DeltaTableFeatureException] {
732+
table.dropFeatureSupport(TestReaderWriterFeature.name)
733+
}.getErrorClass === "DELTA_FEATURE_DROP_NONREMOVABLE_FEATURE")
734+
}
735+
}
736+
673737
test("details - with filesystem options.") {
674738
withTempDir{ dir =>
675739
val path = fakeFileSystemPath(dir)

0 commit comments

Comments
 (0)