Skip to content

Commit 50a99fd

Browse files
Disallow enabling column mapping if invalid column mapping metadata is already present (#4167)
#### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description As effect of earlier bugs (e.g. fixed in #3487) there can exists tables where column mapping is disabled, but there is column mapping metadata on the table. Enabling column mapping metadata on such a table could lead to unexpected corruption. Simply stripping such metadata could also lead to curruptions, as the invalid metadata can be already used in other places (e.g. column statistics) via DeltaColumnMapping.getPhysicalName, which returns the name from the metadata even when column mapping is disabled. After #3688 it should no longer be possible to end up with tables having such invalid metadata, so the issue only concerns existing tables created before that fix. To avoid corruption, we want to disallow enabling column mapping on such tables. ## How was this patch tested? Added tests to DeltaColumnMappingSuite. ## Does this PR introduce _any_ user-facing changes? No. We are disallowing an operation on tables that would lead to Delta table corruption on tables that are already in an invalid state entering which is fixed already, so it can only concern old tables in the wild. --------- Co-authored-by: Julek Sompolski <Juliusz Sompolski>
1 parent 66d8099 commit 50a99fd

File tree

5 files changed

+82
-0
lines changed

5 files changed

+82
-0
lines changed

spark/src/main/resources/error/delta-error-classes.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -869,6 +869,13 @@
869869
],
870870
"sqlState" : "42K03"
871871
},
872+
"DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS" : {
873+
"message" : [
874+
"Enabling column mapping when column mapping metadata is already present in schema is not supported.",
875+
"To use column mapping, create a new table and reload the data into it."
876+
],
877+
"sqlState" : "XXKDS"
878+
},
872879
"DELTA_EXCEED_CHAR_VARCHAR_LIMIT" : {
873880
"message" : [
874881
"Value \"<value>\" exceeds char/varchar type length limitation. Failed check: <expr>."

spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,18 @@ trait DeltaColumnMappingBase extends DeltaLogging {
174174
}
175175
}
176176

177+
// If column mapping was disabled, but there was already column mapping in the schema, it is
178+
// a result of a bug in the previous version of Delta. This should no longer happen with the
179+
// stripping done above. For existing tables with this issue, we should not allow enabling
180+
// column mapping, to prevent further corruption.
181+
if (spark.conf.get(DeltaSQLConf.
182+
DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS)) {
183+
if (oldMappingMode == NoMapping && newMappingMode != NoMapping &&
184+
schemaHasColumnMappingMetadata(oldMetadata.schema)) {
185+
throw DeltaErrors.enablingColumnMappingDisallowedWhenColumnMappingMetadataAlreadyExists()
186+
}
187+
}
188+
177189
updatedMetadata = updateColumnMappingMetadata(
178190
oldMetadata, updatedMetadata, isChangingModeOnExistingTable, isOverwriteSchema)
179191

spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2126,6 +2126,12 @@ trait DeltaErrorsBase
21262126
messageParameters = Array(oldMode, newMode))
21272127
}
21282128

2129+
def enablingColumnMappingDisallowedWhenColumnMappingMetadataAlreadyExists(): Throwable = {
2130+
new DeltaColumnMappingUnsupportedException(
2131+
errorClass =
2132+
"DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS")
2133+
}
2134+
21292135
def generateManifestWithColumnMappingNotSupported: Throwable = {
21302136
new DeltaColumnMappingUnsupportedException(
21312137
errorClass = "DELTA_UNSUPPORTED_MANIFEST_GENERATION_WITH_COLUMN_MAPPING")

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1953,6 +1953,18 @@ trait DeltaSQLConfBase {
19531953
.booleanConf
19541954
.createWithDefault(true)
19551955

1956+
val DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS =
1957+
buildConf("columnMapping.disallowEnablingWhenColumnMappingMetadataAlreadyExists")
1958+
.doc(
1959+
"""
1960+
|If Delta table already has column mapping metadata before the feature is enabled, it is
1961+
|as a result of a corruption or a bug. Enabling column mapping in such a case can lead to
1962+
|further corruption of the table and should be disallowed.
1963+
|""".stripMargin)
1964+
.internal()
1965+
.booleanConf
1966+
.createWithDefault(true)
1967+
19561968
val DYNAMIC_PARTITION_OVERWRITE_ENABLED =
19571969
buildConf("dynamicPartitionOverwrite.enabled")
19581970
.doc("Whether to overwrite partitions dynamically when 'partitionOverwriteMode' is set to " +

spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2106,4 +2106,49 @@ class DeltaColumnMappingSuite extends QueryTest
21062106
s"Supported modes are: $supportedModes"))
21072107
}
21082108
}
2109+
2110+
test("enabling column mapping disallowed if column mapping metadata already exists") {
2111+
withSQLConf(
2112+
// enabling this fixes the issue of committing invalid metadata in the first place
2113+
DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA.key -> "false"
2114+
) {
2115+
withTempDir { dir =>
2116+
val path = dir.getCanonicalPath
2117+
val deltaLog = DeltaLog.forTable(spark, path)
2118+
deltaLog.withNewTransaction(catalogTableOpt = None) { txn =>
2119+
val schema =
2120+
new StructType().add("id", IntegerType, true, withIdAndPhysicalName(0, "col-0"))
2121+
val metadata = actions.Metadata(
2122+
name = "test_table",
2123+
schemaString = schema.json,
2124+
configuration = Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NoMapping.name)
2125+
)
2126+
txn.updateMetadata(metadata)
2127+
txn.commit(Seq.empty, DeltaOperations.ManualUpdate)
2128+
2129+
// Enabling the config will disallow enabling column mapping.
2130+
withSQLConf(DeltaSQLConf
2131+
.DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS.key
2132+
-> "true") {
2133+
val e = intercept[DeltaColumnMappingUnsupportedException] {
2134+
alterTableWithProps(
2135+
s"delta.`$path`",
2136+
Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NameMapping.name))
2137+
}
2138+
assert(e.getErrorClass ==
2139+
"DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS")
2140+
}
2141+
2142+
// Disabling the config will allow enabling column mapping.
2143+
withSQLConf(DeltaSQLConf
2144+
.DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS.key
2145+
-> "false") {
2146+
alterTableWithProps(
2147+
s"delta.`$path`",
2148+
Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NameMapping.name))
2149+
}
2150+
}
2151+
}
2152+
}
2153+
}
21092154
}

0 commit comments

Comments
 (0)