Skip to content

Commit 579a315

Browse files
sabir-akhadovallisonport-db
authored andcommitted
Disallow overwriteSchema with dynamic partitions overwrite
Disallow overwriteSchema when partitionOverwriteMode is set to dynamic. Otherwise, the table might become corrupted as schemas of newly written partitions would not match the non-overwritten partitions. GitOrigin-RevId: 1012793448c1ffed9a3f8bde507d9fe1ee183803
1 parent 6556d6f commit 579a315

File tree

4 files changed

+36
-2
lines changed

4 files changed

+36
-2
lines changed

core/src/main/resources/error/delta-error-classes.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1346,6 +1346,12 @@
13461346
],
13471347
"sqlState" : "0A000"
13481348
},
1349+
"DELTA_OVERWRITE_SCHEMA_WITH_DYNAMIC_PARTITION_OVERWRITE" : {
1350+
"message" : [
1351+
"'overwriteSchema' cannot be used in dynamic partition overwrite mode."
1352+
],
1353+
"sqlState" : "42613"
1354+
},
13491355
"DELTA_PARTITION_COLUMN_CAST_FAILED" : {
13501356
"message" : [
13511357
"Failed to cast value `<value>` to `<dataType>` for partition column `<columnName>`"

core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2403,6 +2403,12 @@ trait DeltaErrorsBase
24032403
)
24042404
}
24052405

2406+
def overwriteSchemaUsedWithDynamicPartitionOverwrite(): Throwable = {
2407+
new DeltaIllegalArgumentException(
2408+
errorClass = "DELTA_OVERWRITE_SCHEMA_WITH_DYNAMIC_PARTITION_OVERWRITE"
2409+
)
2410+
}
2411+
24062412
def replaceWhereUsedInOverwrite(): Throwable = {
24072413
new DeltaAnalysisException(
24082414
errorClass = "DELTA_REPLACE_WHERE_IN_OVERWRITE", messageParameters = Array.empty

core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,10 @@ case class WriteIntoDelta(
235235
} else options.isDynamicPartitionOverwriteMode
236236
}
237237

238+
if (useDynamicPartitionOverwriteMode && canOverwriteSchema) {
239+
throw DeltaErrors.overwriteSchemaUsedWithDynamicPartitionOverwrite()
240+
}
241+
238242
// Validate partition predicates
239243
var containsDataFilters = false
240244
val replaceWhere = options.replaceWhere.flatMap { replace =>

core/src/test/scala/org/apache/spark/sql/delta/DeltaOptionSuite.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@ package org.apache.spark.sql.delta
1919
import java.util.Locale
2020

2121
// scalastyle:off import.ordering.noEmptyLine
22-
import org.apache.spark.sql.delta.DeltaOptions.PARTITION_OVERWRITE_MODE_OPTION
22+
import org.apache.spark.sql.delta.DeltaOptions.{OVERWRITE_SCHEMA_OPTION, PARTITION_OVERWRITE_MODE_OPTION}
2323
import org.apache.spark.sql.delta.actions.{Action, FileAction}
2424
import org.apache.spark.sql.delta.sources.DeltaSQLConf
2525
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
26-
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
2726
import org.apache.spark.sql.delta.util.FileNames
2827
import org.apache.commons.io.FileUtils
2928
import org.apache.parquet.format.CompressionCodec
@@ -283,4 +282,23 @@ class DeltaOptionSuite extends QueryTest
283282
}
284283
}
285284
}
285+
286+
test("overwriteSchema=true should be invalid with partitionOverwriteMode=dynamic") {
287+
withTempDir { tempDir =>
288+
val e = intercept[DeltaIllegalArgumentException] {
289+
withSQLConf(DeltaSQLConf.DYNAMIC_PARTITION_OVERWRITE_ENABLED.key -> "true") {
290+
Seq(1, 2, 3).toDF
291+
.withColumn("part", $"value" % 2)
292+
.write
293+
.mode("overwrite")
294+
.format("delta")
295+
.partitionBy("part")
296+
.option(OVERWRITE_SCHEMA_OPTION, "true")
297+
.option(PARTITION_OVERWRITE_MODE_OPTION, "dynamic")
298+
.save(tempDir.getAbsolutePath)
299+
}
300+
}
301+
assert(e.getErrorClass == "DELTA_OVERWRITE_SCHEMA_WITH_DYNAMIC_PARTITION_OVERWRITE")
302+
}
303+
}
286304
}

0 commit comments

Comments
 (0)