Handle timestamp_ntz in delta and iceberg #647
@@ -53,6 +53,7 @@
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataTypes;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -94,6 +95,7 @@
 import org.apache.xtable.model.storage.DataLayoutStrategy;
 import org.apache.xtable.model.storage.FileFormat;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFile;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.schema.SchemaFieldFinder;
@@ -431,6 +433,39 @@ public void testGetTargetCommitIdentifierWithNullSourceIdentifier() throws Excep
     assertFalse(unmappedTargetId.isPresent());
   }

+  @Test
+  public void testTimestampNtz() {
+    InternalSchema schema1 = getInternalSchemaWithTimestampNtz();
+    List<InternalField> fields2 = new ArrayList<>(schema1.getFields());
+    fields2.add(
+        InternalField.builder()
+            .name("float_field")
+            .schema(
+                InternalSchema.builder()
+                    .name("float")
+                    .dataType(InternalType.FLOAT)
+                    .isNullable(true)
+                    .build())
+            .build());
+    InternalSchema schema2 = getInternalSchema().toBuilder().fields(fields2).build();

Review comment: Have you tested whether the write works if the initial schema contains the timestamp_ntz?
Author reply: Yes, in this test schema1 contains the ntz column, and the second commit adds a nullable float column.
+    InternalTable table1 = getInternalTable(tableName, basePath, schema1, null, LAST_COMMIT_TIME);
+    InternalTable table2 = getInternalTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME);
+
+    InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList(), basePath);
+    InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), basePath);
+    InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), basePath);
+
+    InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2);
+    InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3);
+
+    TableFormatSync.getInstance()
+        .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
+    validateDeltaTableUsingSpark(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2)));
+    TableFormatSync.getInstance()
+        .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
+    validateDeltaTableUsingSpark(basePath, new HashSet<>(Arrays.asList(dataFile2, dataFile3)));
+  }
+
   private static Stream<Arguments> timestampPartitionTestingArgs() {
     return Stream.of(
         Arguments.of(PartitionTransformType.YEAR),
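A side note for reviewers: the new test validates row counts only. A natural follow-up assertion, sketched below rather than taken from this PR, would check that the synced Delta table reads back with Spark's NTZ type. The sketch assumes a Spark version that exposes DataTypes.TimestampNTZType and reuses the timestamp_ntz_field name from getInternalSchemaWithTimestampNtz further down.

    // Hypothetical extra check, not part of this diff: the NTZ column should
    // surface as TimestampNTZType (not TimestampType) when read through Spark.
    Dataset<Row> dataset = sparkSession.read().format("delta").load(basePath.toString());
    org.apache.spark.sql.types.StructField ntzField = dataset.schema().apply("timestamp_ntz_field");
    Assertions.assertEquals(DataTypes.TimestampNTZType, ntzField.dataType());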
@@ -472,6 +507,13 @@ private void validateDeltaTable(
         internalDataFiles.size(), count, "Number of files from DeltaScan don't match expectation");
   }

+  private void validateDeltaTableUsingSpark(
+      Path basePath, Set<InternalDataFile> internalDataFiles) {
+    Dataset<Row> dataset = sparkSession.read().format("delta").load(basePath.toString());
+    long countFromFiles = internalDataFiles.stream().mapToLong(InternalFile::getRecordCount).sum();
+    Assertions.assertEquals(countFromFiles, dataset.count());
+  }
+
   private InternalSnapshot buildSnapshot(
       InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) {
     return InternalSnapshot.builder()
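Worth noting about this helper: it goes through the plain Spark Delta reader, which only succeeds if the table carries the TimestampNTZ table feature (Delta gates the type behind reader version 3 / writer version 7). The wiring that declares the feature is not visible in this diff; a minimal sketch of what the declaration looks like as a table property, using Delta Lake's documented feature name:

    // Sketch, assuming Delta Lake's documented feature-property convention;
    // the actual mechanism used by the conversion target is not shown here.
    Map<String, String> tableProperties = new HashMap<>();
    tableProperties.put("delta.feature.timestampNtz", "supported");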
@@ -563,6 +605,25 @@ private InternalSchema getInternalSchema() {
         .build();
   }

+  private InternalSchema getInternalSchemaWithTimestampNtz() {
+    Map<InternalSchema.MetadataKey, Object> timestampMetadata = new HashMap<>();
+    timestampMetadata.put(
+        InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
+    List<InternalField> fields = new ArrayList<>(getInternalSchema().getFields());
+    fields.add(
+        InternalField.builder()
+            .name("timestamp_ntz_field")
+            .schema(
+                InternalSchema.builder()
+                    .name("time_ntz")
+                    .dataType(InternalType.TIMESTAMP_NTZ)
+                    .isNullable(true)
+                    .metadata(timestampMetadata)
+                    .build())
+            .build());
+    return getInternalSchema().toBuilder().fields(fields).build();
+  }
+
   private static SparkSession buildSparkSession() {
     SparkConf sparkConf =
         new SparkConf()
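This helper builds the format-agnostic internal model only; since the PR also targets Iceberg, the expected counterpart there is Iceberg's timestamp-without-zone type. A brief sketch of that mapping using Iceberg's public type API (an illustration, not code from this diff):

    import org.apache.iceberg.types.Types;

    // Iceberg models the NTZ distinction directly in its type system:
    Types.TimestampType ntz = Types.TimestampType.withoutZone(); // timestamp (no UTC adjustment)
    Types.TimestampType tz = Types.TimestampType.withZone();     // timestamptz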