
Commit 51f60ca

[spark] assign reserved row tracking field id for uniform tables (#4803)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

For Uniform tables, to make the materialized row ID column and the row commit version column understandable to Iceberg, we need to assign the corresponding reserved Parquet field IDs to these two columns. A short sketch of the underlying Spark field-ID mechanism follows this message.

## How was this patch tested?

## Does this PR introduce _any_ user-facing changes?
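The mechanism the change relies on is Spark's per-column Parquet field-ID metadata: if a `StructField` carries the `parquet.field.id` key (exposed in Delta as `DeltaColumnMapping.PARQUET_FIELD_ID_METADATA_KEY`), the Parquet writer can stamp that ID into the file footer, which is what lets an Iceberg reader map the column to its reserved field. Below is a minimal, self-contained sketch of that mechanism only; the column name is illustrative and is not the materialized name Delta generates.

```scala
import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField, StructType}

object ReservedFieldIdSketch extends App {
  // Delta stores the id under DeltaColumnMapping.PARQUET_FIELD_ID_METADATA_KEY,
  // which resolves to Spark's "parquet.field.id" metadata key.
  val ParquetFieldIdKey = "parquet.field.id"

  // Illustrative materialized row-id column carrying Iceberg's reserved `_row_id` field id.
  val rowIdField = StructField(
    "_materialized_row_id_example", // hypothetical column name, not the one Delta generates
    LongType,
    nullable = true,
    metadata = new MetadataBuilder()
      .putLong(ParquetFieldIdKey, 2147483540L)
      .build())

  // The field id now travels with the schema and can be picked up by the Parquet writer.
  println(StructType(Seq(rowIdField)).prettyJson)

  // Note: Spark only persists explicit field ids into Parquet footers when
  // "spark.sql.parquet.fieldId.write.enabled" is set to true (available since Spark 3.3).
}
```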
1 parent f1293b6 commit 51f60ca

File tree

8 files changed (+164, -54 lines)


spark/src/main/scala/org/apache/spark/sql/delta/RowCommitVersion.scala

Lines changed: 56 additions & 17 deletions
@@ -16,6 +16,7 @@
 
 package org.apache.spark.sql.delta
 
+import org.apache.spark.sql.delta.DeltaColumnMapping.PARQUET_FIELD_ID_METADATA_KEY
 import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
 import org.apache.spark.sql.util.ScalaExtensions._
 
@@ -35,9 +36,10 @@ object RowCommitVersion {
   def createMetadataStructField(
       protocol: Protocol,
       metadata: Metadata,
-      nullable: Boolean = false): Option[StructField] =
+      nullable: Boolean = false,
+      shouldSetIcebergReservedFieldId: Boolean): Option[StructField] =
     MaterializedRowCommitVersion.getMaterializedColumnName(protocol, metadata)
-      .map(MetadataStructField(_, nullable))
+      .map(MetadataStructField(_, nullable, shouldSetIcebergReservedFieldId))
 
   /**
    * Add a new column to `dataFrame` that has the name of the materialized Row Commit Version column
@@ -56,22 +58,35 @@ object RowCommitVersion {
 
     val rowCommitVersionColumn =
       DeltaTableUtils.getFileMetadataColumn(dataFrame).getField(METADATA_STRUCT_FIELD_NAME)
-    preserveRowCommitVersionsUnsafe(dataFrame, materializedColumnName, rowCommitVersionColumn)
+    val shouldSetIcebergReservedFieldId = IcebergCompat.isGeqEnabled(snapshot.metadata, 3)
+
+    preserveRowCommitVersionsUnsafe(
+      dataFrame,
+      materializedColumnName,
+      rowCommitVersionColumn,
+      shouldSetIcebergReservedFieldId
+    )
   }
 
   private[delta] def preserveRowCommitVersionsUnsafe(
       dataFrame: DataFrame,
       materializedColumnName: String,
-      rowCommitVersionColumn: Column): DataFrame = {
+      rowCommitVersionColumn: Column,
+      shouldSetIcebergReservedFieldId: Boolean): DataFrame = {
     dataFrame
       .withColumn(materializedColumnName, rowCommitVersionColumn)
-      .withMetadata(materializedColumnName, MetadataStructField.metadata(materializedColumnName))
+      .withMetadata(
+        materializedColumnName,
+        MetadataStructField.metadata(materializedColumnName, shouldSetIcebergReservedFieldId))
   }
 
   object MetadataStructField {
     private val METADATA_COL_ATTR_KEY = "__row_commit_version_metadata_col"
 
-    def apply(materializedColumnName: String, nullable: Boolean = false): StructField =
+    def apply(
+        materializedColumnName: String,
+        nullable: Boolean = false,
+        shouldSetIcebergReservedFieldId: Boolean): StructField =
       StructField(
         METADATA_STRUCT_FIELD_NAME,
         LongType,
@@ -80,17 +95,32 @@ object RowCommitVersion {
         // injected before the optimizer pass by the [[GenerateRowIDs] rule at which point the Row
         // commit version field is non-nullable.
         nullable,
-        metadata = metadata(materializedColumnName))
+        metadata = metadata(materializedColumnName, shouldSetIcebergReservedFieldId))
 
     def unapply(field: StructField): Option[StructField] =
       Option.when(isValid(field.dataType, field.metadata))(field)
 
-    def metadata(materializedColumnName: String): types.Metadata = new MetadataBuilder()
-      .withMetadata(
-        FileSourceGeneratedMetadataStructField.metadata(
-          METADATA_STRUCT_FIELD_NAME, materializedColumnName))
-      .putBoolean(METADATA_COL_ATTR_KEY, value = true)
-      .build()
+    def metadata(
+        materializedColumnName: String,
+        shouldSetIcebergReservedFieldId: Boolean): types.Metadata = {
+      val metadataBuilder = new MetadataBuilder()
+        .withMetadata(
+          FileSourceGeneratedMetadataStructField.metadata(
+            METADATA_STRUCT_FIELD_NAME, materializedColumnName))
+        .putBoolean(METADATA_COL_ATTR_KEY, value = true)
+
+      // If IcebergCompatV3 or higher is enabled, assign the field ID of Delta
+      // Row commit version column to match the reserved `_last_updated_sequence_number`
+      // field defined in the Iceberg spec.
+      // This ensures that Iceberg can recognize and track the same column for row lineage purposes.
+      if (shouldSetIcebergReservedFieldId) {
+        metadataBuilder.putLong(
+          PARQUET_FIELD_ID_METADATA_KEY,
+          IcebergConstants.ICEBERG_ROW_TRACKING_LAST_UPDATED_SEQUENCE_NUMBER_FIELD_ID
+        )
+      }
+      metadataBuilder.build()
+    }
 
     /** Return true if the column is a Row Commit Version column. */
     def isRowCommitVersionColumn(structField: StructField): Boolean =
@@ -103,12 +133,21 @@ object RowCommitVersion {
     }
   }
 
-  def columnMetadata(materializedColumnName: String): types.Metadata =
-    MetadataStructField.metadata(materializedColumnName)
+  def columnMetadata(
+      materializedColumnName: String,
+      shouldSetIcebergReservedFieldId: Boolean): types.Metadata =
+    MetadataStructField.metadata(materializedColumnName, shouldSetIcebergReservedFieldId)
 
   object MetadataAttribute {
-    def apply(materializedColumnName: String): AttributeReference =
-      DataTypeUtils.toAttribute(MetadataStructField(materializedColumnName))
+    def apply(
+        materializedColumnName: String,
+        shouldSetIcebergReservedFieldId: Boolean): AttributeReference =
+      DataTypeUtils
+        .toAttribute(
+          MetadataStructField(
+            materializedColumnName,
+            shouldSetIcebergReservedFieldId = shouldSetIcebergReservedFieldId
+          ))
         .withName(materializedColumnName)
 
     def unapply(attr: Attribute): Option[Attribute] =
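For a quick feel of what the new `shouldSetIcebergReservedFieldId` flag does to the column metadata, here is a standalone sketch that mirrors the conditional `MetadataBuilder` pattern from this hunk. The key and constant are local stand-ins for `DeltaColumnMapping.PARQUET_FIELD_ID_METADATA_KEY` and `IcebergConstants.ICEBERG_ROW_TRACKING_LAST_UPDATED_SEQUENCE_NUMBER_FIELD_ID`, and the extra name entry is illustrative only.

```scala
import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

object RowCommitVersionMetadataSketch extends App {
  // Stand-ins for the Delta-internal key and constant used in the actual change.
  val ParquetFieldIdKey = "parquet.field.id"
  val LastUpdatedSequenceNumberFieldId = 2147483539L

  def metadata(
      materializedColumnName: String,
      shouldSetIcebergReservedFieldId: Boolean): Metadata = {
    val builder = new MetadataBuilder()
      // Illustrative entry standing in for the generated-metadata keys Delta attaches.
      .putString("materialized_column_name_example", materializedColumnName)
    // Only tag the column with the reserved Iceberg field id when IcebergCompat >= 3 is enabled.
    if (shouldSetIcebergReservedFieldId) {
      builder.putLong(ParquetFieldIdKey, LastUpdatedSequenceNumberFieldId)
    }
    builder.build()
  }

  val withId = metadata("_row_commit_version_col", shouldSetIcebergReservedFieldId = true)
  val withoutId = metadata("_row_commit_version_col", shouldSetIcebergReservedFieldId = false)
  // The flag controls whether the parquet field id key appears in the column metadata.
  assert(withId.contains(ParquetFieldIdKey) && !withoutId.contains(ParquetFieldIdKey))
}
```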

spark/src/main/scala/org/apache/spark/sql/delta/RowId.scala

Lines changed: 52 additions & 17 deletions
@@ -16,6 +16,7 @@
 
 package org.apache.spark.sql.delta
 
+import org.apache.spark.sql.delta.DeltaColumnMapping.PARQUET_FIELD_ID_METADATA_KEY
 import org.apache.spark.sql.delta.actions.{Action, AddFile, DomainMetadata, Metadata, Protocol}
 import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.propertyKey
 import org.apache.spark.sql.util.ScalaExtensions._
@@ -183,17 +184,22 @@ object RowId {
   val QUALIFIED_COLUMN_NAME = s"${FileFormat.METADATA_NAME}.${ROW_ID}"
 
   /** Column metadata to be used in conjunction [[QUALIFIED_COLUMN_NAME]] to mark row id columns */
-  def columnMetadata(materializedColumnName: String): types.Metadata =
-    RowIdMetadataStructField.metadata(materializedColumnName)
+  def columnMetadata(
+      materializedColumnName: String,
+      shouldSetIcebergReservedFieldId: Boolean): types.Metadata =
+    RowIdMetadataStructField.metadata(materializedColumnName, shouldSetIcebergReservedFieldId)
 
   /**
    * The field readers can use to access the generated row id column. The scanner's internal column
    * name is obtained from the table's metadata.
    */
-  def createRowIdField(protocol: Protocol, metadata: Metadata, nullable: Boolean)
-    : Option[StructField] =
+  def createRowIdField(
+      protocol: Protocol,
+      metadata: Metadata,
+      nullable: Boolean,
+      shouldSetIcebergReservedFieldId: Boolean): Option[StructField] =
     MaterializedRowId.getMaterializedColumnName(protocol, metadata)
-      .map(RowIdMetadataStructField(_, nullable))
+      .map(RowIdMetadataStructField(_, nullable, shouldSetIcebergReservedFieldId))
 
   /*
    * A specialization of [[FileSourceGeneratedMetadataStructField]] used to represent RowId columns.
@@ -209,21 +215,38 @@
 
     val ROW_ID_METADATA_COL_ATTR_KEY = "__row_id_metadata_col"
 
-    def metadata(materializedColumnName: String): types.Metadata = new MetadataBuilder()
-      .withMetadata(
-        FileSourceGeneratedMetadataStructField.metadata(RowId.ROW_ID, materializedColumnName))
-      .putBoolean(ROW_ID_METADATA_COL_ATTR_KEY, value = true)
-      .build()
+    def metadata(
+        materializedColumnName: String,
+        shouldSetIcebergReservedFieldId: Boolean): types.Metadata = {
+      val metadataBuilder = new MetadataBuilder()
+        .withMetadata(
+          FileSourceGeneratedMetadataStructField.metadata(RowId.ROW_ID, materializedColumnName))
+        .putBoolean(ROW_ID_METADATA_COL_ATTR_KEY, value = true)
+
+      // If IcebergCompatV3 or higher is enabled, assign the field ID of Delta row id column
+      // to match the reserved `_row_id` field defined in the Iceberg spec.
+      // This ensures that Iceberg can recognize and track the same column for row lineage purposes.
+      if (shouldSetIcebergReservedFieldId) {
+        metadataBuilder.putLong(
+          PARQUET_FIELD_ID_METADATA_KEY,
+          IcebergConstants.ICEBERG_ROW_TRACKING_ROW_ID_FIELD_ID
+        )
+      }
+      metadataBuilder.build()
+    }
 
-    def apply(materializedColumnName: String, nullable: Boolean = false): StructField =
+    def apply(
+        materializedColumnName: String,
+        nullable: Boolean = false,
+        shouldSetIcebergReservedFieldId: Boolean): StructField =
       StructField(
        RowId.ROW_ID,
        LongType,
        // The Row ID field is used to read the materialized Row ID value which is nullable. The
        // actual Row ID expression is created using a projection injected before the optimizer pass
        // by the [[GenerateRowIDs] rule at which point the Row ID field is non-nullable.
        nullable,
-        metadata = metadata(materializedColumnName))
+        metadata = metadata(materializedColumnName, shouldSetIcebergReservedFieldId))
 
     def unapply(field: StructField): Option[StructField] =
       if (isRowIdColumn(field)) Some(field) else None
@@ -241,8 +264,14 @@
 
   object RowIdMetadataAttribute {
     /** Creates an attribute for writing out the materialized column name */
-    def apply(materializedColumnName: String): AttributeReference =
-      DataTypeUtils.toAttribute(RowIdMetadataStructField(materializedColumnName))
+    def apply(
+        materializedColumnName: String,
+        shouldSetIcebergReservedFieldId: Boolean): AttributeReference =
+      DataTypeUtils
+        .toAttribute(
+          RowIdMetadataStructField(
+            materializedColumnName,
+            shouldSetIcebergReservedFieldId = shouldSetIcebergReservedFieldId))
        .withName(materializedColumnName)
 
     def unapply(attr: Attribute): Option[Attribute] =
@@ -302,7 +331,10 @@ object RowId {
       snapshot.protocol, snapshot.metadata, snapshot.deltaLog.tableId)
 
     val rowIdColumn = DeltaTableUtils.getFileMetadataColumn(dataFrame).getField(ROW_ID)
-    preserveRowIdsUnsafe(dataFrame, materializedColumnName, rowIdColumn)
+    val shouldSetIcebergReservedFieldId = IcebergCompat.isGeqEnabled(snapshot.metadata, 3)
+
+    preserveRowIdsUnsafe(
+      dataFrame, materializedColumnName, rowIdColumn, shouldSetIcebergReservedFieldId)
   }
 
   /**
@@ -314,9 +346,12 @@
   private[delta] def preserveRowIdsUnsafe(
       dataFrame: DataFrame,
       materializedColumnName: String,
-      rowIdColumn: Column): DataFrame = {
+      rowIdColumn: Column,
+      shouldSetIcebergReservedFieldId: Boolean): DataFrame = {
     dataFrame
       .withColumn(materializedColumnName, rowIdColumn)
-      .withMetadata(materializedColumnName, columnMetadata(materializedColumnName))
+      .withMetadata(
+        materializedColumnName,
+        columnMetadata(materializedColumnName, shouldSetIcebergReservedFieldId))
   }
 }

spark/src/main/scala/org/apache/spark/sql/delta/RowTracking.scala

Lines changed: 7 additions & 2 deletions
@@ -74,11 +74,16 @@ object RowTracking {
       metadata: Metadata,
       nullableConstantFields: Boolean,
       nullableGeneratedFields: Boolean): Iterable[StructField] = {
-    RowId.createRowIdField(protocol, metadata, nullableGeneratedFields) ++
+    val needSetRowTrackingFieldIdForUniform =
+      IcebergCompat.isGeqEnabled(metadata, requiredVersion = 3)
+
+    RowId.createRowIdField(
+      protocol, metadata, nullableGeneratedFields, needSetRowTrackingFieldIdForUniform) ++
       RowId.createBaseRowIdField(protocol, metadata, nullableConstantFields) ++
       DefaultRowCommitVersion.createDefaultRowCommitVersionField(
         protocol, metadata, nullableConstantFields) ++
-      RowCommitVersion.createMetadataStructField(protocol, metadata, nullableGeneratedFields)
+      RowCommitVersion.createMetadataStructField(
+        protocol, metadata, nullableGeneratedFields, needSetRowTrackingFieldIdForUniform)
   }
 
   /**

spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala

Lines changed: 6 additions & 0 deletions
@@ -338,6 +338,12 @@ object IcebergConstants {
   val ICEBERG_TBLPROP_METADATA_LOCATION = "metadata_location"
   val ICEBERG_PROVIDER = "iceberg"
   val ICEBERG_NAME_MAPPING_PROPERTY = "schema.name-mapping.default"
+
+  // Reserved field ID for the `_row_id` column
+  // Iceberg spec: https://iceberg.apache.org/spec/?h=row#reserved-field-ids
+  val ICEBERG_ROW_TRACKING_ROW_ID_FIELD_ID = 2147483540L
+  // Reserved field ID for the `_last_updated_sequence_number` column
+  val ICEBERG_ROW_TRACKING_LAST_UPDATED_SEQUENCE_NUMBER_FIELD_ID = 2147483539L
 }
 
 object HudiConstants {
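As a quick sanity check on the two constants added above: both sit just below `Int.MaxValue`, in the range the Iceberg spec sets aside for reserved field IDs, and they match the `_row_id` and `_last_updated_sequence_number` assignments the linked spec section describes. A tiny verification sketch, not part of the PR:

```scala
object IcebergReservedFieldIdCheck extends App {
  val RowIdFieldId = 2147483540L                      // reserved id for `_row_id`
  val LastUpdatedSequenceNumberFieldId = 2147483539L  // reserved id for `_last_updated_sequence_number`

  // Both ids sit just below Int.MaxValue (2147483647), where Iceberg reserves its metadata field ids.
  assert(RowIdFieldId == Int.MaxValue - 107L)
  assert(LastUpdatedSequenceNumberFieldId == Int.MaxValue - 108L)
  println("Reserved row-tracking field ids fall in Iceberg's reserved range just below Int.MaxValue")
}
```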

spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala

Lines changed: 6 additions & 2 deletions
@@ -432,14 +432,16 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
       (if (cdcEnabled) Seq(CDC_TYPE_NOT_CDC) else Seq())
 
     // Generate output columns.
+    val needSetRowTrackingFieldIdForUniform = IcebergCompat.isGeqEnabled(deltaTxn.metadata, 3)
     val outputCols = generateWriteAllChangesOutputCols(
       targetWriteCols,
       rowIdColumnExpressionOpt,
       rowCommitVersionColumnExpressionOpt,
       outputColNames,
       noopCopyExprs,
       clausesWithPrecompConditions,
-      cdcEnabled
+      cdcEnabled,
+      needSetRowTrackingFieldIdForUniform = needSetRowTrackingFieldIdForUniform
     )
 
     val preOutputDF = if (cdcEnabled) {
@@ -450,7 +452,9 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
         noopCopyExprs,
         rowIdColumnExpressionOpt.map(_.name),
         rowCommitVersionColumnExpressionOpt.map(_.name),
-        deduplicateCDFDeletes)
+        deduplicateCDFDeletes,
+        needSetRowTrackingFieldIdForUniform = needSetRowTrackingFieldIdForUniform
+      )
     } else {
       // change data capture is off, just output the normal data
       joinedAndPrecomputedConditionsDF
