Skip to content

Commit f6ebe24

Browse files
authored
[Kernel][Writes] Write timestamp as INT64 type to Parquet data files (#3084)
## Description

Write the `timestamp` type as the `INT64` physical format in Parquet. Currently, it is written as `INT96`, which is a very old method of writing timestamps that was deprecated a long time ago. Also, collect statistics for `timestamp` type columns.

## How was this patch tested?

Updated the existing tests.
1 parent a5d7c69 commit f6ebe24

File tree

4 files changed

+26
-45
lines changed

4 files changed

+26
-45
lines changed

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnWriters.java

Lines changed: 6 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
package io.delta.kernel.defaults.internal.parquet;
1717

1818
import java.math.BigDecimal;
19-
import java.nio.ByteBuffer;
20-
import java.nio.ByteOrder;
2119
import java.nio.charset.StandardCharsets;
2220
import java.util.Arrays;
2321
import static java.util.Objects.requireNonNull;
@@ -28,10 +26,8 @@
2826
import io.delta.kernel.data.*;
2927
import io.delta.kernel.types.*;
3028

31-
import io.delta.kernel.internal.util.Tuple2;
3229
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
3330

34-
import io.delta.kernel.defaults.internal.DefaultKernelUtils;
3531
import static io.delta.kernel.defaults.internal.parquet.ParquetSchemaUtils.MAX_BYTES_PER_PRECISION;
3632

3733
/**
@@ -134,10 +130,9 @@ private static ColumnWriter createColumnWriter(
134130
return new DecimalFixedBinaryWriter(colName, fieldIndex, columnVector);
135131
} else if (dataType instanceof DateType) {
136132
return new DateWriter(colName, fieldIndex, columnVector);
137-
} else if (dataType instanceof TimestampType) {
133+
} else if (dataType instanceof TimestampType || dataType instanceof TimestampNTZType) {
134+
// for both get the input as long type from column vector and write to file as INT64
138135
return new TimestampWriter(colName, fieldIndex, columnVector);
139-
} else if (dataType instanceof TimestampNTZType) {
140-
return new TimestampNTZWriter(colName, fieldIndex, columnVector);
141136
} else if (dataType instanceof ArrayType) {
142137
return new ArrayWriter(colName, fieldIndex, columnVector);
143138
} else if (dataType instanceof MapType) {
@@ -338,38 +333,15 @@ void writeNonNullRowValue(RecordConsumer recordConsumer, int rowId) {
338333
}
339334
}
340335

341-
static class TimestampWriter extends ColumnWriter {
342-
// Reuse this buffer to avoid allocating a new buffer for each row
343-
private final byte[] reusedBuffer = new byte[12];
344336

337+
/**
338+
* Writer for both timestamp and timestamp with time zone.
339+
*/
340+
static class TimestampWriter extends ColumnWriter {
345341
TimestampWriter(String name, int fieldId, ColumnVector columnVector) {
346342
super(name, fieldId, columnVector);
347343
}
348344

349-
@Override
350-
void writeNonNullRowValue(RecordConsumer recordConsumer, int rowId) {
351-
// TODO: Spark has various handling mode for DateType, need to check if it is needed
352-
// for Delta Kernel.
353-
354-
// For now write as INT96 which is the most supported format for timestamps
355-
// Later on, depending upon the config, we can write either as INT64 or INT96
356-
long microsSinceEpochUTC = columnVector.getLong(rowId);
357-
Tuple2<Integer, Long> julianDayRemainingNanos =
358-
DefaultKernelUtils.toJulianDay(microsSinceEpochUTC);
359-
360-
ByteBuffer buffer = ByteBuffer.wrap(reusedBuffer);
361-
buffer.order(ByteOrder.LITTLE_ENDIAN)
362-
.putLong(julianDayRemainingNanos._2) // timeOfDayNanos
363-
.putInt(julianDayRemainingNanos._1); // julianDay
364-
recordConsumer.addBinary(Binary.fromReusedByteArray(reusedBuffer));
365-
}
366-
}
367-
368-
static class TimestampNTZWriter extends ColumnWriter {
369-
TimestampNTZWriter(String name, int fieldId, ColumnVector columnVector) {
370-
super(name, fieldId, columnVector);
371-
}
372-
373345
@Override
374346
void writeNonNullRowValue(RecordConsumer recordConsumer, int rowId) {
375347
long microsSinceEpochUTC = columnVector.getLong(rowId);

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetSchemaUtils.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,14 @@ private static Type toParquetType(
229229
} else if (dataType instanceof DateType) {
230230
type = primitive(INT32, repetition).as(LogicalTypeAnnotation.dateType()).named(name);
231231
} else if (dataType instanceof TimestampType) {
232-
// We are supporting only the INT96 format now.
233-
type = primitive(INT96, repetition).named(name);
232+
// Kernel is by default going to write as INT64 with isAdjustedToUTC set to true
233+
// Delta-Spark writes as INT96 for legacy reasons (maintaining compatibility with
234+
// unknown consumers with very, very old versions of Parquet reader). Kernel is a new
235+
// project, and we are ok if it breaks readers (we use this opportunity to find such
236+
// readers and ask them to upgrade).
237+
type = primitive(INT64, repetition)
238+
.as(timestampType(true /* isAdjustedToUTC */, MICROS))
239+
.named(name);
234240
} else if (dataType instanceof TimestampNTZType) {
235241
// Write as INT64 with isAdjustedToUTC set to false
236242
type = primitive(INT64, repetition)

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetStatsReader.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,12 @@ private static Literal decodeMinMaxStat(
181181
statistics instanceof IntStatistics,
182182
"Column with DATE type contained invalid statistics: %s", statistics);
183183
return Literal.ofDate((Integer) statValue); // stats are stored as epoch days in Parquet
184+
} else if (dataType instanceof TimestampType) {
185+
// Kernel Parquet writer always writes timestamps in INT64 format
186+
checkArgument(
187+
statistics instanceof LongStatistics,
188+
"Column with TIMESTAMP type contained invalid statistics: %s", statistics);
189+
return Literal.ofTimestamp((Long) statValue);
184190
} else if (dataType instanceof TimestampNTZType) {
185191
checkArgument(
186192
statistics instanceof LongStatistics,
@@ -235,11 +241,10 @@ private static boolean isStatsSupportedDataType(DataType dataType) {
235241
dataType instanceof DoubleType ||
236242
dataType instanceof DecimalType ||
237243
dataType instanceof DateType ||
244+
dataType instanceof TimestampType ||
238245
dataType instanceof TimestampNTZType ||
239246
dataType instanceof StringType ||
240247
dataType instanceof BinaryType;
241-
// TODO: timestamp is complicated to handle because of the storage format (INT96 or INT64).
242-
// Add support later.
243248
}
244249

245250
private static byte[] getBinaryStat(Statistics<?> statistics, boolean decodeMin) {

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileWriterSuite.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class ParquetFileWriterSuite extends AnyFunSuite
5858

5959
Seq(
6060
// Test cases reading and writing all types of data with or without stats collection
61-
Seq((200, 67), (1024, 17), (1048576, 1)).map {
61+
Seq((200, 67), (1024, 16), (1048576, 1)).map {
6262
case (targetFileSize, expParquetFileCount) =>
6363
(
6464
"write all types (no stats)", // test name
@@ -103,7 +103,7 @@ class ParquetFileWriterSuite extends AnyFunSuite
103103
)
104104
},
105105
// Test cases reading and writing only a subset of data passing a predicate.
106-
Seq((200, 26), (1024, 7), (1048576, 1)).map {
106+
Seq((200, 26), (1024, 6), (1048576, 1)).map {
107107
case (targetFileSize, expParquetFileCount) =>
108108
(
109109
"write filtered all types (no stats)", // test name
@@ -118,7 +118,7 @@ class ParquetFileWriterSuite extends AnyFunSuite
118118
)
119119
},
120120
// Test cases reading and writing all types of data WITH stats collection
121-
Seq((200, 67), (1024, 17), (1048576, 1)).map {
121+
Seq((200, 67), (1024, 16), (1048576, 1)).map {
122122
case (targetFileSize, expParquetFileCount) =>
123123
(
124124
"write all types (with stats for all leaf-level columns)", // test name
@@ -128,11 +128,11 @@ class ParquetFileWriterSuite extends AnyFunSuite
128128
200, /* expected number of rows written to Parquet files */
129129
Option.empty[Predicate], // predicate for filtering what rows to write to parquet files
130130
leafLevelPrimitiveColumns(Seq.empty, tableSchema(goldenTablePath("parquet-all-types"))),
131-
14 // how many columns have the stats collected from given list above
131+
15 // how many columns have the stats collected from given list above
132132
)
133133
},
134134
// Test cases reading and writing all types of data with a partial column set stats collection
135-
Seq((200, 67), (1024, 17), (1048576, 1)).map {
135+
Seq((200, 67), (1024, 16), (1048576, 1)).map {
136136
case (targetFileSize, expParquetFileCount) =>
137137
(
138138
"write all types (with stats for a subset of leaf-level columns)", // test name
@@ -146,7 +146,6 @@ class ParquetFileWriterSuite extends AnyFunSuite
146146
new Column("DateType"),
147147
new Column(Array("nested_struct", "aa")),
148148
new Column(Array("nested_struct", "ac", "aca")),
149-
new Column("TimestampType"), // stats are not collected for timestamp type YET.
150149
new Column(Array("nested_struct", "ac")), // stats are not collected for struct types
151150
new Column("nested_struct"), // stats are not collected for struct types
152151
new Column("array_of_prims"), // stats are not collected for array types
@@ -360,7 +359,6 @@ class ParquetFileWriterSuite extends AnyFunSuite
360359
.flatMap { statColumn =>
361360
val dataType = DefaultKernelUtils.getDataType(fileDataSchema, statColumn)
362361
dataType match {
363-
case _: TimestampType => nullStats // not yet supported
364362
case _: StructType => nullStats // no concept of stats for struct types
365363
case _: ArrayType => nullStats // no concept of stats for array types
366364
case _: MapType => nullStats // no concept of stats for map types

0 commit comments

Comments (0)