
Commit 23b7c17

[Spark] Add read support for RowId (#2856)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

1. Add the Analyzer Rule `GenerateRowIds` to generate default Row IDs.
2. Add the `row_id` field to the `_metadata` column for Delta tables, allowing us to read the `row_id` from the file metadata after it is stored.

## How was this patch tested?

Added UTs.

## Does this PR introduce _any_ user-facing changes?

No.
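For orientation, a minimal read-side sketch of the new field (the table and the `delta.enableRowTracking` property used to enable row tracking below are assumptions for illustration, not part of this diff):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("row-id-read-sketch").getOrCreate()

// Hypothetical table with row tracking enabled; the property name is an assumption here.
spark.sql(
  """CREATE TABLE target (id INT) USING delta
    |TBLPROPERTIES ('delta.enableRowTracking' = 'true')""".stripMargin)
spark.sql("INSERT INTO target VALUES (1), (2), (3)")

// `_metadata.row_id` is the qualified column this PR exposes; rows without a materialized
// Row ID fall back to base_row_id + row_index (see GenerateRowIDs below).
spark.read.table("target")
  .select(col("id"), col("_metadata.row_id"))
  .show()
```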
1 parent 3b50a92 commit 23b7c17

15 files changed: +915 −14 lines

spark/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala

Lines changed: 3 additions & 0 deletions
@@ -125,6 +125,9 @@ class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
     extensions.injectPostHocResolutionRule { session =>
       PostHocResolveUpCast(session)
     }
+
+    extensions.injectPlanNormalizationRule { _ => GenerateRowIDs }
+
     // We don't use `injectOptimizerRule` here as we won't want to apply further optimizations after
     // `PrepareDeltaScan`.
     // For example, `ConstantFolding` will break unit tests in `OptimizeGeneratedColumnSuite`.

spark/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala

Lines changed: 6 additions & 0 deletions
@@ -153,6 +153,12 @@ object ColumnWithDefaultExprUtils extends DeltaLogging {
       }
     }
     selectExprs = selectExprs ++ cdcSelectExprs
+
+    val rowIdExprs = data.queryExecution.analyzed.output
+      .filter(RowId.RowIdMetadataAttribute.isRowIdColumn)
+      .map(new Column(_))
+    selectExprs = selectExprs ++ rowIdExprs
+
     val newData = queryExecution match {
       case incrementalExecution: IncrementalExecution =>
         selectFromStreamingDataFrame(incrementalExecution, data, selectExprs: _*)

spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala

Lines changed: 4 additions & 2 deletions
@@ -20,6 +20,7 @@ import java.util.{Locale, UUID}
 
 import scala.collection.mutable
 
+import org.apache.spark.sql.delta.RowId.RowIdMetadataStructField
 import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
 import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -78,8 +79,9 @@ trait DeltaColumnMappingBase extends DeltaLogging {
   val supportedModes: Set[DeltaColumnMappingMode] =
     Set(IdMapping, NoMapping, NameMapping)
 
-  def isInternalField(field: StructField): Boolean = DELTA_INTERNAL_COLUMNS
-    .contains(field.name.toLowerCase(Locale.ROOT))
+  def isInternalField(field: StructField): Boolean =
+    DELTA_INTERNAL_COLUMNS.contains(field.name.toLowerCase(Locale.ROOT)) ||
+    RowIdMetadataStructField.isRowIdColumn(field)
 
   def satisfiesColumnMappingProtocol(protocol: Protocol): Boolean =
     protocol.isFeatureSupported(ColumnMappingTableFeature)
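
A small sketch of what the widened check now treats as internal (the field name is hypothetical; `RowId.columnMetadata` comes from the `RowId.scala` changes further down this page):

```scala
import org.apache.spark.sql.delta.{DeltaColumnMapping, RowId}
import org.apache.spark.sql.types.{LongType, StructField}

// A write column tagged with the Row ID column metadata is now considered a Delta-internal
// field, so column mapping and schema checks skip it like the other DELTA_INTERNAL_COLUMNS.
val taggedField = StructField(
  "hypothetical_materialized_row_id_col",
  LongType,
  nullable = true,
  metadata = RowId.columnMetadata("hypothetical_materialized_row_id_col"))

assert(DeltaColumnMapping.isInternalField(taggedField))
```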

spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala

Lines changed: 15 additions & 5 deletions
@@ -55,6 +55,7 @@ import org.apache.spark.util.SerializableConfiguration
 case class DeltaParquetFileFormat(
     protocol: Protocol,
     metadata: Metadata,
+    nullableRowTrackingFields: Boolean = false,
     isSplittable: Boolean = true,
     disablePushDowns: Boolean = false,
     tablePath: Option[String] = None,
@@ -197,12 +198,21 @@ case class DeltaParquetFileFormat(
   }
 
   override def metadataSchemaFields: Seq[StructField] = {
-    // Parquet reader in Spark has a bug where a file containing 2b+ rows in a single rowgroup
-    // causes it to run out of the `Integer` range (TODO: Create a SPARK issue)
+    val rowTrackingFields =
+      RowTracking.createMetadataStructFields(protocol, metadata, nullableRowTrackingFields)
+    // TODO(SPARK-47731): Parquet reader in Spark has a bug where a file containing 2b+ rows
+    // in a single rowgroup causes it to run out of the `Integer` range.
     // For Delta Parquet readers don't expose the row_index field as a metadata field.
-    super.metadataSchemaFields.filter(field => field != ParquetFileFormat.ROW_INDEX_FIELD) ++
-      RowId.createBaseRowIdField(protocol, metadata) ++
-      DefaultRowCommitVersion.createDefaultRowCommitVersionField(protocol, metadata)
+    if (!RowId.isEnabled(protocol, metadata)) {
+      super.metadataSchemaFields.filter(_ != ParquetFileFormat.ROW_INDEX_FIELD)
+    } else {
+      // It is fine to expose the row_index field as a metadata field when Row Tracking
+      // is enabled because it is needed to generate the Row ID field, and it is not a
+      // big problem if we use 2b+ rows in a single rowgroup, it will throw an exception and
+      // we can then use less rows per rowgroup. Also, 2b+ rows in a single rowgroup is
+      // not a common use case.
+      super.metadataSchemaFields ++ rowTrackingFields
+    }
   }
 
   override def prepareWrite(
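
In reader terms, the branch above means that with Row ID enabled the per-file `_metadata` struct keeps `row_index` and gains the row tracking fields, so a query like the following sketch can resolve all three (it assumes the `target` table from the earlier sketch):

```scala
import org.apache.spark.sql.functions.col

// With Row ID disabled, row_index is filtered out of the metadata schema as before and
// only the standard file metadata fields remain.
spark.read.table("target")
  .select(
    col("_metadata.row_id"),
    col("_metadata.base_row_id"),
    col("_metadata.row_index"))
  .show()
```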

spark/src/main/scala/org/apache/spark/sql/delta/GenerateRowIDs.scala

Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
+import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule adds a Project on top of Delta tables that support the Row tracking table feature to
+ * provide a default generated Row ID for rows that don't have them materialized in the data file.
+ */
+object GenerateRowIDs extends Rule[LogicalPlan] {
+
+  /**
+   * Matcher for a scan on a Delta table that has Row tracking enabled.
+   */
+  private object DeltaScanWithRowTrackingEnabled {
+    def unapply(plan: LogicalPlan): Option[LogicalRelation] = plan match {
+      case scan @ LogicalRelation(relation: HadoopFsRelation, _, _, _) =>
+        relation.fileFormat match {
+          case format: DeltaParquetFileFormat
+            if RowTracking.isEnabled(format.protocol, format.metadata) => Some(scan)
+          case _ => None
+        }
+      case _ => None
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithNewOutput {
+    case DeltaScanWithRowTrackingEnabled(scan) =>
+      // While Row IDs are non-nullable, we'll use the Row ID attributes to read
+      // the materialized values from now on, which can be null. We make
+      // the materialized Row ID attributes nullable in the scan here.
+
+      // Update nullability in the scan `metadataOutput` by updating the delta file format.
+      val baseRelation = scan.relation.asInstanceOf[HadoopFsRelation]
+      val newFileFormat = baseRelation.fileFormat match {
+        case format: DeltaParquetFileFormat =>
+          format.copy(nullableRowTrackingFields = true)
+      }
+      val newBaseRelation = baseRelation.copy(fileFormat = newFileFormat)(baseRelation.sparkSession)
+
+      // Update the output metadata column's data type (now with nullable row tracking fields).
+      val newOutput = scan.output.map {
+        case MetadataAttributeWithLogicalName(metadata, FileFormat.METADATA_NAME) =>
+          metadata.withDataType(newFileFormat.createFileMetadataCol().dataType)
+        case other => other
+      }
+      val newScan = scan.copy(relation = newBaseRelation, output = newOutput)
+      newScan.copyTagsFrom(scan)
+
+      // Add projection with row tracking column expressions.
+      val updatedAttributes = mutable.Buffer.empty[(Attribute, Attribute)]
+      val projectList = newOutput.map {
+        case MetadataAttributeWithLogicalName(metadata, FileFormat.METADATA_NAME) =>
+          val updatedMetadata = metadataWithRowTrackingColumnsProjection(metadata)
+          updatedAttributes += metadata -> updatedMetadata.toAttribute
+          updatedMetadata
+        case other => other
+      }
+      Project(projectList = projectList, child = newScan) -> updatedAttributes.toSeq
+    case o =>
+      val newPlan = o.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
+        // Recurse into subquery plans. Similar to how [[transformUpWithSubqueries]] works except
+        // that it allows us to still use [[transformUpWithNewOutput]] on subquery plans to
+        // correctly update references to the metadata attribute when going up the plan.
+        // Get around type erasure by explicitly checking the plan type and removing warning.
+        case planExpression: PlanExpression[LogicalPlan @unchecked]
+          if planExpression.plan.isInstanceOf[LogicalPlan] =>
+            planExpression.withNewPlan(apply(planExpression.plan))
+      }
+      newPlan -> Nil
+  }
+
+  /**
+   * Expression that reads the Row IDs from the materialized Row ID column if the value is
+   * present and returns the default generated Row ID using the file's base Row ID and current row
+   * index if not:
+   *    coalesce(_metadata.row_id, _metadata.base_row_id + _metadata.row_index).
+   */
+  private def rowIdExpr(metadata: AttributeReference): Expression = {
+    Coalesce(Seq(
+      getField(metadata, RowId.ROW_ID),
+      Add(
+        getField(metadata, RowId.BASE_ROW_ID),
+        getField(metadata, ParquetFileFormat.ROW_INDEX))))
+  }
+
+  /**
+   * Extract a field from the metadata column.
+   */
+  private def getField(metadata: AttributeReference, name: String): GetStructField = {
+    ExtractValue(metadata, Literal(name), conf.resolver) match {
+      case field: GetStructField => field
+      case _ =>
+        throw new IllegalStateException(s"The metadata column '${metadata.name}' is not a struct.")
+    }
+  }
+
+  /**
+   * Create a new metadata struct where the Row ID values are populated using
+   * the materialized values if present, or the default Row ID values if not.
+   */
+  private def metadataWithRowTrackingColumnsProjection(metadata: AttributeReference)
+    : NamedExpression = {
+    val metadataFields = metadata.dataType.asInstanceOf[StructType].map {
+      case field if field.name == RowId.ROW_ID =>
+        field -> rowIdExpr(metadata)
+      case field =>
+        field -> getField(metadata, field.name)
+    }.flatMap { case (oldField, newExpr) =>
+      // Propagate the type metadata from the old fields to the new fields.
+      val newField = Alias(newExpr, oldField.name)(explicitMetadata = Some(oldField.metadata))
+      Seq(Literal(oldField.name), newField)
+    }
+    Alias(CreateNamedStruct(metadataFields), metadata.name)()
+  }
+}
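
The projection the rule builds boils down to the coalesce documented in `rowIdExpr`; expressed with the public DataFrame API rather than Catalyst nodes, the default Row ID is roughly:

```scala
import org.apache.spark.sql.functions.{coalesce, col}

// Materialized value if present, otherwise the file's base Row ID plus the row's index
// within the file. Column names follow the _metadata fields referenced above.
val defaultRowId = coalesce(
  col("_metadata.row_id"),
  col("_metadata.base_row_id") + col("_metadata.row_index"))
```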

spark/src/main/scala/org/apache/spark/sql/delta/RowId.scala

Lines changed: 114 additions & 1 deletion
@@ -20,7 +20,11 @@ import org.apache.spark.sql.delta.actions.{Action, AddFile, DomainMetadata, Meta
 import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.propertyKey
 import org.apache.spark.sql.util.ScalaExtensions._
 
-import org.apache.spark.sql.catalyst.expressions.FileSourceConstantMetadataStructField
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, FileSourceConstantMetadataStructField, FileSourceGeneratedMetadataStructField}
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
+import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.types
 import org.apache.spark.sql.types.{DataType, LongType, MetadataBuilder, StructField}
 
@@ -155,4 +159,113 @@ object RowId {
     Option.when(RowId.isEnabled(protocol, metadata)) {
       BaseRowIdMetadataStructField()
     }
+
+  /** Row ID column name */
+  val ROW_ID = "row_id"
+
+  val QUALIFIED_COLUMN_NAME = s"${FileFormat.METADATA_NAME}.${ROW_ID}"
+
+  /** Column metadata to be used in conjunction [[QUALIFIED_COLUMN_NAME]] to mark row id columns */
+  def columnMetadata(materializedColumnName: String): types.Metadata =
+    RowIdMetadataStructField.metadata(materializedColumnName)
+
+  /**
+   * The field readers can use to access the generated row id column. The scanner's internal column
+   * name is obtained from the table's metadata.
+   */
+  def createRowIdField(protocol: Protocol, metadata: Metadata, nullable: Boolean)
+    : Option[StructField] =
+    MaterializedRowId.getMaterializedColumnName(protocol, metadata)
+      .map(RowIdMetadataStructField(_, nullable))
+
+  /*
+   * A specialization of [[FileSourceGeneratedMetadataStructField]] used to represent RowId columns.
+   *
+   * - Row ID columns can be read by adding '_metadata.row_id' to the read schema
+   * - To write to the materialized Row ID column
+   *   - use the materialized Row ID column name which can be obtained using
+   *     [[getMaterializedColumnName]]
+   *   - add [[COLUMN_METADATA]] which is part of [[RowId]] as metadata to the column
+   *   - nulls are replaced with fresh Row IDs
+   */
+  object RowIdMetadataStructField {
+
+    val ROW_ID_METADATA_COL_ATTR_KEY = "__row_id_metadata_col"
+
+    def metadata(materializedColumnName: String): types.Metadata = new MetadataBuilder()
+      .withMetadata(
+        FileSourceGeneratedMetadataStructField.metadata(RowId.ROW_ID, materializedColumnName))
+      .putBoolean(ROW_ID_METADATA_COL_ATTR_KEY, value = true)
+      .build()
+
+    def apply(materializedColumnName: String, nullable: Boolean = false): StructField =
+      StructField(
+        RowId.ROW_ID,
+        LongType,
+        // The Row ID field is used to read the materialized Row ID value which is nullable. The
+        // actual Row ID expression is created using a projection injected before the optimizer pass
+        // by the [[GenerateRowIDs]] rule at which point the Row ID field is non-nullable.
+        nullable,
+        metadata = metadata(materializedColumnName))
+
+    def unapply(field: StructField): Option[StructField] =
+      if (isRowIdColumn(field)) Some(field) else None
+
+    /** Return true if the column is a Row Id column. */
+    def isRowIdColumn(structField: StructField): Boolean =
+      isValid(structField.dataType, structField.metadata)
+
+    def isValid(dataType: DataType, metadata: types.Metadata): Boolean = {
+      FileSourceGeneratedMetadataStructField.isValid(dataType, metadata) &&
+        metadata.contains(ROW_ID_METADATA_COL_ATTR_KEY) &&
+        metadata.getBoolean(ROW_ID_METADATA_COL_ATTR_KEY)
+    }
+  }
+
+  object RowIdMetadataAttribute {
+    /** Creates an attribute for writing out the materialized column name */
+    def apply(materializedColumnName: String): AttributeReference =
+      DataTypeUtils.toAttribute(RowIdMetadataStructField(materializedColumnName))
+        .withName(materializedColumnName)
+
+    def unapply(attr: Attribute): Option[Attribute] =
+      if (isRowIdColumn(attr)) Some(attr) else None
+
+    /** Return true if the column is a Row Id column. */
+    def isRowIdColumn(attr: Attribute): Boolean =
+      RowIdMetadataStructField.isValid(attr.dataType, attr.metadata)
+  }
+
+  /**
+   * Throw if row tracking is supported and columns in the write schema tagged as materialized row
+   * IDs do not reference the materialized row id column name.
+   */
+  private[delta] def throwIfMaterializedRowIdColumnNameIsInvalid(
+      data: DataFrame, metadata: Metadata, protocol: Protocol, tableId: String): Unit = {
+    if (!RowTracking.isEnabled(protocol, metadata)) {
+      return
+    }
+
+    val materializedColumnName =
+      metadata.configuration.get(MaterializedRowId.MATERIALIZED_COLUMN_NAME_PROP)
+
+    if (materializedColumnName.isEmpty) {
+      // If row tracking is enabled, a missing materialized column name is a bug and we need to
+      // throw an error. If row tracking is only supported, we should just return, as it's fine
+      // for the materialized column to not be assigned.
+      if (RowTracking.isEnabled(protocol, metadata)) {
+        throw DeltaErrors.materializedRowIdMetadataMissing(tableId)
+      }
+      return
+    }
+
+    toAttributes(data.schema).foreach {
+      case RowIdMetadataAttribute(attribute) =>
+        if (attribute.name != materializedColumnName.get) {
+          throw new UnsupportedOperationException("Materialized Row IDs column name " +
+            s"${attribute.name} is invalid. Must be ${materializedColumnName.get}.")
+        }
+      case _ =>
+    }
+  }
 }
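
As a rough sketch of the write-side contract described in `RowIdMetadataStructField` (the materialized column name below is made up; a real writer would obtain it via `MaterializedRowId.getMaterializedColumnName`, and per `throwIfMaterializedRowIdColumnNameIsInvalid` any other name is rejected):

```scala
import org.apache.spark.sql.delta.RowId
import org.apache.spark.sql.types.{LongType, StructField}

// Hypothetical materialized column name; real tables store theirs in the table metadata.
val materializedName = "_row_id_col_hypothetical"

// To preserve existing Row IDs on write, emit a LongType column under the materialized
// name and tag it with RowId.columnMetadata; nulls in it are replaced with fresh Row IDs.
val rowIdWriteField: StructField = StructField(
  name = materializedName,
  dataType = LongType,
  nullable = true,
  metadata = RowId.columnMetadata(materializedName))
```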

spark/src/main/scala/org/apache/spark/sql/delta/RowTracking.scala

Lines changed: 12 additions & 0 deletions
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta
 
 import org.apache.spark.sql.delta.actions.{Metadata, Protocol, TableFeatureProtocolUtils}
 
+import org.apache.spark.sql.types.StructField
 
 /**
  * Utility functions for Row Tracking that are shared between Row IDs and Row Commit Versions.
@@ -60,4 +61,15 @@ object RowTracking {
       throw DeltaErrors.convertToDeltaRowTrackingEnabledWithoutStatsCollection
     }
   }
+
+  /**
+   * Returns the Row Tracking metadata fields for the file's _metadata when Row Tracking
+   * is enabled.
+   */
+  def createMetadataStructFields(protocol: Protocol, metadata: Metadata, nullable: Boolean)
+    : Iterable[StructField] = {
+    RowId.createRowIdField(protocol, metadata, nullable) ++
+      RowId.createBaseRowIdField(protocol, metadata) ++
+      DefaultRowCommitVersion.createDefaultRowCommitVersionField(protocol, metadata)
+  }
 }

spark/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala

Lines changed: 5 additions & 0 deletions
@@ -106,6 +106,11 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
     val normalizedData = SchemaUtils.normalizeColumnNames(
       deltaLog, metadata.schema, data
     )
+
+    // Validate that write columns for Row IDs have the correct name.
+    RowId.throwIfMaterializedRowIdColumnNameIsInvalid(
+      normalizedData, metadata, protocol, deltaLog.tableId)
+
     val nullAsDefault = options.isDefined &&
       options.get.options.contains(ColumnWithDefaultExprUtils.USE_NULL_AS_DEFAULT_DELTA_OPTION)
     val enforcesDefaultExprs = ColumnWithDefaultExprUtils.tableHasDefaultExpr(
