
Commit bae5771

[VARIANT] Add enableVariantShredding Table Property and VariantShredding-preview table feature in Delta-Spark (#4521)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Add the preview table feature for Variant Shredding (see #4033), enabled by the new `enableVariantShredding` table property. Once support for shredded writes is implemented in Spark, Delta will check that the `enableVariantShredding` table property is enabled before writing shredded data to Delta tables.

## How was this patch tested?

Unit tests.

## Does this PR introduce _any_ user-facing changes?

No
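For context, a minimal sketch of how a user opts a table into shredding, mirroring the new test suite in this commit (`tbl` is a placeholder table name):

```scala
// Enable shredding at creation time...
spark.sql(
  "CREATE TABLE tbl (i LONG, v VARIANT) USING DELTA " +
    "TBLPROPERTIES ('delta.enableVariantShredding' = 'true')")

// ...or on an existing table; per the table feature added below, this also
// upgrades the protocol with the variantShredding-preview feature.
spark.sql(
  "ALTER TABLE tbl SET TBLPROPERTIES ('delta.enableVariantShredding' = 'true')")
```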
1 parent 20228ad commit bae5771

File tree: 7 files changed (+267, -1 lines changed)

spark/src/main/resources/error/delta-error-classes.json

Lines changed: 6 additions & 0 deletions
@@ -2317,6 +2317,12 @@
     ],
     "sqlState" : "42809"
   },
+  "DELTA_SHREDDING_TABLE_PROPERTY_DISABLED" : {
+    "message" : [
+      "Attempted to write shredded Variants but the table does not support shredded writes. Consider setting the table property enableVariantShredding to true."
+    ],
+    "sqlState" : "0A000"
+  },
   "DELTA_SOURCE_IGNORE_DELETE" : {
     "message" : [
       "Detected deleted data (for example <removedFile>) from streaming source at version <version>. This is currently not supported. If you'd like to ignore deletes, set the option 'ignoreDeletes' to 'true'. The source table can be found at path <dataPath>."

spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala

Lines changed: 7 additions & 0 deletions
@@ -549,6 +549,13 @@ trait DeltaConfigsBase extends DeltaLogging {
     validationFunction = _ => true,
     helpMessage = "needs to be a boolean.")

+  val ENABLE_VARIANT_SHREDDING = buildConfig[Boolean](
+    key = "enableVariantShredding",
+    defaultValue = "false",
+    fromString = _.toBoolean,
+    validationFunction = _ => true,
+    helpMessage = "needs to be a boolean.")
+
   /**
    * Whether this table will automatically optimize the layout of files during writes.
    */
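For orientation, a sketch of how the rest of this change reads the property (`metadata` here stands for a table's current `Metadata` action; the full key resolves to `delta.enableVariantShredding`, as the suite below asserts):

```scala
// Defaults to false when the property is unset on the table.
val shreddingEnabled: Boolean =
  DeltaConfigs.ENABLE_VARIANT_SHREDDING.fromMetaData(metadata)
```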

spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala

Lines changed: 7 additions & 0 deletions
@@ -2827,6 +2827,13 @@ trait DeltaErrorsBase
     )
   }

+  def variantShreddingUnsupported(): Throwable = {
+    new DeltaSparkException(
+      errorClass = "DELTA_SHREDDING_TABLE_PROPERTY_DISABLED",
+      messageParameters = Array.empty
+    )
+  }
+
   def unsupportSubqueryInPartitionPredicates(): Throwable = {
     new DeltaAnalysisException(
       errorClass = "DELTA_UNSUPPORTED_SUBQUERY_IN_PARTITION_PREDICATES",

spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala

Lines changed: 16 additions & 0 deletions
@@ -61,6 +61,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.UnsetTableProperties
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, ResolveDefaultColumns}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.{Clock, Utils}

@@ -964,6 +965,18 @@ trait OptimisticTransactionImpl extends DeltaTransaction
     }
   }

+  // Make sure shredded writes are only performed if the shredding table property was set.
+  private def assertShreddingStateConsistent() = {
+    if (!DeltaConfigs.ENABLE_VARIANT_SHREDDING.fromMetaData(metadata)) {
+      val isVariantShreddingSchemaForced =
+        spark.sessionState.conf
+          .getConfString("spark.sql.variant.forceShreddingSchemaForTest", "").nonEmpty
+      if (isVariantShreddingSchemaForced) {
+        throw DeltaErrors.variantShreddingUnsupported()
+      }
+    }
+  }
+
   /**
    * Must make sure that deletion vectors are never added to a table where that isn't allowed.
    * Note, statistics recomputation is still allowed even though DVs might be currently disabled.

@@ -2176,6 +2189,9 @@ trait OptimisticTransactionImpl extends DeltaTransaction
     val assertDeletionVectorWellFormed = getAssertDeletionVectorWellFormedFunc(spark, op)
     actions.foreach(assertDeletionVectorWellFormed)

+    // Make sure shredded writes are only performed if the shredding table property was set
+    assertShreddingStateConsistent()
+
     // Make sure this operation does not include default column values if the corresponding table
     // feature is not enabled.
     if (!protocol.isFeatureSupported(AllowColumnDefaultsTableFeature)) {

spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala

Lines changed: 12 additions & 0 deletions
@@ -370,6 +370,7 @@ object TableFeature {
     InCommitTimestampTableFeature,
     VariantTypePreviewTableFeature,
     VariantTypeTableFeature,
+    VariantShreddingPreviewTableFeature,
     CatalogOwnedTableFeature,
     CoordinatedCommitsTableFeature,
     CheckpointProtectionTableFeature)

@@ -713,6 +714,17 @@ object VariantTypeTableFeature extends ReaderWriterFeature(name = "variantType")
   }
 }

+object VariantShreddingPreviewTableFeature
+    extends ReaderWriterFeature(name = "variantShredding-preview")
+    with FeatureAutomaticallyEnabledByMetadata {
+  override def automaticallyUpdateProtocolOfExistingTables: Boolean = true
+
+  override def metadataRequiresFeatureToBeEnabled(
+      protocol: Protocol, metadata: Metadata, spark: SparkSession): Boolean = {
+    DeltaConfigs.ENABLE_VARIANT_SHREDDING.fromMetaData(metadata)
+  }
+}
+
 object DeletionVectorsTableFeature
   extends ReaderWriterFeature(name = "deletionVectors")
   with RemovableFeature
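Because the feature mixes in `FeatureAutomaticallyEnabledByMetadata` and overrides `automaticallyUpdateProtocolOfExistingTables`, setting the table property is enough to add the feature to an existing table's protocol. A sketch of how that surfaces, following the suite's assertions below:

```scala
spark.sql("ALTER TABLE tbl SET TBLPROPERTIES('delta.enableVariantShredding' = 'true')")
val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier("tbl"))
assert(snapshot.protocol.readerAndWriterFeatures
  .contains(VariantShreddingPreviewTableFeature))
```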
org/apache/spark/sql/delta/DeltaVariantShreddingSuite.scala (new file)

Lines changed: 218 additions & 0 deletions
@@ -0,0 +1,218 @@
/*
 * Copyright (2025) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils, TestsStatistics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.schema.{GroupType, MessageType, Type}

class DeltaVariantShreddingSuite
  extends QueryTest
  with SharedSparkSession
  with DeltaSQLCommandTest
  with DeltaSQLTestUtils
  with TestsStatistics {

  import testImplicits._

  private def numShreddedFiles(path: String, validation: GroupType => Boolean = _ => true): Int = {
    def listParquetFilesRecursively(dir: String): Seq[String] = {
      val deltaLog = DeltaLog.forTable(spark, dir)
      val files = deltaLog.snapshot.allFiles
      files.collect().map { file: AddFile =>
        file.absolutePath(deltaLog).toString
      }
    }

    val parquetFiles = listParquetFilesRecursively(path)

    def hasStructWithFieldNamesInternal(schema: List[Type], fieldNames: Set[String]): Boolean = {
      schema.exists {
        case group: GroupType if group.getFields.asScala.map(_.getName).toSet == fieldNames =>
          true
        case group: GroupType =>
          hasStructWithFieldNamesInternal(group.getFields.asScala.toList, fieldNames)
        case _ => false
      }
    }

    def hasStructWithFieldNames(schema: MessageType, fieldNames: Set[String]): Boolean = {
      schema.getFields.asScala.exists {
        case group: GroupType if group.getFields.asScala.map(_.getName).toSet == fieldNames &&
            validation(group) =>
          true
        case group: GroupType =>
          hasStructWithFieldNamesInternal(group.getFields.asScala.toList, fieldNames)
        case _ => false
      }
    }

    val requiredFieldNames = Set("value", "metadata", "typed_value")
    val conf = new Configuration()
    parquetFiles.count { p =>
      val reader = ParquetFileReader.open(conf, new Path(p))
      val footer: ParquetMetadata = reader.getFooter
      val isShredded =
        hasStructWithFieldNames(footer.getFileMetaData().getSchema, requiredFieldNames)
      reader.close()
      isShredded
    }
  }

  test("variant shredding table property") {
    withTable("tbl") {
      sql("CREATE TABLE tbl(s STRING, i INTEGER) USING DELTA")
      val (deltaLog, snapshot) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier("tbl"))
      assert(!snapshot.protocol
        .isFeatureSupported(VariantShreddingPreviewTableFeature),
        s"Table tbl contains ShreddedVariantTableFeature descriptor when its not supposed to"
      )
      sql(s"ALTER TABLE tbl " +
        s"SET TBLPROPERTIES('${DeltaConfigs.ENABLE_VARIANT_SHREDDING.key}' = 'true')")
      assert(getProtocolForTable("tbl")
        .readerAndWriterFeatures.contains(VariantShreddingPreviewTableFeature))
    }
    withTable("tbl") {
      sql(s"CREATE TABLE tbl(s STRING, i INTEGER) USING DELTA " +
        s"TBLPROPERTIES('${DeltaConfigs.ENABLE_VARIANT_SHREDDING.key}' = 'true')")
      assert(getProtocolForTable("tbl")
        .readerAndWriterFeatures.contains(VariantShreddingPreviewTableFeature))
    }
    assert(DeltaConfigs.ENABLE_VARIANT_SHREDDING.key == "delta.enableVariantShredding")
  }

  test("Spark can read shredded table containing the shredding table feature") {
    withTable("tbl") {
      withTempDir { dir =>
        val schema = "a int, b string, c decimal(15, 1)"
        val df = spark.sql(
          """
            | select id i, case
            | when id = 0 then parse_json('{"a": 1, "b": "2", "c": 3.3, "d": 4.4}')
            | when id = 1 then parse_json('{"a": [1,2,3], "b": "hello", "c": {"x": 0}}')
            | when id = 2 then parse_json('{"A": 1, "c": 1.23}')
            | end v from range(0, 3, 1, 1)
            |""".stripMargin)

        sql("CREATE TABLE tbl (i long, v variant) USING DELTA " +
          s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_VARIANT_SHREDDING.key}' = 'true') " +
          s"LOCATION '${dir.getAbsolutePath}'")
        assert(getProtocolForTable("tbl")
          .readerAndWriterFeatures.contains(VariantShreddingPreviewTableFeature))
        withSQLConf(SQLConf.VARIANT_WRITE_SHREDDING_ENABLED.key -> true.toString,
            SQLConf.VARIANT_ALLOW_READING_SHREDDED.key -> true.toString,
            SQLConf.VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key -> schema) {

          df.write.format("delta").mode("append").saveAsTable("tbl")
          // Make sure the actual parquet files are shredded
          assert(numShreddedFiles(dir.getAbsolutePath, validation = { field: GroupType =>
            field.getName == "v" && (field.getType("typed_value") match {
              case t: GroupType =>
                t.getFields.asScala.map(_.getName).toSet == Set("a", "b", "c")
              case _ => false
            })
          }) == 1)
          checkAnswer(
            spark.read.format("delta").load(dir.getAbsolutePath).selectExpr("i", "to_json(v)"),
            df.selectExpr("i", "to_json(v)").collect()
          )
        }
      }
    }
  }

  test("Test shredding property controls shredded writes") {
    val schema = "a int, b string, c decimal(15, 1)"
    val df = spark.sql(
      """
        | select id i, case
        | when id = 0 then parse_json('{"a": 1, "b": "2", "c": 3.3, "d": 4.4}')
        | when id = 1 then parse_json('{"a": [1,2,3], "b": "hello", "c": {"x": 0}}')
        | when id = 2 then parse_json('{"A": 1, "c": 1.23}')
        | end v from range(0, 3, 1, 1)
        |""".stripMargin)
    // Table property not present or false
    Seq("", s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_VARIANT_SHREDDING.key}' = 'false') ")
      .foreach { tblProperties =>
        withTable("tbl") {
          withTempDir { dir =>
            sql("CREATE TABLE tbl (i long, v variant) USING DELTA " + tblProperties +
              s"LOCATION '${dir.getAbsolutePath}'")
            withSQLConf(SQLConf.VARIANT_WRITE_SHREDDING_ENABLED.key -> true.toString,
                SQLConf.VARIANT_ALLOW_READING_SHREDDED.key -> true.toString,
                SQLConf.VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key -> schema) {

              val e = intercept[DeltaSparkException] {
                df.write.format("delta").mode("append").saveAsTable("tbl")
              }
              checkError(e, "DELTA_SHREDDING_TABLE_PROPERTY_DISABLED", parameters = Map())
              assert(e.getMessage.contains(
                "Attempted to write shredded Variants but the table does not support shredded " +
                "writes. Consider setting the table property enableVariantShredding to true."))
              assert(numShreddedFiles(dir.getAbsolutePath, validation = { field: GroupType =>
                field.getName == "v" && (field.getType("typed_value") match {
                  case t: GroupType =>
                    t.getFields.asScala.map(_.getName).toSet == Set("a", "b", "c")
                  case _ => false
                })
              }) == 0)
              checkAnswer(
                spark.read.format("delta").load(dir.getAbsolutePath).selectExpr("i", "to_json(v)"),
                Seq()
              )
            }
          }
        }
      }
  }

  test("Set table property to invalid value") {
    withTable("tbl") {
      sql("CREATE TABLE tbl(s STRING, i INTEGER) USING DELTA")
      val (deltaLog, snapshot) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier("tbl"))
      assert(!snapshot.protocol
        .isFeatureSupported(VariantShreddingPreviewTableFeature),
        s"Table tbl contains ShreddedVariantTableFeature descriptor when its not supposed to"
      )
      checkError(
        intercept[SparkException] {
          sql(s"ALTER TABLE tbl " +
            s"SET TBLPROPERTIES('${DeltaConfigs.ENABLE_VARIANT_SHREDDING.key}' = 'bla')")
        },
        "_LEGACY_ERROR_TEMP_2045",
        parameters = Map(
          "message" -> "For input string: \"bla\""
        )
      )
      assert(!getProtocolForTable("tbl")
        .readerAndWriterFeatures.contains(VariantShreddingPreviewTableFeature))
    }
  }
}

spark/src/test/scala-spark-master/org/apache/spark/sql/delta/DeltaVariantSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 /*
- * Copyright (2021) The Delta Lake Project Authors.
+ * Copyright (2025) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
