refactor!: txn-specific write_metadata_schema #1021

Open · wants to merge 2 commits into main (diff shown from 1 commit)
1 change: 1 addition & 0 deletions kernel/src/engine/default/mod.rs
@@ -113,6 +113,7 @@ impl<E: TaskExecutor> DefaultEngine<E> {
             .write_parquet_file(
                 write_context.target_dir(),
                 physical_data,
+                write_context.write_metadata_schema().clone(),
                 partition_values,
                 data_change,
             )
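
Taken together with the transaction.rs changes below, the write-metadata schema now travels through the `WriteContext` instead of a crate-level static. A hedged sketch of the new call shape (assumes an in-scope `Transaction` named `txn`, a parquet handler, and already-transformed `physical_data`; setup and error handling elided):

```rust
// The schema comes from the WriteContext rather than the removed
// crate-level `get_write_metadata_schema()` static.
let write_context = txn.get_write_context();

let write_metadata = parquet_handler
    .write_parquet_file(
        write_context.target_dir(),
        physical_data,
        write_context.write_metadata_schema().clone(),
        partition_values,
        data_change,
    )
    .await?;

// The returned batch matches the write-metadata schema and can be staged as-is.
txn.add_write_metadata(write_metadata);
```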
34 changes: 21 additions & 13 deletions kernel/src/engine/default/parquet.rs
@@ -51,6 +51,7 @@ impl DataFileMetadata {
     // convert DataFileMetadata into a record batch which matches the 'write_metadata' schema
     fn as_record_batch(
         &self,
+        write_metadata_schema: SchemaRef,
         partition_values: &HashMap<String, String>,
         data_change: bool,
     ) -> DeltaResult<Box<dyn EngineData>> {
@@ -62,7 +63,6 @@ impl DataFileMetadata {
                 size,
             },
         } = self;
-        let write_metadata_schema = crate::transaction::get_write_metadata_schema();
 
         // create the record batch of the write metadata
         let path = Arc::new(StringArray::from(vec![location.to_string()]));
@@ -166,19 +166,18 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
     }
 
     /// Write `data` to `{path}/<uuid>.parquet` as parquet using ArrowWriter and return the parquet
-    /// metadata as an EngineData batch which matches the [write metadata] schema (where `<uuid>` is
+    /// metadata as an EngineData batch which matches the `write_metadata_schema` (where `<uuid>` is
     /// a generated UUIDv4).
-    ///
-    /// [write metadata]: crate::transaction::get_write_metadata_schema
     pub async fn write_parquet_file(
         &self,
         path: &url::Url,
         data: Box<dyn EngineData>,
+        write_metadata_schema: SchemaRef,
         partition_values: HashMap<String, String>,
         data_change: bool,
     ) -> DeltaResult<Box<dyn EngineData>> {
         let parquet_metadata = self.write_parquet(path, data).await?;
-        parquet_metadata.as_record_batch(&partition_values, data_change)
+        parquet_metadata.as_record_batch(write_metadata_schema, &partition_values, data_change)
     }
 }
@@ -406,6 +405,7 @@ mod tests {
     use crate::engine::arrow_conversion::TryIntoKernel as _;
     use crate::engine::arrow_data::ArrowEngineData;
     use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
+    use crate::schema::{DataType, MapType, StructField, StructType};
     use crate::EngineData;
 
     use itertools::Itertools;
@@ -472,17 +472,25 @@ mod tests {
         let data_file_metadata = DataFileMetadata::new(file_metadata);
         let partition_values = HashMap::from([("partition1".to_string(), "a".to_string())]);
         let data_change = true;
+        let write_metadata_schema = Arc::new(StructType::new(vec![
+            StructField::not_null("path", DataType::STRING),
+            StructField::not_null(
+                "partitionValues",
+                MapType::new(DataType::STRING, DataType::STRING, true),
+            ),
+            StructField::not_null("size", DataType::LONG),
+            StructField::not_null("modificationTime", DataType::LONG),
+            StructField::not_null("dataChange", DataType::BOOLEAN),
+        ]));
         let actual = data_file_metadata
-            .as_record_batch(&partition_values, data_change)
+            .as_record_batch(
+                write_metadata_schema.clone(),
+                &partition_values,
+                data_change,
+            )
             .unwrap();
         let actual = ArrowEngineData::try_from_engine_data(actual).unwrap();
 
-        let schema = Arc::new(
-            crate::transaction::get_write_metadata_schema()
-                .as_ref()
-                .try_into_arrow()
-                .unwrap(),
-        );
         let key_builder = StringBuilder::new();
         let val_builder = StringBuilder::new();
         let mut partition_values_builder = MapBuilder::new(
@@ -499,7 +507,7 @@ mod tests {
         partition_values_builder.append(true).unwrap();
         let partition_values = partition_values_builder.finish();
         let expected = RecordBatch::try_new(
-            schema,
+            Arc::new(write_metadata_schema.as_ref().try_into_arrow().unwrap()),
             vec![
                 Arc::new(StringArray::from(vec![location.to_string()])),
                 Arc::new(partition_values),
124 changes: 74 additions & 50 deletions kernel/src/transaction.rs
@@ -1,6 +1,6 @@
 use std::collections::HashSet;
 use std::iter;
-use std::sync::{Arc, LazyLock};
+use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use crate::actions::SetTransaction;
@@ -18,26 +18,6 @@
 const KERNEL_VERSION: &str = env!("CARGO_PKG_VERSION");
 const UNKNOWN_OPERATION: &str = "UNKNOWN";
 
-pub(crate) static WRITE_METADATA_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
-    Arc::new(StructType::new(vec![
-        StructField::not_null("path", DataType::STRING),
-        StructField::not_null(
-            "partitionValues",
-            MapType::new(DataType::STRING, DataType::STRING, true),
-        ),
-        StructField::not_null("size", DataType::LONG),
-        StructField::not_null("modificationTime", DataType::LONG),
-        StructField::not_null("dataChange", DataType::BOOLEAN),
-    ]))
-});
-
-/// Get the expected schema for engine data passed to [`add_write_metadata`].
-///
-/// [`add_write_metadata`]: crate::transaction::Transaction::add_write_metadata
-pub fn get_write_metadata_schema() -> &'static SchemaRef {
-    &WRITE_METADATA_SCHEMA
-}
-
 /// A transaction represents an in-progress write to a table. After creating a transaction, changes
 /// to the table may be staged via the transaction methods before calling `commit` to commit the
 /// changes to the table.
@@ -56,6 +36,7 @@
     read_snapshot: Arc<Snapshot>,
     operation: Option<String>,
     commit_info: Option<Arc<dyn EngineData>>,
+    write_metadata_schema: SchemaRef,
     write_metadata: Vec<Box<dyn EngineData>>,
     // NB: hashmap would require either duplicating the appid or splitting SetTransaction
     // key/payload. HashSet requires Borrow<&str> with matching Eq, Ord, and Hash. Plus,
@@ -93,6 +74,17 @@
             .table_configuration()
             .ensure_write_supported()?;
 
+        let write_metadata_schema = Arc::new(StructType::new(vec![
+            StructField::not_null("path", DataType::STRING),
+            StructField::not_null(
+                "partitionValues",
+                MapType::new(DataType::STRING, DataType::STRING, true),
+            ),
+            StructField::not_null("size", DataType::LONG),
+            StructField::not_null("modificationTime", DataType::LONG),
+            StructField::not_null("dataChange", DataType::BOOLEAN),
+        ]));
+
         // TODO: unify all these into a (safer) `fn current_time_ms()`
         let commit_timestamp = SystemTime::now()
             .duration_since(UNIX_EPOCH)
@@ -104,6 +96,7 @@
             read_snapshot,
             operation: None,
             commit_info: None,
+            write_metadata_schema,
             write_metadata: vec![],
             set_transactions: vec![],
             commit_timestamp,
@@ -145,7 +138,8 @@
             self.commit_timestamp,
             engine_commit_info.as_ref(),
         );
-        let add_actions = generate_adds(engine, self.write_metadata.iter().map(|a| a.as_ref()));
+        let add_actions =
+            self.generate_adds(engine, self.write_metadata.iter().map(|a| a.as_ref()));
 
         let actions = iter::once(commit_info_actions)
             .chain(add_actions)
@@ -223,42 +217,48 @@
     pub fn get_write_context(&self) -> WriteContext {
         let target_dir = self.read_snapshot.table_root();
         let snapshot_schema = self.read_snapshot.schema();
+        let write_metadata_schema = self.write_metadata_schema.clone();
         let logical_to_physical = self.generate_logical_to_physical();
-        WriteContext::new(target_dir.clone(), snapshot_schema, logical_to_physical)
+        WriteContext::new(
+            target_dir.clone(),
+            snapshot_schema,
+            write_metadata_schema,
+            logical_to_physical,
+        )
     }
 
     /// Add write metadata about files to include in the transaction. This API can be called
     /// multiple times to add multiple batches.
     ///
-    /// The expected schema for `write_metadata` is given by [`get_write_metadata_schema`].

Check failure on line 233 in kernel/src/transaction.rs

View workflow job for this annotation

GitHub Actions / docs

unresolved link to `get_write_metadata_schema`
     pub fn add_write_metadata(&mut self, write_metadata: Box<dyn EngineData>) {
         self.write_metadata.push(write_metadata);
     }
-}
-
-// convert write_metadata into add actions using an expression to transform the data in a single
-// pass
-fn generate_adds<'a>(
-    engine: &dyn Engine,
-    write_metadata: impl Iterator<Item = &'a dyn EngineData> + Send + 'a,
-) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a {
-    let evaluation_handler = engine.evaluation_handler();
-    let write_metadata_schema = get_write_metadata_schema();
-    let log_schema = get_log_add_schema();
-
-    write_metadata.map(move |write_metadata_batch| {
-        let adds_expr = Expression::struct_from([Expression::struct_from(
-            write_metadata_schema
-                .fields()
-                .map(|f| Expression::column([f.name()])),
-        )]);
-        let adds_evaluator = evaluation_handler.new_expression_evaluator(
-            write_metadata_schema.clone(),
-            adds_expr,
-            log_schema.clone().into(),
-        );
-        adds_evaluator.evaluate(write_metadata_batch)
-    })
-}
+
+    // convert write_metadata into add actions using an expression to transform the data in a single
+    // pass
+    fn generate_adds<'a>(
+        &'a self,
+        engine: &dyn Engine,
+        write_metadata: impl Iterator<Item = &'a dyn EngineData> + Send + 'a,
+    ) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a {
Comment on lines +240 to +244

Collaborator: Now that we take `&'a self`, I think we can remove the named lifetimes? At worst we might need `+ '_` for the iterators?

Collaborator (Author): Hm, the compiler is yelling that anonymous lifetimes are unstable in impl trait..
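For context, a self-contained sketch of the pattern under discussion (hypothetical `Txn` type, not kernel code): the returned `impl Iterator` borrows `&self`, so its lifetime must be tied to the borrow, and on stable Rust `impl Trait + '_` in argument position is rejected, which is why the named `'a` stays.

```rust
struct Txn {
    schema: Vec<String>,
}

impl Txn {
    // The returned iterator captures `&'a self`, so both the argument
    // iterator and the return type are tied to the named lifetime `'a`.
    // Replacing the argument's `+ 'a` with `+ '_` triggers the error
    // "anonymous lifetimes in `impl Trait` are unstable".
    fn generate_adds<'a>(
        &'a self,
        write_metadata: impl Iterator<Item = &'a str> + 'a,
    ) -> impl Iterator<Item = String> + 'a {
        write_metadata.map(move |batch| format!("{}: {batch}", self.schema.join(",")))
    }
}

fn main() {
    let txn = Txn { schema: vec!["path".into(), "size".into()] };
    let adds: Vec<_> = txn.generate_adds(["b1", "b2"].into_iter()).collect();
    assert_eq!(adds, ["path,size: b1", "path,size: b2"]);
}
```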
+        let evaluation_handler = engine.evaluation_handler();
@zachschuermann (Collaborator, Author) commented on the line above, Jun 16, 2025: review with whitespace hidden
+        let log_schema = get_log_add_schema();
+
+        write_metadata.map(move |write_metadata_batch| {
+            let adds_expr = Expression::struct_from([Expression::struct_from(
+                self.write_metadata_schema
+                    .fields()
+                    .map(|f| Expression::column([f.name()])),
+            )]);
+            let adds_evaluator = evaluation_handler.new_expression_evaluator(
+                self.write_metadata_schema.clone(),
+                adds_expr,
+                log_schema.clone().into(),
+            );
+            adds_evaluator.evaluate(write_metadata_batch)
+        })
+    }
+}
 
 /// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to
@@ -268,14 +268,21 @@
 pub struct WriteContext {
     target_dir: Url,
     schema: SchemaRef,
+    write_metadata_schema: SchemaRef,
     logical_to_physical: Expression,
 }
 
 impl WriteContext {
-    fn new(target_dir: Url, schema: SchemaRef, logical_to_physical: Expression) -> Self {
+    fn new(
+        target_dir: Url,
+        schema: SchemaRef,
+        write_metadata_schema: SchemaRef,
+        logical_to_physical: Expression,
+    ) -> Self {
         WriteContext {
             target_dir,
             schema,
+            write_metadata_schema,
             logical_to_physical,
         }
     }
@@ -284,6 +291,13 @@
         &self.target_dir
     }
 
+    /// Get the expected schema for engine data passed to [`add_write_metadata`].
+    ///
+    /// [`add_write_metadata`]: crate::transaction::Transaction::add_write_metadata
+    pub fn write_metadata_schema(&self) -> &SchemaRef {
+        &self.write_metadata_schema
+    }
+
     pub fn schema(&self) -> &SchemaRef {
         &self.schema
     }
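
With the accessor above, an engine can introspect the expected write-metadata shape at runtime instead of reaching for a crate-level static. A hedged snippet (assumes an in-scope `txn: Transaction`; `fields()` and `name()` appear in the diffs above):

```rust
let ctx = txn.get_write_context();
let schema = ctx.write_metadata_schema();

// Expected: path, partitionValues, size, modificationTime, dataChange
for field in schema.fields() {
    println!("{}", field.name());
}
```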
@@ -383,8 +397,11 @@
 mod tests {
     use super::*;
 
+    use std::path::PathBuf;
+
     use crate::engine::arrow_data::ArrowEngineData;
     use crate::engine::arrow_expression::ArrowEvaluationHandler;
+    use crate::engine::sync::SyncEngine;
     use crate::schema::MapType;
     use crate::{EvaluationHandler, JsonHandler, ParquetHandler, StorageHandler};
 
@@ -715,8 +732,14 @@
     }
 
     #[test]
-    fn test_write_metadata_schema() {
-        let schema = get_write_metadata_schema();
+    fn test_write_metadata_schema() -> DeltaResult<()> {
+        let path = std::fs::canonicalize(PathBuf::from("./tests/data/basic_partitioned/"))?;
+        let url = Url::from_directory_path(path).unwrap();
+        let engine = SyncEngine::new();
+        let snapshot = Snapshot::try_new(url, &engine, None)?;
+        let txn = Transaction::try_new(snapshot)?;
+        let ctx = txn.get_write_context();
+        let schema = ctx.write_metadata_schema();
         let expected = StructType::new(vec![
             StructField::not_null("path", DataType::STRING),
             StructField::not_null(
@@ -728,5 +751,6 @@
             StructField::not_null("dataChange", DataType::BOOLEAN),
         ]);
         assert_eq!(*schema, expected.into());
+        Ok(())
     }
 }