Skip to content

Commit e70ca04

Browse files
authored
[Spark] Fix type widening with char/varchar columns (#3744)
## Description

Using type widening on a table that contains a char/varchar column causes subsequent reads to fail with `DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA`:

```
CREATE TABLE t (a VARCHAR(10), b INT);
ALTER TABLE t SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
ALTER TABLE t ALTER COLUMN b TYPE LONG;
SELECT * FROM t;
[DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA] Unable to operate on this table because an unsupported type change was applied. Field cut was changed from VARCHAR(10) to STRING
```

Type changes are recorded in the table metadata, and a check on read ensures that all recorded type changes are supported by the current implementation, because attempting to read data after an unsupported type change could lead to incorrect results.

CHAR/VARCHAR columns are sometimes stripped down to STRING internally. For that reason, ALTER TABLE incorrectly identifies the type of column `a` as having changed to STRING and records that change in the type widening metadata. The read check in turn doesn't recognize that type change as one of the supported widening type changes (which don't include changes to string columns).

Fix:

1. Never record char/varchar/string type changes in the type widening metadata.
2. Never record unsupported type changes in the type widening metadata; log an assertion instead.
3. Don't fail on char/varchar/string type changes in the type widening metadata if such a type change slips through.
   - This prevents failures in case a non-compliant implementation still records a char/varchar/string type change.
4. Provide an internal SQL configuration (`typeWidening.bypassUnsupportedTypeChangeCheck` in `DeltaSQLConf`) to bypass the check if a similar issue happens again in the future.
1 parent 3b15f0e commit e70ca04

File tree

8 files changed

+191
-56
lines changed

8 files changed

+191
-56
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.apache.spark.sql.delta.deletionvectors.{DropMarkedRowsFilter, KeepAll
2727
import org.apache.spark.sql.delta.logging.DeltaLogKeys
2828
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
2929
import org.apache.spark.sql.delta.sources.DeltaSQLConf
30+
import org.apache.spark.sql.util.ScalaExtensions._
3031
import org.apache.hadoop.conf.Configuration
3132
import org.apache.hadoop.fs.Path
3233
import org.apache.hadoop.mapreduce.Job
@@ -73,7 +74,10 @@ case class DeltaParquetFileFormat(
7374
}
7475
}
7576

76-
TypeWidening.assertTableReadable(protocol, metadata)
77+
SparkSession.getActiveSession.ifDefined { session =>
78+
TypeWidening.assertTableReadable(session.sessionState.conf, protocol, metadata)
79+
}
80+
7781

7882
val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
7983
val referenceSchema: StructType = metadata.schema

spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
package org.apache.spark.sql.delta
1818

1919
import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol, TableFeatureProtocolUtils}
20+
import org.apache.spark.sql.delta.sources.DeltaSQLConf
2021

2122
import org.apache.spark.sql.functions.{col, lit}
23+
import org.apache.spark.sql.internal.SQLConf
2224
import org.apache.spark.sql.types._
2325

