Skip to content

Commit ff5b36f

Browse files
authored
[Spark] Allow type widening for all supported type changes with Spark 4.0 (#3024)
This PR adds shims to ungate the remaining type changes that only work with Spark 4.0 / master. Spark 4.0 contains the required changes to Parquet readers to be able to read the data after applying the type changes. ## Description Extend the list of supported type changes for type widening to include changes that can be supported with Spark 4.0: - (byte, short, int) -> long - float -> double - date -> timestampNTZ - (byte, short, int) -> double - decimal -> decimal (with increased precision/scale that doesn't cause precision loss) - (byte, short, int, long) -> decimal Shims are added to support these changes when compiling against Spark 4.0/master and to only allow `byte` -> `short` -> `int` when compiling against Spark 3.5. ## How was this patch tested? Adding test cases for the new type changes in the existing type widening test suites. The list of supported / unsupported changes covered in tests differs between Spark 3.5 and Spark 4.0, shims are also provided to handle this. ## Does this PR introduce _any_ user-facing changes? Yes: allow using the listed type changes with type widening, either via `ALTER TABLE CHANGE COLUMN TYPE` or during schema evolution in MERGE and INSERT.
1 parent bfb5c94 commit ff5b36f

File tree

7 files changed

+513
-110
lines changed

7 files changed

+513
-110
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.apache.spark.sql.delta
18+
19+
import org.apache.spark.sql.catalyst.expressions.Cast
20+
import org.apache.spark.sql.types._
21+
22+
/**
23+
* Type widening only supports a limited set of type changes with Spark 3.5 due to the parquet
24+
* readers lacking the corresponding conversions that were added in Spark 4.0.
25+
* This shim is for Delta on Spark 3.5 which supports:
26+
* - byte -> short -> int
27+
*/
28+
object TypeWideningShims {

  /**
   * Returns whether the given type change is eligible for widening. This only checks atomic types.
   * It is the responsibility of the caller to recurse into structs, maps and arrays.
   */
  def isTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean = {
    // Explicit allow-list of widening changes usable with the Spark 3.5 parquet readers.
    def isAllowedWidening: Boolean = (fromType, toType) match {
      case (ByteType, ShortType) => true
      case (ByteType | ShortType, IntegerType) => true
      case _ => false
    }
    // A no-op "change" is trivially supported. Anything else must both be a safe upcast —
    // guarding against accidentally allowing a narrowing change — and be in the allow-list.
    fromType == toType || (Cast.canUpCast(fromType, toType) && isAllowedWidening)
  }

  /**
   * Returns whether the given type change can be applied during schema evolution. Only a
   * subset of supported type changes are considered for schema evolution.
   */
  def isTypeChangeSupportedForSchemaEvolution(fromType: AtomicType, toType: AtomicType): Boolean =
    // On Spark 3.5, every supported type change is also eligible for schema evolution.
    isTypeChangeSupported(fromType, toType)
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright (2021) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.apache.spark.sql.delta
18+
19+
import org.apache.spark.sql.catalyst.expressions.Cast
20+
import org.apache.spark.sql.types._
21+
22+
/**
23+
* Type widening only supports a limited set of type changes with Spark 3.5 due to the parquet
24+
* readers lacking the corresponding conversions that were added in Spark 4.0.
25+
* This shim is for Delta on Spark 4.0 which supports:
26+
* - byte -> short -> int -> long.
27+
* - float -> double.
28+
* - date -> timestamp_ntz.
29+
* - {byte, short, int} -> double.
30+
* - decimal -> wider decimal.
31+
* - {byte, short, int} -> decimal(10, 0) and wider.
32+
* - long -> decimal(20, 0) and wider.
33+
*/
34+
object TypeWideningShims {

  /**
   * Returns whether the given type change is eligible for widening. This only checks atomic types.
   * It is the responsibility of the caller to recurse into structs, maps and arrays.
   */
  def isTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean = {
    // Widening changes whose data the Spark 4.0 parquet readers can still read back.
    def isAllowedWidening: Boolean = (fromType, toType) match {
      case (from: IntegralType, to: IntegralType) => from.defaultSize <= to.defaultSize
      case (FloatType, DoubleType) => true
      case (DateType, TimestampNTZType) => true
      case (ByteType | ShortType | IntegerType, DoubleType) => true
      case (from: DecimalType, to: DecimalType) => to.isWiderThan(from)
      // Byte, Short, Integer are all stored as INT32 in parquet. The parquet readers support
      // converting INT32 to Decimal(10, 0) and wider.
      case (ByteType | ShortType | IntegerType, to: DecimalType) => to.isWiderThan(IntegerType)
      // The parquet readers support converting INT64 to Decimal(20, 0) and wider.
      case (LongType, to: DecimalType) => to.isWiderThan(LongType)
      case _ => false
    }
    // A no-op "change" is trivially supported. Anything else must both be a safe upcast —
    // rejecting any accidentally narrowing change upfront — and be in the allow-list.
    fromType == toType || (Cast.canUpCast(fromType, toType) && isAllowedWidening)
  }

  /**
   * Returns whether the given type change can be applied during schema evolution. Only a
   * subset of supported type changes are considered for schema evolution.
   */
  def isTypeChangeSupportedForSchemaEvolution(fromType: AtomicType, toType: AtomicType): Boolean = {
    // Subset of supported changes that automatic schema evolution may apply. The
    // integral -> double and integral -> decimal changes accepted by isTypeChangeSupported
    // are deliberately absent here.
    def isEligibleForEvolution: Boolean = (fromType, toType) match {
      case (from: IntegralType, to: IntegralType) => from.defaultSize <= to.defaultSize
      case (FloatType, DoubleType) => true
      case (from: DecimalType, to: DecimalType) => to.isWiderThan(from)
      case (DateType, TimestampNTZType) => true
      case _ => false
    }
    fromType == toType || (isTypeChangeSupported(fromType, toType) && isEligibleForEvolution)
  }
}

spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package org.apache.spark.sql.delta
1818

1919
import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol, TableFeatureProtocolUtils}
2020

21-
import org.apache.spark.sql.catalyst.expressions.Cast
2221
import org.apache.spark.sql.functions.{col, lit}
2322
import org.apache.spark.sql.types._
2423

@@ -66,28 +65,14 @@ object TypeWidening {
6665
* It is the responsibility of the caller to recurse into structs, maps and arrays.
6766
*/
6867
def isTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean =
69-
(fromType, toType) match {
70-
case (from, to) if from == to => true
71-
// All supported type changes below are supposed to be widening, but to be safe, reject any
72-
// non-widening change upfront.
73-
case (from, to) if !Cast.canUpCast(from, to) => false
74-
case (ByteType, ShortType) => true
75-
case (ByteType | ShortType, IntegerType) => true
76-
case _ => false
77-
}
68+
TypeWideningShims.isTypeChangeSupported(fromType, toType)
7869

7970
/**
8071
* Returns whether the given type change can be applied during schema evolution. Only a
8172
* subset of supported type changes are considered for schema evolution.
8273
*/
8374
def isTypeChangeSupportedForSchemaEvolution(fromType: AtomicType, toType: AtomicType): Boolean =
84-
(fromType, toType) match {
85-
case (from, to) if from == to => true
86-
case (from, to) if !isTypeChangeSupported(from, to) => false
87-
case (ByteType, ShortType) => true
88-
case (ByteType | ShortType, IntegerType) => true
89-
case _ => false
90-
}
75+
TypeWideningShims.isTypeChangeSupportedForSchemaEvolution(fromType, toType)
9176

9277
/**
9378
* Asserts that the given table doesn't contain any unsupported type changes. This should never
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.apache.spark.sql.delta
18+
19+
import org.apache.spark.sql.test.SQLTestUtils
20+
import org.apache.spark.sql.types._
21+
22+
/**
23+
* The set of type changes supported by type widening is different between Spark 3.5 and Spark 4.0.
24+
* See [[TypeWideningShims]]. This shim splits the test cases into supported and unsupported
25+
* accordingly for delta on Spark 3.5.
26+
*/
27+
trait TypeWideningTestCasesShims {
  self: DeltaTypeWideningTestCases with SQLTestUtils =>

  import testImplicits._

  // Type changes that are supported by all Parquet readers. Byte, Short, Int are all stored as
  // INT32 in parquet so these changes are guaranteed to be supported.
  // Each case carries the values written with the narrow type, then the values written after
  // widening — covering positive/negative values, type bounds, and a null-ish sentinel.
  // NOTE(review): `null.asInstanceOf[Byte]` evaluates to 0, not a SQL NULL — this appears to be
  // a deliberate convention throughout this file, but confirm against how these Seqs are
  // converted to DataFrames via testImplicits.
  protected val supportedTestCases: Seq[TypeEvolutionTestCase] = Seq(
    SupportedTypeEvolutionTestCase(ByteType, ShortType,
      Seq(1, -1, Byte.MinValue, Byte.MaxValue, null.asInstanceOf[Byte]),
      Seq(4, -4, Short.MinValue, Short.MaxValue, null.asInstanceOf[Short])),
    SupportedTypeEvolutionTestCase(ByteType, IntegerType,
      Seq(1, -1, Byte.MinValue, Byte.MaxValue, null.asInstanceOf[Byte]),
      Seq(4, -4, Int.MinValue, Int.MaxValue, null.asInstanceOf[Int])),
    SupportedTypeEvolutionTestCase(ShortType, IntegerType,
      Seq(1, -1, Short.MinValue, Short.MaxValue, null.asInstanceOf[Short]),
      Seq(4, -4, Int.MinValue, Int.MaxValue, null.asInstanceOf[Int]))
  )

  // Type changes that are only supported in ALTER TABLE CHANGE COLUMN TYPE but are not considered
  // for automatic type widening. None on Spark 3.5.
  protected val alterTableOnlySupportedTestCases: Seq[TypeEvolutionTestCase] = Seq.empty

  // Test type changes that aren't supported.
  protected val unsupportedTestCases: Seq[TypeEvolutionTestCase] = Seq(
    // Narrowing changes: always rejected, only the initial (wider) values are needed.
    UnsupportedTypeEvolutionTestCase(IntegerType, ByteType,
      Seq(1, 2, Int.MinValue)),
    UnsupportedTypeEvolutionTestCase(LongType, IntegerType,
      Seq(4, 5, Long.MaxValue)),
    UnsupportedTypeEvolutionTestCase(DoubleType, FloatType,
      Seq(987654321.987654321d, Double.NaN, Double.NegativeInfinity,
        Double.PositiveInfinity, Double.MinPositiveValue,
        Double.MinValue, Double.MaxValue)),
    // Integral -> decimal where the decimal can't hold every value of the integral type.
    UnsupportedTypeEvolutionTestCase(ByteType, DecimalType(2, 0),
      Seq(1, -1, Byte.MinValue)),
    UnsupportedTypeEvolutionTestCase(ShortType, DecimalType(4, 0),
      Seq(1, -1, Short.MinValue)),
    UnsupportedTypeEvolutionTestCase(IntegerType, DecimalType(9, 0),
      Seq(1, -1, Int.MinValue)),
    UnsupportedTypeEvolutionTestCase(LongType, DecimalType(19, 0),
      Seq(1, -1, Long.MinValue)),
    UnsupportedTypeEvolutionTestCase(TimestampNTZType, DateType,
      Seq("2020-03-17 15:23:15", "2023-12-31 23:59:59", "0001-01-01 00:00:00")),
    // Increase scale with unchanged precision (reduces the available integer digits).
    UnsupportedTypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2),
      DecimalType(Decimal.MAX_INT_DIGITS, 3),
      Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_INT_DIGITS - 2) + ".99"))),
    // Reduce precision
    UnsupportedTypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2),
      DecimalType(Decimal.MAX_INT_DIGITS - 1, 2),
      Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_INT_DIGITS - 2) + ".99"))),
    // Reduce precision & scale
    UnsupportedTypeEvolutionTestCase(DecimalType(Decimal.MAX_LONG_DIGITS, 2),
      DecimalType(Decimal.MAX_INT_DIGITS - 1, 1),
      Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_LONG_DIGITS - 2) + ".99"))),
    // Increase scale more than precision
    UnsupportedTypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2),
      DecimalType(Decimal.MAX_INT_DIGITS + 1, 4),
      Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_INT_DIGITS - 2) + ".99"))),
    // Smaller scale and larger precision.
    UnsupportedTypeEvolutionTestCase(DecimalType(Decimal.MAX_LONG_DIGITS, 2),
      DecimalType(Decimal.MAX_INT_DIGITS + 3, 1),
      Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_LONG_DIGITS - 2) + ".99"))),
    // NOTE(review): the entries below use SupportedTypeEvolutionTestCase (with both from- and
    // to-values) but live in the unsupported list — presumably these are the widening changes
    // that only Spark 4.0 supports, listed here so the Spark 3.5 suites expect them to be
    // rejected. Confirm against the Spark 4.0 counterpart of this shim.
    SupportedTypeEvolutionTestCase(IntegerType, DoubleType,
      Seq(1, -1, Int.MinValue, Int.MaxValue, null.asInstanceOf[Int]),
      Seq(987654321.987654321d, -0d, 0d, Double.NaN, Double.NegativeInfinity,
        Double.PositiveInfinity, Double.MinPositiveValue, Double.MinValue, Double.MaxValue,
        null.asInstanceOf[Double])),
    SupportedTypeEvolutionTestCase(ByteType, DecimalType(10, 0),
      Seq(1, -1, Byte.MinValue, Byte.MaxValue, null.asInstanceOf[Byte]),
      Seq(BigDecimal("1.23"), BigDecimal("9" * 10), null.asInstanceOf[BigDecimal])),
    SupportedTypeEvolutionTestCase(ShortType, DecimalType(10, 0),
      Seq(1, -1, Short.MinValue, Short.MaxValue, null.asInstanceOf[Short]),
      Seq(BigDecimal("1.23"), BigDecimal("9" * 10), null.asInstanceOf[BigDecimal])),
    SupportedTypeEvolutionTestCase(IntegerType, DecimalType(10, 0),
      Seq(1, -1, Int.MinValue, Int.MaxValue, null.asInstanceOf[Int]),
      Seq(BigDecimal("1.23"), BigDecimal("9" * 10), null.asInstanceOf[BigDecimal])),
    // NOTE(review): `null.asInstanceOf[Int]` below looks inconsistent with the surrounding Long
    // values — likely meant `null.asInstanceOf[Long]`; both evaluate to 0 but box differently,
    // so verify before changing.
    SupportedTypeEvolutionTestCase(LongType, DecimalType(20, 0),
      Seq(1L, -1L, Long.MinValue, Long.MaxValue, null.asInstanceOf[Int]),
      Seq(BigDecimal("1.23"), BigDecimal("9" * 20), null.asInstanceOf[BigDecimal])),
    SupportedTypeEvolutionTestCase(ShortType, LongType,
      Seq(1, -1, Short.MinValue, Short.MaxValue, null.asInstanceOf[Short]),
      Seq(4L, -4L, Long.MinValue, Long.MaxValue, null.asInstanceOf[Long])),
    SupportedTypeEvolutionTestCase(IntegerType, LongType,
      Seq(1, -1, Int.MinValue, Int.MaxValue, null.asInstanceOf[Int]),
      Seq(4L, -4L, Long.MinValue, Long.MaxValue, null.asInstanceOf[Long])),
    SupportedTypeEvolutionTestCase(FloatType, DoubleType,
      Seq(1234.56789f, -0f, 0f, Float.NaN, Float.NegativeInfinity, Float.PositiveInfinity,
        Float.MinPositiveValue, Float.MinValue, Float.MaxValue, null.asInstanceOf[Float]),
      Seq(987654321.987654321d, -0d, 0d, Double.NaN, Double.NegativeInfinity,
        Double.PositiveInfinity, Double.MinPositiveValue, Double.MinValue, Double.MaxValue,
        null.asInstanceOf[Double])),
    SupportedTypeEvolutionTestCase(DateType, TimestampNTZType,
      Seq("2020-01-01", "2024-02-29", "1312-02-27"),
      Seq("2020-03-17 15:23:15.123456", "2058-12-31 23:59:59.999", "0001-01-01 00:00:00")),
    // Larger precision.
    SupportedTypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2),
      DecimalType(Decimal.MAX_LONG_DIGITS, 2),
      Seq(BigDecimal("1.23"), BigDecimal("10.34"), null.asInstanceOf[BigDecimal]),
      Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_LONG_DIGITS - 2) + ".99"),
        null.asInstanceOf[BigDecimal])),
    // Larger precision and scale, same physical type.
    SupportedTypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS - 1, 2),
      DecimalType(Decimal.MAX_INT_DIGITS, 3),
      Seq(BigDecimal("1.23"), BigDecimal("10.34"), null.asInstanceOf[BigDecimal]),
      Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_INT_DIGITS - 3) + ".99"),
        null.asInstanceOf[BigDecimal])),
    // Larger precision and scale, different physical types.
    SupportedTypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2),
      DecimalType(Decimal.MAX_LONG_DIGITS + 1, 3),
      Seq(BigDecimal("1.23"), BigDecimal("10.34"), null.asInstanceOf[BigDecimal]),
      Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_LONG_DIGITS - 2) + ".99"),
        null.asInstanceOf[BigDecimal]))
  )
}

0 commit comments

Comments
 (0)