Skip to content

Commit eafaf5e

Browse files
committed
Handle timestamp_ntz in delta conversion target
1 parent dafe24a commit eafaf5e

File tree

5 files changed

+85
-12
lines changed

5 files changed

+85
-12
lines changed

xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@
7373

7474
@Log4j2
7575
public class DeltaConversionTarget implements ConversionTarget {
76-
private static final String MIN_READER_VERSION = String.valueOf(1);
76+
private static final int MIN_READER_VERSION = 1;
7777
// gets access to generated columns.
78-
private static final String MIN_WRITER_VERSION = String.valueOf(4);
78+
private static final int MIN_WRITER_VERSION = 4;
7979

8080
private DeltaLog deltaLog;
8181
private DeltaSchemaExtractor schemaExtractor;
@@ -329,8 +329,14 @@ private void commitTransaction() {
329329

330330
private Map<String, String> getConfigurationsForDeltaSync() {
331331
Map<String, String> configMap = new HashMap<>();
332-
configMap.put(DeltaConfigs.MIN_READER_VERSION().key(), MIN_READER_VERSION);
333-
configMap.put(DeltaConfigs.MIN_WRITER_VERSION().key(), MIN_WRITER_VERSION);
332+
configMap.put(
333+
DeltaConfigs.MIN_READER_VERSION().key(),
334+
String.valueOf(
335+
Math.max(deltaLog.snapshot().protocol().minReaderVersion(), MIN_READER_VERSION)));
336+
configMap.put(
337+
DeltaConfigs.MIN_WRITER_VERSION().key(),
338+
String.valueOf(
339+
Math.max(deltaLog.snapshot().protocol().minWriterVersion(), MIN_WRITER_VERSION)));
334340
configMap.put(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
335341
// Sets retention for the Delta Log, does not impact underlying files in the table
336342
configMap.put(

xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@
5656
public class DeltaSchemaExtractor {
5757
private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id";
5858
private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor();
59+
// Timestamps in Delta are microsecond precision by default
60+
private static final Map<InternalSchema.MetadataKey, Object>
61+
DEFAULT_TIMESTAMP_PRECISION_METADATA =
62+
Collections.singletonMap(
63+
InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
5964

6065
public static DeltaSchemaExtractor getInstance() {
6166
return INSTANCE;
@@ -110,11 +115,11 @@ private InternalSchema toInternalSchema(
110115
break;
111116
case "timestamp":
112117
type = InternalType.TIMESTAMP;
113-
// Timestamps in Delta are microsecond precision by default
114-
metadata =
115-
Collections.singletonMap(
116-
InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
117-
InternalSchema.MetadataValue.MICROS);
118+
metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
119+
break;
120+
case "timestamp_ntz":
121+
type = InternalType.TIMESTAMP_NTZ;
122+
metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
118123
break;
119124
case "struct":
120125
StructType structType = (StructType) dataType;

xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ private DataType convertFieldType(InternalField field) {
6262
case INT:
6363
return DataTypes.IntegerType;
6464
case LONG:
65-
case TIMESTAMP_NTZ:
6665
return DataTypes.LongType;
6766
case BYTES:
6867
case FIXED:
@@ -76,6 +75,8 @@ private DataType convertFieldType(InternalField field) {
7675
return DataTypes.DateType;
7776
case TIMESTAMP:
7877
return DataTypes.TimestampType;
78+
case TIMESTAMP_NTZ:
79+
return DataTypes.TimestampNTZType;
7980
case DOUBLE:
8081
return DataTypes.DoubleType;
8182
case DECIMAL:

xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.spark.sql.SparkSession;
5454
import org.apache.spark.sql.types.DataTypes;
5555
import org.junit.jupiter.api.AfterAll;
56+
import org.junit.jupiter.api.Assertions;
5657
import org.junit.jupiter.api.BeforeAll;
5758
import org.junit.jupiter.api.BeforeEach;
5859
import org.junit.jupiter.api.Test;
@@ -77,6 +78,7 @@
7778
import io.delta.standalone.expressions.EqualTo;
7879
import io.delta.standalone.expressions.Expression;
7980
import io.delta.standalone.expressions.Literal;
81+
import io.delta.standalone.internal.exception.DeltaErrors;
8082
import io.delta.standalone.types.IntegerType;
8183
import io.delta.standalone.types.StringType;
8284

@@ -431,6 +433,46 @@ public void testGetTargetCommitIdentifierWithNullSourceIdentifier() throws Excep
431433
assertFalse(unmappedTargetId.isPresent());
432434
}
433435

436+
@Test
437+
public void testTimestampNtz() {
438+
InternalSchema schema1 = getInternalSchemaWithTimestampNtz();
439+
List<InternalField> fields2 = new ArrayList<>(schema1.getFields());
440+
fields2.add(
441+
InternalField.builder()
442+
.name("float_field")
443+
.schema(
444+
InternalSchema.builder()
445+
.name("float")
446+
.dataType(InternalType.FLOAT)
447+
.isNullable(true)
448+
.build())
449+
.build());
450+
InternalSchema schema2 = getInternalSchema().toBuilder().fields(fields2).build();
451+
InternalTable table1 = getInternalTable(tableName, basePath, schema1, null, LAST_COMMIT_TIME);
452+
InternalTable table2 = getInternalTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME);
453+
454+
InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList(), basePath);
455+
InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), basePath);
456+
InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), basePath);
457+
458+
InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2);
459+
InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3);
460+
461+
TableFormatSync.getInstance()
462+
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
463+
// Delta standalone library can't read versions (3,7) and needs delta kernel dependency.
464+
Assertions.assertThrows(
465+
DeltaErrors.InvalidProtocolVersionException.class,
466+
() ->
467+
validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2)), null));
468+
TableFormatSync.getInstance()
469+
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
470+
Assertions.assertThrows(
471+
DeltaErrors.InvalidProtocolVersionException.class,
472+
() ->
473+
validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2)), null));
474+
}
475+
434476
private static Stream<Arguments> timestampPartitionTestingArgs() {
435477
return Stream.of(
436478
Arguments.of(PartitionTransformType.YEAR),
@@ -563,6 +605,25 @@ private InternalSchema getInternalSchema() {
563605
.build();
564606
}
565607

608+
private InternalSchema getInternalSchemaWithTimestampNtz() {
609+
Map<InternalSchema.MetadataKey, Object> timestampMetadata = new HashMap<>();
610+
timestampMetadata.put(
611+
InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
612+
List<InternalField> fields = new ArrayList<>(getInternalSchema().getFields());
613+
fields.add(
614+
InternalField.builder()
615+
.name("timestamp_ntz_field")
616+
.schema(
617+
InternalSchema.builder()
618+
.name("time_ntz")
619+
.dataType(InternalType.TIMESTAMP_NTZ)
620+
.isNullable(true)
621+
.metadata(timestampMetadata)
622+
.build())
623+
.build());
624+
return getInternalSchema().toBuilder().fields(fields).build();
625+
}
626+
566627
private static SparkSession buildSparkSession() {
567628
SparkConf sparkConf =
568629
new SparkConf()

xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -385,8 +385,8 @@ public void testTimestamps() {
385385

386386
StructType structRepresentationTimestampNtz =
387387
new StructType()
388-
.add("requiredTimestampNtz", DataTypes.LongType, false)
389-
.add("optionalTimestampNtz", DataTypes.LongType, true);
388+
.add("requiredTimestampNtz", DataTypes.TimestampNTZType, false)
389+
.add("optionalTimestampNtz", DataTypes.TimestampNTZType, true);
390390

391391
Assertions.assertEquals(
392392
structRepresentationTimestamp,

0 commit comments

Comments
 (0)