2426
object TypeWidening {
@@ -80,15 +82,22 @@ object TypeWidening {
8082
* happen unless a non-compliant writer applied a type change that is not part of the feature
8183
* specification.
8284
*/
83-
def assertTableReadable(protocol: Protocol, metadata: Metadata): Unit = {
84-
if (!isSupported(protocol) ||
85+
def assertTableReadable(conf: SQLConf, protocol: Protocol, metadata: Metadata): Unit = {
86+
if (conf.getConf(DeltaSQLConf.DELTA_TYPE_WIDENING_BYPASS_UNSUPPORTED_TYPE_CHANGE_CHECK) ||
87+
!isSupported(protocol) ||
8588
!TypeWideningMetadata.containsTypeWideningMetadata(metadata.schema)) {
8689
return
8790
}
8891

8992
TypeWideningMetadata.getAllTypeChanges(metadata.schema).foreach {
9093
case (_, TypeChange(_, from: AtomicType, to: AtomicType, _))
9194
if isTypeChangeSupported(from, to) =>
95+
// Char/Varchar/String type changes are allowed and independent from type widening.
96+
// Implementations shouldn't record these type changes in the table metadata per the Delta
97+
// spec, but in case that happens we really shouldn't block reading the table.
98+
case (_, TypeChange(_,
99+
_: StringType | CharType(_) | VarcharType(_),
100+
_: StringType | CharType(_) | VarcharType(_), _)) =>
92101
case (fieldPath, TypeChange(_, from: AtomicType, to: AtomicType, _))
93102
if stableFeatureCanReadTypeChange(from, to) =>
94103
val featureName = if (protocol.isFeatureSupported(TypeWideningPreviewTableFeature)) {

spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,14 +188,28 @@ private[delta] object TypeWideningMetadata extends DeltaLogging {
188188
collectTypeChanges(from.elementType, to.elementType).map { typeChange =>
189189
typeChange.copy(fieldPath = "element" +: typeChange.fieldPath)
190190
}
191-
case (fromType: AtomicType, toType: AtomicType) if fromType != toType =>
191+
case (fromType: AtomicType, toType: AtomicType) if fromType != toType &&
192+
TypeWidening.isTypeChangeSupported(fromType, toType) =>
192193
Seq(TypeChange(
193194
version = None,
194195
fromType,
195196
toType,
196197
fieldPath = Seq.empty
197198
))
198-
case (_: AtomicType, _: AtomicType) => Seq.empty
199+
// Char/Varchar/String type changes are expected and unrelated to type widening. We don't record
200+
// them in the table schema metadata and don't log them as unexpected type changes either.
201+
case (StringType | CharType(_) | VarcharType(_), StringType | CharType(_) | VarcharType(_)) =>
202+
Seq.empty
203+
case (_: AtomicType, _: AtomicType) =>
204+
deltaAssert(fromType == toType,
205+
name = "typeWidening.unexpectedTypeChange",
206+
msg = s"Trying to apply an unsupported type change: $fromType to $toType",
207+
data = Map(
208+
"fromType" -> fromType.sql,
209+
"toType" -> toType.sql
210+
)
211+
)
212+
Seq.empty
199213
// Don't recurse inside structs, `collectTypeChanges` should be called directly on each struct
200214
// fields instead to only collect type changes inside these fields.
201215
case (_: StructType, _: StructType) => Seq.empty

spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -720,7 +720,10 @@ case class AlterTableChangeColumnDeltaCommand(
720720
StatisticsCollection.renameDeltaStatsColumn(metadata, oldColumnPath, newColumnPath)
721721

722722
val newSchemaWithTypeWideningMetadata =
723-
TypeWideningMetadata.addTypeWideningMetadata(txn, schema = newSchema, oldSchema = oldSchema)
723+
TypeWideningMetadata.addTypeWideningMetadata(
724+
txn,
725+
schema = newSchema,
726+
oldSchema = metadata.schema)
724727

725728
val newMetadata = metadata.copy(
726729
schemaString = newSchemaWithTypeWideningMetadata.json,

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,20 @@ trait DeltaSQLConfBase {
10741074
.booleanConf
10751075
.createWithDefault(true)
10761076

1077+
/**
1078+
* Internal config to bypass the check that ensures a table doesn't contain any unsupported type
1079+
* change when reading it. Meant as a mitigation in case the check incorrectly flags valid cases.
1080+
*/
1081+
val DELTA_TYPE_WIDENING_BYPASS_UNSUPPORTED_TYPE_CHANGE_CHECK =
1082+
buildConf("typeWidening.bypassUnsupportedTypeChangeCheck")
1083+
.internal()
1084+
.doc("""
1085+
| Disables check that ensures a table doesn't contain any unsupported type change when
1086+
| reading it.
1087+
|""".stripMargin)
1088+
.booleanConf
1089+
.createWithDefault(false)
1090+
10771091
val DELTA_IS_DELTA_TABLE_THROW_ON_ERROR =
10781092
buildConf("isDeltaTable.throwOnError")
10791093
.internal()

spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningFeatureCompatibilitySuite.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,41 @@ trait TypeWideningCompatibilityTests {
120120
assert(latestVersion.schema("a").dataType === ShortType)
121121
checkAnswer(latestVersion, Seq(Row(1), Row(2)))
122122
}
123+
124+
test("compatibility with char/varchar columns") {
125+
sql(s"CREATE TABLE delta.`$tempPath` (a byte, c char(3), v varchar(3)) USING DELTA")
126+
append(Seq((1.toByte, "abc", "def")).toDF("a", "c", "v"))
127+
checkAnswer(readDeltaTable(tempPath), Seq(Row(1, "abc", "def")))
128+
129+
sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE smallint")
130+
append(Seq((2.toShort, "ghi", "jkl")).toDF("a", "c", "v"))
131+
assert(readDeltaTable(tempPath).schema ===
132+
new StructType()
133+
.add("a", ShortType, nullable = true,
134+
metadata = typeWideningMetadata(version = 2, ByteType, ShortType))
135+
.add("c", StringType, nullable = true,
136+
metadata = new MetadataBuilder()
137+
.putString("__CHAR_VARCHAR_TYPE_STRING", "char(3)")
138+
.build()
139+
)
140+
.add("v", StringType, nullable = true,
141+
metadata = new MetadataBuilder()
142+
.putString("__CHAR_VARCHAR_TYPE_STRING", "varchar(3)")
143+
.build()))
144+
checkAnswer(readDeltaTable(tempPath), Seq(Row(1, "abc", "def"), Row(2, "ghi", "jkl")))
145+
146+
sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN c TYPE string")
147+
sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN v TYPE string")
148+
append(Seq((3.toShort, "longer string 1", "longer string 2")).toDF("a", "c", "v"))
149+
assert(readDeltaTable(tempPath).schema ===
150+
new StructType()
151+
.add("a", ShortType, nullable = true,
152+
metadata = typeWideningMetadata(version = 2, ByteType, ShortType))
153+
.add("c", StringType)
154+
.add("v", StringType))
155+
checkAnswer(readDeltaTable(tempPath),
156+
Seq(Row(1, "abc", "def"), Row(2, "ghi", "jkl"), Row(3, "longer string 1", "longer string 2")))
157+
}
123158
}
124159

125160
/** Trait collecting tests covering type widening + column mapping. */

spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala

Lines changed: 33 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -292,33 +292,26 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest {
292292

293293
test("addTypeWideningMetadata/removeTypeWideningMetadata on top-level fields") {
294294
val schemaWithoutMetadata =
295-
StructType.fromDDL("i long, d decimal(15, 4), a array<double>, m map<short, int>")
295+
StructType.fromDDL("i int, a array<int>, m map<short, int>")
296296
val firstOldSchema =
297-
StructType.fromDDL("i short, d decimal(6, 2), a array<byte>, m map<byte, int>")
297+
StructType.fromDDL("i byte, a array<byte>, m map<byte, int>")
298298
val secondOldSchema =
299-
StructType.fromDDL("i int, d decimal(10, 4), a array<int>, m map<short, byte>")
299+
StructType.fromDDL("i short, a array<short>, m map<short, byte>")
300300

301301
var schema =
302302
TypeWideningMetadata.addTypeWideningMetadata(txn, schemaWithoutMetadata, firstOldSchema)
303303

304-
assert(schema("i") === StructField("i", LongType,
304+
assert(schema("i") === StructField("i", IntegerType,
305305
metadata = new MetadataBuilder()
306306
.putMetadataArray("delta.typeChanges", Array(
307-
typeChangeMetadata("short", "long")
307+
typeChangeMetadata("byte", "integer")
308308
)).build()
309309
))
310310

311-
assert(schema("d") === StructField("d", DecimalType(15, 4),
311+
assert(schema("a") === StructField("a", ArrayType(IntegerType),
312312
metadata = new MetadataBuilder()
313313
.putMetadataArray("delta.typeChanges", Array(
314-
typeChangeMetadata("decimal(6,2)", "decimal(15,4)")
315-
)).build()
316-
))
317-
318-
assert(schema("a") === StructField("a", ArrayType(DoubleType),
319-
metadata = new MetadataBuilder()
320-
.putMetadataArray("delta.typeChanges", Array(
321-
typeChangeMetadata("byte", "double", "element")
314+
typeChangeMetadata("byte", "integer", "element")
322315
)).build()
323316
))
324317

@@ -332,34 +325,25 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest {
332325
assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) ===
333326
schemaWithoutMetadata -> Seq(
334327
Seq.empty -> schema("i"),
335-
Seq.empty -> schema("d"),
336328
Seq.empty -> schema("a"),
337329
Seq.empty -> schema("m")
338330
))
339331
// Second type change on all fields.
340332
schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, secondOldSchema)
341333

342-
assert(schema("i") === StructField("i", LongType,
334+
assert(schema("i") === StructField("i", IntegerType,
343335
metadata = new MetadataBuilder()
344336
.putMetadataArray("delta.typeChanges", Array(
345-
typeChangeMetadata("short", "long"),
346-
typeChangeMetadata("integer", "long")
337+
typeChangeMetadata("byte", "integer"),
338+
typeChangeMetadata("short", "integer")
347339
)).build()
348340
))
349341

350-
assert(schema("d") === StructField("d", DecimalType(15, 4),
342+
assert(schema("a") === StructField("a", ArrayType(IntegerType),
351343
metadata = new MetadataBuilder()
352344
.putMetadataArray("delta.typeChanges", Array(
353-
typeChangeMetadata("decimal(6,2)", "decimal(15,4)"),
354-
typeChangeMetadata("decimal(10,4)", "decimal(15,4)")
355-
)).build()
356-
))
357-
358-
assert(schema("a") === StructField("a", ArrayType(DoubleType),
359-
metadata = new MetadataBuilder()
360-
.putMetadataArray("delta.typeChanges", Array(
361-
typeChangeMetadata("byte", "double", "element"),
362-
typeChangeMetadata("integer", "double", "element")
345+
typeChangeMetadata("byte", "integer", "element"),
346+
typeChangeMetadata("short", "integer", "element")
363347
)).build()
364348
))
365349

@@ -374,44 +358,43 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest {
374358
assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) ===
375359
schemaWithoutMetadata -> Seq(
376360
Seq.empty -> schema("i"),
377-
Seq.empty -> schema("d"),
378361
Seq.empty -> schema("a"),
379362
Seq.empty -> schema("m")
380363
))
381364
}
382365

383366
test("addTypeWideningMetadata/removeTypeWideningMetadata on nested fields") {
384367
val schemaWithoutMetadata = StructType.fromDDL(
385-
"s struct<i: long, a: array<map<int, long>>, m: map<map<long, int>, array<long>>>")
368+
"s struct<i: int, a: array<map<int, int>>, m: map<map<int, int>, array<int>>>")
386369
val firstOldSchema = StructType.fromDDL(
387-
"s struct<i: short, a: array<map<byte, long>>, m: map<map<int, int>, array<long>>>")
370+
"s struct<i: byte, a: array<map<byte, int>>, m: map<map<short, int>, array<int>>>")
388371
val secondOldSchema = StructType.fromDDL(
389-
"s struct<i: int, a: array<map<int, int>>, m: map<map<long, int>, array<int>>>")
372+
"s struct<i: short, a: array<map<int, short>>, m: map<map<int, int>, array<short>>>")
390373

391374
// First type change on all struct fields.
392375
var schema =
393376
TypeWideningMetadata.addTypeWideningMetadata(txn, schemaWithoutMetadata, firstOldSchema)
394377
var struct = schema("s").dataType.asInstanceOf[StructType]
395378

396-
assert(struct("i") === StructField("i", LongType,
379+
assert(struct("i") === StructField("i", IntegerType,
397380
metadata = new MetadataBuilder()
398381
.putMetadataArray("delta.typeChanges", Array(
399-
typeChangeMetadata("short", "long")
382+
typeChangeMetadata("byte", "integer")
400383
)).build()
401384
))
402385

403-
assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, LongType)),
386+
assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, IntegerType)),
404387
metadata = new MetadataBuilder()
405388
.putMetadataArray("delta.typeChanges", Array(
406389
typeChangeMetadata("byte", "integer", "element.key")
407390
)).build()
408391
))
409392

410393
assert(struct("m") === StructField("m",
411-
MapType(MapType(LongType, IntegerType), ArrayType(LongType)),
394+
MapType(MapType(IntegerType, IntegerType), ArrayType(IntegerType)),
412395
metadata = new MetadataBuilder()
413396
.putMetadataArray("delta.typeChanges", Array(
414-
typeChangeMetadata("integer", "long", "key.key")
397+
typeChangeMetadata("short", "integer", "key.key")
415398
)).build()
416399
))
417400

@@ -426,28 +409,28 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest {
426409
schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, secondOldSchema)
427410
struct = schema("s").dataType.asInstanceOf[StructType]
428411

429-
assert(struct("i") === StructField("i", LongType,
412+
assert(struct("i") === StructField("i", IntegerType,
430413
metadata = new MetadataBuilder()
431414
.putMetadataArray("delta.typeChanges", Array(
432-
typeChangeMetadata("short", "long"),
433-
typeChangeMetadata("integer", "long")
415+
typeChangeMetadata("byte", "integer"),
416+
typeChangeMetadata("short", "integer")
434417
)).build()
435418
))
436419

437-
assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, LongType)),
420+
assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, IntegerType)),
438421
metadata = new MetadataBuilder()
439422
.putMetadataArray("delta.typeChanges", Array(
440423
typeChangeMetadata("byte", "integer", "element.key"),
441-
typeChangeMetadata("integer", "long", "element.value")
424+
typeChangeMetadata("short", "integer", "element.value")
442425
)).build()
443426
))
444427

445428
assert(struct("m") === StructField("m",
446-
MapType(MapType(LongType, IntegerType), ArrayType(LongType)),
429+
MapType(MapType(IntegerType, IntegerType), ArrayType(IntegerType)),
447430
metadata = new MetadataBuilder()
448431
.putMetadataArray("delta.typeChanges", Array(
449-
typeChangeMetadata("integer", "long", "key.key"),
450-
typeChangeMetadata("integer", "long", "value.element")
432+
typeChangeMetadata("short", "integer", "key.key"),
433+
typeChangeMetadata("short", "integer", "value.element")
451434
)).build()
452435
))
453436
assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) ===
@@ -459,18 +442,18 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest {
459442
}
460443

461444
test("addTypeWideningMetadata/removeTypeWideningMetadata with added and removed fields") {
462-
val newSchema = StructType.fromDDL("a int, b long, d int")
463-
val oldSchema = StructType.fromDDL("a int, b int, c int")
445+
val newSchema = StructType.fromDDL("a int, b int, d int")
446+
val oldSchema = StructType.fromDDL("a int, b short, c int")
464447

465448
val schema = TypeWideningMetadata.addTypeWideningMetadata(txn, newSchema, oldSchema)
466449
assert(schema("a") === StructField("a", IntegerType))
467450
assert(schema("d") === StructField("d", IntegerType))
468451
assert(!schema.contains("c"))
469452

470-
assert(schema("b") === StructField("b", LongType,
453+
assert(schema("b") === StructField("b", IntegerType,
471454
metadata = new MetadataBuilder()
472455
.putMetadataArray("delta.typeChanges", Array(
473-
typeChangeMetadata("integer", "long")
456+
typeChangeMetadata("short", "integer")
474457
)).build()
475458
))
476459
assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) ===

0 commit comments

Comments
 (0)