diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/PaginatedScan.java b/kernel/kernel-api/src/main/java/io/delta/kernel/PaginatedScan.java new file mode 100644 index 00000000000..34bd9a9d7a8 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/PaginatedScan.java @@ -0,0 +1,17 @@ +package io.delta.kernel; + +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.utils.CloseableIterator; + +public interface PaginatedScan extends Scan { + + @Override + CloseableIterator getScanFiles(Engine engine); + + Row getNewPageToken(); + + ColumnarBatch getTombStoneHashsets(); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/ScanBuilder.java b/kernel/kernel-api/src/main/java/io/delta/kernel/ScanBuilder.java index d356eae8952..3d2dabe9c19 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/ScanBuilder.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/ScanBuilder.java @@ -17,6 +17,8 @@ package io.delta.kernel; import io.delta.kernel.annotation.Evolving; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.Row; import io.delta.kernel.engine.Engine; import io.delta.kernel.expressions.Predicate; import io.delta.kernel.types.StructType; @@ -65,4 +67,10 @@ public interface ScanBuilder { /** @return Build the {@link Scan instance} */ Scan build(); + + /** Build a Paginated Scan here */ + PaginatedScan buildPaginatedScan(Row pageToken, long pageSize); + + /** Build a Paginated Scan with optional tombstone hashsets injected * */ + PaginatedScan buildPaginatedScan(Row pageToken, long pageSize, ColumnarBatch tombstone); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/data/FilteredColumnarBatch.java b/kernel/kernel-api/src/main/java/io/delta/kernel/data/FilteredColumnarBatch.java index 807cb0189b7..cc4a8d3bd67 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/data/FilteredColumnarBatch.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/data/FilteredColumnarBatch.java @@ -37,10 +37,25 @@ public class FilteredColumnarBatch { private final ColumnarBatch data; private final Optional selectionVector; + private final Optional numOfTrueRows; + private final Optional fileName; // which file this batch belongs to + + public FilteredColumnarBatch( + ColumnarBatch data, + Optional selectionVector, + long numOfTrueRows, + String fileName) { + this.data = data; + this.selectionVector = selectionVector; + this.numOfTrueRows = Optional.of(numOfTrueRows); + this.fileName = Optional.ofNullable(fileName); + } public FilteredColumnarBatch(ColumnarBatch data, Optional selectionVector) { this.data = data; this.selectionVector = selectionVector; + this.numOfTrueRows = Optional.empty(); + this.fileName = Optional.empty(); } /** @@ -64,6 +79,14 @@ public Optional getSelectionVector() { return selectionVector; } + public Long getNumOfTrueRows() { + return this.numOfTrueRows.get(); + } + + public String getFileName() { + return this.fileName.get(); + } + /** * Iterator of rows that survived the filter. * diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/ParquetHandler.java b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/ParquetHandler.java index 4ff13aee7a0..518359596a0 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/ParquetHandler.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/ParquetHandler.java @@ -20,6 +20,7 @@ import io.delta.kernel.data.*; import io.delta.kernel.expressions.Column; import io.delta.kernel.expressions.Predicate; +import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.types.StructField; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.*; @@ -56,9 +57,9 @@ public interface ParquetHandler { * @param predicate Optional predicate which the Parquet reader can optionally use to prune rows * that don't satisfy the predicate. Because pruning is optional and may be incomplete, caller * is still responsible apply the predicate on the data returned by this method. - * @return an iterator of {@link ColumnarBatch}s containing the data in columnar format. It is the - * responsibility of the caller to close the iterator. The data returned is in the same as the - * order of files given in {@code scanFileIter}. + * @return an iterator of {@link Tuple2} containing the filename and {@link ColumnarBatch} pairs + * in columnar format. It is the responsibility of the caller to close the iterator. The data + * returned is in the same as the order of files given in {@code scanFileIter}. * @throws IOException if an I/O error occurs during the read. */ CloseableIterator readParquetFiles( @@ -67,6 +68,43 @@ CloseableIterator readParquetFiles( Optional predicate) throws IOException; + /** + * Read the Parquet format files at the given locations and return the data as tuples of filename + * and {@link ColumnarBatch} with the columns requested by {@code physicalSchema}. + * + *

This method is similar to {@link #readParquetFiles(CloseableIterator, StructType, Optional)} + * but returns tuples where the first element is the filename and the second element is the + * columnar batch from that file. + * + *

If {@code physicalSchema} has a {@link StructField} with column name {@link + * StructField#METADATA_ROW_INDEX_COLUMN_NAME} and the field is a metadata column {@link + * StructField#isMetadataColumn()} the column must be populated with the file row index. + * + *

How does a column in {@code physicalSchema} match to the column in the Parquet file? If the + * {@link StructField} has a field id in the {@code metadata} with key `parquet.field.id` the + * column is attempted to match by id. If the column is not found by id, the column is matched by + * name. When trying to find the column in Parquet by name, first case-sensitive match is used. If + * not found then a case-insensitive match is attempted. + * + * @param fileIter Iterator of files to read data from. + * @param physicalSchema Select list of columns to read from the Parquet file. + * @param predicate Optional predicate which the Parquet reader can optionally use to prune rows + * that don't satisfy the predicate. Because pruning is optional and may be incomplete, caller + * is still responsible apply the predicate on the data returned by this method. + * @return an iterator of {@link Tuple2} containing filename and {@link ColumnarBatch} pairs in + * columnar format. It is the responsibility of the caller to close the iterator. The data + * returned is in the same as the order of files given in {@code fileIter}. + * @throws IOException if an I/O error occurs during the read. + */ + default CloseableIterator> readParquetFiles2( + CloseableIterator fileIter, + StructType physicalSchema, + Optional predicate) + throws IOException { + throw new UnsupportedOperationException( + "readParquetFiles2 is not supported by this implementation"); + } + /** * Write the given data batches to a Parquet files. Try to keep the Parquet file size to given * size. If the current file exceeds this size close the current file and start writing to a new diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/PaginatedScanImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/PaginatedScanImpl.java new file mode 100644 index 00000000000..4944a10a9a7 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/PaginatedScanImpl.java @@ -0,0 +1,107 @@ +package io.delta.kernel.internal; + +import io.delta.kernel.PaginatedScan; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.expressions.Predicate; +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.actions.Protocol; +import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.replay.*; +import io.delta.kernel.metrics.SnapshotReport; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; +import java.util.HashSet; +import java.util.Optional; + +public class PaginatedScanImpl implements PaginatedScan { + + private final long pageSize; + private final PageToken pageToken; + private final ScanImpl baseScan; + private PaginatedAddFilesIterator paginatedIter; + + public PaginatedScanImpl( + StructType snapshotSchema, + StructType readSchema, + Protocol protocol, + Metadata metadata, + LogReplay logReplay, + Optional filter, + Path dataPath, + SnapshotReport snapshotReport, + Row pageTokenInRow, + long pageSize) { + baseScan = + new ScanImpl( + snapshotSchema, + readSchema, + protocol, + metadata, + logReplay, + filter, + dataPath, + snapshotReport); + assert pageTokenInRow != null; + this.pageToken = decodePageToken(pageTokenInRow); + // TODO: validation 1. LopReplay.getLogSegment() 2. data Path 3. snapshotReport.getVersion(); + // [maybe can use snapshotReport.getCheckpointVersion()] + // TODO: not sure how to check: 1. predicate(filter) 2. Kernel Version ID + this.pageSize = pageSize; + } + + @Override + public CloseableIterator getScanFiles(Engine engine) { + return this.getScanFiles(engine, false); + } + + @Override + public Optional getRemainingFilter() { + return baseScan.getRemainingFilter(); + } + + @Override + public Row getScanState(Engine engine) { + return baseScan.getScanState(engine); + } + + public CloseableIterator getScanFiles( + Engine engine, boolean includeStates) { + // TODO: update code here + boolean isHashSetCached = false; + HashSet tombstonesFromJson = new HashSet<>(); + HashSet addFilesFromJson = new HashSet<>(); + + PaginationContext paginationContext = + new PaginationContext( + pageToken.getStartingFileName(), + pageToken.getRowIndex(), + pageToken.getSidecarIdx(), + pageSize, + isHashSetCached, + tombstonesFromJson, + addFilesFromJson); + + CloseableIterator scanFileIter = + baseScan.getScanFiles(engine, includeStates, paginationContext); + this.paginatedIter = new PaginatedAddFilesIterator(scanFileIter, paginationContext); + return paginatedIter; + } + + // TODO: implement following methods + private PageToken decodePageToken(Row pageTokenInRow) { + return PageToken.fromRow(pageTokenInRow); + } + + @Override + public Row getNewPageToken() { + return paginatedIter.getNewPageToken().getRow(); + } + + @Override + public ColumnarBatch getTombStoneHashsets() { + return null; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanBuilderImpl.java index 17e872dd0ef..d87928d333d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanBuilderImpl.java @@ -16,8 +16,11 @@ package io.delta.kernel.internal; +import io.delta.kernel.PaginatedScan; import io.delta.kernel.Scan; import io.delta.kernel.ScanBuilder; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.Row; import io.delta.kernel.expressions.Predicate; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; @@ -85,4 +88,24 @@ public Scan build() { dataPath, snapshotReport); } + + @Override + public PaginatedScan buildPaginatedScan(Row pageToken, long pageSize) { + return new PaginatedScanImpl( + snapshotSchema, + readSchema, + protocol, + metadata, + logReplay, + predicate, + dataPath, + snapshotReport, + pageToken, + pageSize); + } + + @Override + public PaginatedScan buildPaginatedScan(Row pageToken, long pageSize, ColumnarBatch tombstones) { + return null; + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java index b695717b607..2f12667a7dc 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java @@ -28,12 +28,16 @@ import io.delta.kernel.expressions.*; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; +import io.delta.kernel.internal.annotation.VisibleForTesting; import io.delta.kernel.internal.data.ScanStateRow; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.metrics.ScanMetrics; import io.delta.kernel.internal.metrics.ScanReportImpl; import io.delta.kernel.internal.metrics.Timer; import io.delta.kernel.internal.replay.LogReplay; +import io.delta.kernel.internal.replay.LogReplayUtils; +import io.delta.kernel.internal.replay.PaginatedAddFilesIterator; +import io.delta.kernel.internal.replay.PaginationContext; import io.delta.kernel.internal.skipping.DataSkippingPredicate; import io.delta.kernel.internal.skipping.DataSkippingUtils; import io.delta.kernel.internal.util.*; @@ -105,6 +109,11 @@ public CloseableIterator getScanFiles(Engine engine) { return getScanFiles(engine, false); } + public CloseableIterator getScanFiles( + Engine engine, boolean includeStats) { + return getScanFiles(engine, includeStats, null); + } + /** * Get an iterator of data files in this version of scan that survived the predicate pruning. * @@ -117,8 +126,11 @@ public CloseableIterator getScanFiles(Engine engine) { * @param includeStats whether to read and include the JSON statistics * @return the surviving scan files as {@link FilteredColumnarBatch}s */ - public CloseableIterator getScanFiles( - Engine engine, boolean includeStats) { + protected CloseableIterator getScanFiles( + Engine engine, + boolean includeStats, + PaginationContext paginationContext) { // inject two hash set here + if (accessedScanFiles) { throw new IllegalStateException("Scan files are already fetched from this instance"); } @@ -165,7 +177,8 @@ public CloseableIterator getScanFiles( predicate -> rewritePartitionPredicateOnCheckpointFileSchema( predicate, partitionColToStructFieldMap.get())), - scanMetrics); + scanMetrics, + paginationContext); // Apply partition pruning scanFileIter = applyPartitionPruning(engine, scanFileIter); @@ -185,6 +198,35 @@ public CloseableIterator getScanFiles( } } + /** Only used for testing */ + @VisibleForTesting + public CloseableIterator getPaginatedScanFiles( + Engine engine, + long numOfAddFilesToSkip, + long pageSize, + String startingLogFileName, + long sidecarIdx) { + // fetch hashset here + boolean isHashSetCached = false; + HashSet tombstonesFromJson = new HashSet<>(); + HashSet addFilesFromJson = new HashSet<>(); + PaginationContext paginationContext = + new PaginationContext( + startingLogFileName, + numOfAddFilesToSkip, + sidecarIdx, + pageSize, + isHashSetCached, + tombstonesFromJson, + addFilesFromJson); + CloseableIterator scanFileIter = + getScanFiles(engine, false, paginationContext); + System.out.println("fetch the original iterator successfully"); + CloseableIterator paginatedIter = + new PaginatedAddFilesIterator(scanFileIter, paginationContext); + return paginatedIter; + } + @Override public Row getScanState(Engine engine) { // Physical equivalent of the logical read schema. diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionWrapper.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionWrapper.java index 274e30fc723..962da66040e 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionWrapper.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionWrapper.java @@ -25,13 +25,19 @@ public class ActionWrapper { private final long version; /* Timestamp of the commit file if isFromCheckpoint=false */ private final Optional timestamp; + private final String fileName; ActionWrapper( - ColumnarBatch data, boolean isFromCheckpoint, long version, Optional timestamp) { + ColumnarBatch data, + boolean isFromCheckpoint, + long version, + Optional timestamp, + String fileName) { this.columnarBatch = data; this.isFromCheckpoint = isFromCheckpoint; this.version = version; this.timestamp = timestamp; + this.fileName = fileName; } public ColumnarBatch getColumnarBatch() { @@ -49,4 +55,8 @@ public long getVersion() { public Optional getTimestamp() { return timestamp; } + + public String getFileName() { + return fileName; + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java index f6bedc2db6a..7733b28580c 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java @@ -85,12 +85,21 @@ public class ActionsIterator implements CloseableIterator { private boolean closed; + /** these variables are only for pagination use */ + private boolean isHashSetCached; + + private Optional startingLogFileName; + private long lastEmitSidecarIdx; + public ActionsIterator( Engine engine, List files, StructType deltaReadSchema, Optional checkpointPredicate) { - this(engine, files, deltaReadSchema, deltaReadSchema, checkpointPredicate); + this(engine, files, deltaReadSchema, deltaReadSchema, checkpointPredicate, null); + this.startingLogFileName = Optional.empty(); + this.isHashSetCached = false; + this.lastEmitSidecarIdx = -1; } public ActionsIterator( @@ -98,18 +107,48 @@ public ActionsIterator( List files, StructType deltaReadSchema, StructType checkpointReadSchema, - Optional checkpointPredicate) { + Optional checkpointPredicate, + PaginationContext paginationContext) { + + // Pagination logic + if (paginationContext != null) { + // Set the starting log file name if provided + this.startingLogFileName = Optional.ofNullable(paginationContext.startingLogFileName); + this.isHashSetCached = paginationContext.isHashSetCached; + this.lastEmitSidecarIdx = paginationContext.sidecarIdx; + } + this.engine = engine; this.checkpointPredicate = checkpointPredicate; this.filesList = new LinkedList<>(); - this.filesList.addAll( - files.stream().map(DeltaLogFile::forFileStatus).collect(Collectors.toList())); + if (paginationContext == null) { + this.filesList.addAll( + files.stream().map(DeltaLogFile::forFileStatus).collect(Collectors.toList())); + } else { + this.filesList.addAll( + files.stream() + .map(DeltaLogFile::forFileStatus) + .filter(this::paginatedFilter) + .collect(Collectors.toList())); + } this.deltaReadSchema = deltaReadSchema; this.checkpointReadSchema = checkpointReadSchema; this.actionsIter = Optional.empty(); this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(deltaReadSchema); } + private boolean paginatedFilter(DeltaLogFile nextLogFile) { + FileStatus nextFile = nextLogFile.getFile(); + Path nextFilePath = new Path(nextFile.getPath()); + String fileName = nextFilePath.getName(); + // no need to handle sidecar files here (sidecar files aren't in this file list) + if (nextLogFile.getLogType() == DeltaLogFile.LogType.V2_CHECKPOINT_MANIFEST) + return true; // never skip v2 checkpoint file + if (fileName.compareTo(startingLogFileName.get()) > 0 + && (isHashSetCached || nextLogFile.isCheckpointFile())) return false; // skip these files + return true; + } + @Override public boolean hasNext() { if (closed) { @@ -287,17 +326,24 @@ public ColumnarBatch extractSidecarsFromBatch( List outputFiles = new ArrayList<>(); int sidecarIndex = columnarBatch.getSchema().fieldNames().indexOf(LogReplay.SIDECAR_FIELD_NAME); ColumnVector sidecarVector = columnarBatch.getColumnVector(sidecarIndex); + if (startingLogFileName.isPresent() && lastEmitSidecarIdx == -1) + lastEmitSidecarIdx = 0; // check if pagination is enabled + int sidecarCnt = 0; //sidecar idx starts from 1 for (int i = 0; i < columnarBatch.getSize(); i++) { SidecarFile sidecarFile = SidecarFile.fromColumnVector(sidecarVector, i); if (sidecarFile == null) { continue; } + sidecarCnt++; + if (sidecarCnt < lastEmitSidecarIdx) { + System.out.println("Skipping sidecar file " + sidecarFile.getPath()); + continue; + } FileStatus sideCarFileStatus = FileStatus.of( FileNames.sidecarFile(deltaLogPath, sidecarFile.getPath()), sidecarFile.getSizeInBytes(), sidecarFile.getModificationTime()); - filesList.add(DeltaLogFile.ofSideCar(sideCarFileStatus, checkpointVersion)); } @@ -312,10 +358,14 @@ public ColumnarBatch extractSidecarsFromBatch( *

Requires that `filesList.isEmpty` is false. */ private CloseableIterator getNextActionsIter() { - final DeltaLogFile nextLogFile = filesList.pop(); - final FileStatus nextFile = nextLogFile.getFile(); - final Path nextFilePath = new Path(nextFile.getPath()); - final String fileName = nextFilePath.getName(); + // TODO: remove codes here + System.out.println("current file list size is: " + filesList.size()); + DeltaLogFile nextLogFile = filesList.pop(); + FileStatus nextFile = nextLogFile.getFile(); + Path nextFilePath = new Path(nextFile.getPath()); + String fileName = nextFilePath.getName(); + + // if the tombstone set is cached, skip reading JSON files as well try { switch (nextLogFile.getLogType()) { case COMMIT: @@ -338,7 +388,8 @@ private CloseableIterator getNextActionsIter() { CloseableIterator dataIter = getActionsIterFromSinglePartOrV2Checkpoint(nextFile, fileName); long version = checkpointVersion(nextFilePath); - return combine(dataIter, true /* isFromCheckpoint */, version, Optional.empty()); + return combine( + dataIter, true /* isFromCheckpoint */, version, Optional.empty(), fileName); } case MULTIPART_CHECKPOINT: case SIDECAR: @@ -348,20 +399,17 @@ private CloseableIterator getNextActionsIter() { // optimizations like reading multiple files in parallel. CloseableIterator checkpointFiles = retrieveRemainingCheckpointFiles(nextLogFile); - CloseableIterator dataIter = - wrapEngineExceptionThrowsIO( - () -> - engine - .getParquetHandler() - .readParquetFiles( - checkpointFiles, deltaReadSchema, checkpointPredicate), - "Reading checkpoint sidecars [%s] with readSchema=%s and predicate=%s", - checkpointFiles, - deltaReadSchema, - checkpointPredicate); - + CloseableIterator> tupleIter = + engine + .getParquetHandler() + .readParquetFiles2(checkpointFiles, deltaReadSchema, checkpointPredicate); long version = checkpointVersion(nextFilePath); - return combine(dataIter, true /* isFromCheckpoint */, version, Optional.empty()); + // add file name to action wrapper + return combine( + tupleIter, + true /* isFromCheckpoint */, + version, + Optional.empty()); // this problem needs to be fixed. } default: throw new IOException("Unrecognized log type: " + nextLogFile.getLogType()); @@ -393,7 +441,8 @@ private CloseableIterator readCommitOrCompactionFile( dataIter, false /* isFromCheckpoint */, fileVersion, - Optional.of(nextFile.getModificationTime()) /* timestamp */); + Optional.of(nextFile.getModificationTime()) /* timestamp */, + new Path(nextFile.getPath()).getName()); } /** @@ -402,9 +451,11 @@ private CloseableIterator readCommitOrCompactionFile( * set when the input file is not a Checkpoint. The timestamp will be set to be the * inCommitTimestamp of the delta file when available, otherwise it will be the modification time * of the file. + * + *

Input: takes in an iterator of tuples */ private CloseableIterator combine( - CloseableIterator fileReadDataIter, + CloseableIterator> fileReadDataIter, boolean isFromCheckpoint, long version, Optional timestamp) { @@ -414,6 +465,62 @@ private CloseableIterator combine( // enabled, we will read the first batch and try to extract the timestamp from it. // We also ensure that rewoundFileReadDataIter is identical to the original // fileReadDataIter before any data was consumed. + + final CloseableIterator> rewoundFileReadDataIter; + Optional inCommitTimestampOpt = Optional.empty(); + if (!isFromCheckpoint && fileReadDataIter.hasNext()) { + Tuple2 firstBatch = fileReadDataIter.next(); + rewoundFileReadDataIter = singletonCloseableIterator(firstBatch).combine(fileReadDataIter); + inCommitTimestampOpt = InCommitTimestampUtils.tryExtractInCommitTimestamp(firstBatch._2); + } else { + rewoundFileReadDataIter = fileReadDataIter; + } + + final Optional finalResolvedCommitTimestamp = + inCommitTimestampOpt.isPresent() ? inCommitTimestampOpt : timestamp; + + return new CloseableIterator() { + @Override + public boolean hasNext() { + return rewoundFileReadDataIter.hasNext(); + } + + @Override + public ActionWrapper next() { + Tuple2 nextBatch = rewoundFileReadDataIter.next(); + return new ActionWrapper( + nextBatch._2, isFromCheckpoint, version, finalResolvedCommitTimestamp, nextBatch._1); + } + + @Override + public void close() throws IOException { + fileReadDataIter.close(); + } + }; + } + + /** + * Takes an input iterator of actions read from the file and metadata about the file read, and + * combines it to return an Iterator. The timestamp in the ActionWrapper is only + * set when the input file is not a Checkpoint. The timestamp will be set to be the + * inCommitTimestamp of the delta file when available, otherwise it will be the modification time + * of the file. + * + *

input: takes in an iterator of ColumnarBatch + */ + private CloseableIterator combine( + CloseableIterator fileReadDataIter, + boolean isFromCheckpoint, + long version, + Optional timestamp, + String fileName) { + // For delta files, we want to use the inCommitTimestamp from commitInfo + // as the commit timestamp for the file. + // Since CommitInfo should be the first action in the delta when inCommitTimestamp is + // enabled, we will read the first batch and try to extract the timestamp from it. + // We also ensure that rewoundFileReadDataIter is identical to the original + // fileReadDataIter before any data was consumed. + final CloseableIterator rewoundFileReadDataIter; Optional inCommitTimestampOpt = Optional.empty(); if (!isFromCheckpoint && fileReadDataIter.hasNext()) { @@ -438,7 +545,8 @@ public ActionWrapper next() { rewoundFileReadDataIter.next(), isFromCheckpoint, version, - finalResolvedCommitTimestamp); + finalResolvedCommitTimestamp, + fileName); } @Override @@ -475,7 +583,6 @@ private CloseableIterator retrieveRemainingCheckpointFiles( peek = filesList.peek(); } } - return toCloseableIterator(checkpointFiles.iterator()); } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java index 5fd4a317ae3..f22c0983bba 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java @@ -78,15 +78,32 @@ public class ActiveAddFilesIterator implements CloseableIterator iter, Path tableRoot, ScanMetrics metrics) { + this(engine, iter, tableRoot, metrics, null); + } + + // pass in a hash set here, as an optional parameter + // if no value is passed -> create a new hashset; otherwise use the hashset injected + ActiveAddFilesIterator( + Engine engine, + CloseableIterator iter, + Path tableRoot, + ScanMetrics metrics, + PaginationContext paginationContext) { this.engine = engine; this.tableRoot = tableRoot; this.iter = iter; - this.tombstonesFromJson = new HashSet<>(); - this.addFilesFromJson = new HashSet<>(); this.next = Optional.empty(); this.metrics = metrics; + if (paginationContext.isHashSetCached) { + this.tombstonesFromJson = paginationContext.tombstonesFromJson; + this.addFilesFromJson = paginationContext.addFilesFromJson; + } else { + this.tombstonesFromJson = new HashSet<>(); + this.addFilesFromJson = new HashSet<>(); + } } @Override @@ -154,7 +171,7 @@ private void prepareNext() { final ActionWrapper _next = iter.next(); final ColumnarBatch addRemoveColumnarBatch = _next.getColumnarBatch(); final boolean isFromCheckpoint = _next.isFromCheckpoint(); - + final String fileName = _next.getFileName(); // Step 1: Update `tombstonesFromJson` with all the RemoveFiles in this columnar batch, if // and only if this batch is not from a checkpoint. // @@ -188,7 +205,7 @@ private void prepareNext() { selectionVectorBuffer = prepareSelectionVectorBuffer(selectionVectorBuffer, addsVector.getSize()); boolean atLeastOneUnselected = false; - + long numOfTrueRows = 0L; for (int rowId = 0; rowId < addsVector.getSize(); rowId++) { if (addsVector.isNullAt(rowId)) { atLeastOneUnselected = true; @@ -222,6 +239,7 @@ private void prepareNext() { if (!alreadyDeleted) { doSelect = true; selectionVectorBuffer[rowId] = true; + numOfTrueRows++; metrics.activeAddFilesCounter.increment(); } } else { @@ -275,7 +293,13 @@ private void prepareNext() { .createSelectionVector(selectionVectorBuffer, 0, addsVector.getSize()), "Create selection vector for selected scan files")); } - next = Optional.of(new FilteredColumnarBatch(scanAddFiles, selectionColumnVector)); + next = + Optional.of( + new FilteredColumnarBatch( + scanAddFiles, + selectionColumnVector, + numOfTrueRows, + fileName)); // add file name here } public static String getAddFilePath(ColumnVector addFileVector, int rowId) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/DeltaLogFile.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/DeltaLogFile.java index 2b9cef075de..323b75270d2 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/DeltaLogFile.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/DeltaLogFile.java @@ -90,4 +90,10 @@ public LogType getLogType() { public long getVersion() { return version; } + + public boolean isCheckpointFile() { + return (logType == LogType.SIDECAR + || logType == LogType.MULTIPART_CHECKPOINT + || logType == LogType.CHECKPOINT_CLASSIC); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index 820956a114d..2c50d1e09ea 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -217,7 +217,8 @@ public CloseableIterator getAddFilesAsColumnarBatches( Engine engine, boolean shouldReadStats, Optional checkpointPredicate, - ScanMetrics scanMetrics) { + ScanMetrics scanMetrics, + PaginationContext paginationContext) { // We do not need to look at any `remove` files from the checkpoints. Skip the column to save // I/O. Note that we are still going to process the row groups. Adds and removes are randomly // scattered through checkpoint part files, so row group push down is unlikely to be useful. @@ -227,8 +228,10 @@ public CloseableIterator getAddFilesAsColumnarBatches( getLogReplayFiles(logSegment), getAddRemoveReadSchema(shouldReadStats), getAddReadSchema(shouldReadStats), - checkpointPredicate); - return new ActiveAddFilesIterator(engine, addRemoveIter, dataPath, scanMetrics); + checkpointPredicate, + paginationContext); + return new ActiveAddFilesIterator( + engine, addRemoveIter, dataPath, scanMetrics, paginationContext); } //////////////////// diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/PageToken.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/PageToken.java new file mode 100644 index 00000000000..31ba5db8827 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/PageToken.java @@ -0,0 +1,63 @@ +package io.delta.kernel.internal.replay; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.delta.kernel.data.Row; +import io.delta.kernel.types.*; + +public class PageToken { + /** Variables to know where last page ends (current page starts) */ + private final String startingFileName; + + private final long rowIndex; + private final long sidecarIdx; + + /** Variables for validating query params */ + private final long logSegmentHash; + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + public PageToken(String startingFileName, long rowIndex, long sidecarIdx, long logSegmentHash) { + this.startingFileName = startingFileName; + this.rowIndex = rowIndex; + this.logSegmentHash = sidecarIdx; + this.sidecarIdx = logSegmentHash; + } + + public String getStartingFileName() { + return startingFileName; + } + + public long getRowIndex() { + return rowIndex; + } + + public long getSidecarIdx() { + return sidecarIdx; + } + + public long getLogSegmentHash() { + return logSegmentHash; + } + + /** Convert PageToken to a Kernel Row object. */ + public Row getRow() { + StructType schema = + new StructType() + .add("fileName", StringType.STRING) + .add("rowIndex", LongType.LONG) + .add("sidecarIdx", LongType.LONG) + .add("logSegmentHash", LongType.LONG); + // return Utils.newRow(schema, startingFileName, rowIndex, logSegmentHash); + // TODO: make this schema into a Row Type + return null; + } + + /** Create a PageToken from a Row object */ + public static PageToken fromRow(Row row) { + String fileName = row.getString(0); + long rowIdx = row.getLong(1); + long sideCarIdx = row.getLong(2); + long logsSegmentHash = row.getLong(3); + return new PageToken(fileName, rowIdx, sideCarIdx, logsSegmentHash); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/PaginatedAddFilesIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/PaginatedAddFilesIterator.java new file mode 100644 index 00000000000..6a5f1a415b8 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/PaginatedAddFilesIterator.java @@ -0,0 +1,140 @@ +package io.delta.kernel.internal.replay; + +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.utils.CloseableIterator; +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * We don't return partial batch. If page size is returned, we terminate pagination early. We use + * number of batches to skip as page token. + */ +public class PaginatedAddFilesIterator implements CloseableIterator { + + private final Iterator originalIterator; + private final long numAddFilesToSkip; // how many active add files to skip + private final long pageSize; // max files to return in this page + + private long numAddFilesRead = 0; + private long numAddFilesReturned = 0; + private final String startingLogFileName; + private String lastLogFileName; // name of the last log file read + private long sidecarIdx; + + private FilteredColumnarBatch nextBatch = null; // next batch ready to return + + public PaginatedAddFilesIterator( + Iterator originalIterator, PaginationContext paginationContext) { + this.originalIterator = originalIterator; + this.numAddFilesToSkip = paginationContext.rowIdx; + this.startingLogFileName = paginationContext.startingLogFileName; + this.pageSize = paginationContext.pageSize; + this.lastLogFileName = startingLogFileName; // name of last log file we read + // this.isHashSetCached = paginationContext.isHashSetCached; + this.sidecarIdx = paginationContext.sidecarIdx; + } + + @Override + public boolean hasNext() { + if (nextBatch != null) { + return true; + } + if(numAddFilesReturned >= pageSize) { + return false; + } + while (originalIterator.hasNext()) { + FilteredColumnarBatch batch = originalIterator.next(); // + String fileName = batch.getFileName(); + if (!fileName.equals(lastLogFileName)) { + System.out.println("reading " + fileName); + if (fileName.endsWith(".parquet") && sidecarIdx != -1) + sidecarIdx++; // keep track of sidecarIdx + lastLogFileName = fileName; + numAddFilesRead = 0; + } + long numActiveAddFiles = + batch.getNumOfTrueRows(); // lower long: primitive / Long: box primitive - this can be + // null; long is preferred. + // TODO: change number of AddFiles Skipped to rowNum + long rowNum = batch.getData().getSize(); // number of rows, if 5 AddFile and 7 RemoveFile -> this is 12. + + System.out.println("numActiveAddFiles: " + numActiveAddFiles); + System.out.println("numTotalAddFiles: " + batch.getData().getColumnVector(0).getSize()); + + /** + * if sideIdx isn't -1 & current file to read is checkpoint file: sidecarIdx ++ return + * sidecarIdx; + */ + // if hash set is cached, first read file must be starting log file + // TODO: how to correctly skip batches in V2 checkpoint files? + if (sidecarIdx == -1 && startingLogFileName.compareTo(fileName) < 0 ) { + // skip whole batch + numAddFilesRead += numActiveAddFiles; + } else if (startingLogFileName.equals(fileName) + && numAddFilesRead + numActiveAddFiles <= numAddFilesToSkip) { + // skip whole batch + numAddFilesRead += numActiveAddFiles; + } else if (numAddFilesReturned + numActiveAddFiles >= pageSize) { + System.out.println("pagination comes to an end"); + // This is the last batch to read. + nextBatch = batch; + numAddFilesReturned += numActiveAddFiles; + numAddFilesRead += numActiveAddFiles; + System.out.println("numAddFilesReturned: " + numAddFilesReturned); + // terminate current pagination + return true; + } else if (startingLogFileName.equals(fileName) + && numAddFilesRead + numActiveAddFiles > numAddFilesToSkip + && numAddFilesRead < numAddFilesToSkip) { + // very special case: part files of current batch has been read. + // this happens only if batch size changes between pagination request + System.out.println("batch size changes between pagination request"); + numAddFilesRead += numActiveAddFiles; + long numAddFilesToReturnInBatch = (numAddFilesRead + numActiveAddFiles - numAddFilesToSkip); + numAddFilesReturned += numAddFilesToReturnInBatch; + // TODO: unselect some rows here + nextBatch = batch; + } else { + // no skipping needed, return full batch + nextBatch = batch; + numAddFilesReturned += numActiveAddFiles; + numAddFilesRead += numActiveAddFiles; + System.out.println("numAddFilesReturned: " + numAddFilesReturned); + return true; + } + System.out.println("------------------------read one batch -------------------"); + } + System.out.println("no batches are left"); + return false; + } + + @Override + public FilteredColumnarBatch next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + FilteredColumnarBatch result = nextBatch; + nextBatch = null; + return result; + } + + @Override + public void close() throws IOException { + // Close original iterator if it supports close (if applicable) + if (originalIterator instanceof Closeable) { + ((Closeable) originalIterator).close(); + } + } + + public PageToken getNewPageToken() { + // return sidecarIdx-1 because first file is re-read. + // check if hasNext() is false? if it's true, throw exception? + if (hasNext()) { + // TODO: what exception should be thrown here? + throw new RuntimeException("Page token is unavailable at this point!!!"); + } + return new PageToken(lastLogFileName, numAddFilesRead, sidecarIdx, -1); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/PaginationContext.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/PaginationContext.java new file mode 100644 index 00000000000..901f1a82ffb --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/PaginationContext.java @@ -0,0 +1,34 @@ +package io.delta.kernel.internal.replay; + +import java.util.HashSet; + +public class PaginationContext { + // TODO: wrap all these into a PageToken object + public final String startingLogFileName; + public final long rowIdx; + public final long sidecarIdx; + public final long pageSize; + public final boolean isHashSetCached; + public final HashSet tombstonesFromJson; + public final HashSet addFilesFromJson; + + public PaginationContext( + String startingLogFileName, + long rowIdx, + long sidecarIdx, + long pageSize, + boolean isHashSetCached, + HashSet tombstonesFromJson, + HashSet addFilesFromJson) { + this.startingLogFileName = startingLogFileName; + this.rowIdx = rowIdx; + this.sidecarIdx = sidecarIdx; + this.isHashSetCached = isHashSetCached; + this.tombstonesFromJson = tombstonesFromJson; + this.addFilesFromJson = addFilesFromJson; + this.pageSize = pageSize; + } + + public static final PaginationContext EMPTY = + new PaginationContext(null, 0, 0, 0, false, null, null); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 75bac6a4f22..10bb52e67a6 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -78,6 +78,9 @@ public Snapshot buildLatestSnapshot(Engine engine, SnapshotQueryContext snapshot final LogSegment logSegment = getLogSegmentForVersion(engine, Optional.empty() /* versionToLoad */); + // compare hash value in page token to new hash value computed + // throw exception if the log segment has changed & no cache is saved + snapshotContext.setVersion(logSegment.getVersion()); snapshotContext.setCheckpointVersion(logSegment.getCheckpointVersionOpt()); @@ -99,12 +102,27 @@ public SnapshotImpl getSnapshotAt( final LogSegment logSegment = getLogSegmentForVersion(engine, Optional.of(version) /* versionToLoadOpt */); + // compare hash value in page token to new hash value computed + // throw exception if the log segment has changed & no cache is saved + snapshotContext.setCheckpointVersion(logSegment.getCheckpointVersionOpt()); snapshotContext.setVersion(logSegment.getVersion()); return createSnapshot(logSegment, engine, snapshotContext); } + // for pagination P2 req: server calls this function if log segment is cached. + // tip: this function can be used as a helper for two functions above (to reduce duplicate code) + public Snapshot buildSnapshotWithLogSegment( + Engine engine, SnapshotQueryContext snapshotContext, LogSegment logSegment) + throws TableNotFoundException { + + snapshotContext.setVersion(logSegment.getVersion()); + snapshotContext.setCheckpointVersion(logSegment.getCheckpointVersionOpt()); + + return createSnapshot(logSegment, engine, snapshotContext); + } + /** * Construct the snapshot for the given table at the provided timestamp. * @@ -194,6 +212,7 @@ private SnapshotImpl createSnapshot( // Note: LogReplay now loads the protocol and metadata (P & M) lazily. Nonetheless, SnapshotImpl // is still constructed with an "eagerly"-loaded P & M. + /** maybe compare the init segment with the cached log segment here */ LogReplay logReplay = new LogReplay( tablePath, diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala index 035ef00f5b1..8c1e6f438d1 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala @@ -19,12 +19,13 @@ import java.io.ByteArrayInputStream import java.util import java.util.Optional -import io.delta.kernel.data.{ColumnarBatch, ColumnVector, FilteredColumnarBatch, Row} +import io.delta.kernel.data.{ColumnVector, ColumnarBatch, FilteredColumnarBatch, Row} import io.delta.kernel.engine._ import io.delta.kernel.expressions.{Column, Expression, ExpressionEvaluator, Predicate, PredicateEvaluator} import io.delta.kernel.internal.actions.CommitInfo import io.delta.kernel.internal.fs.Path -import io.delta.kernel.internal.util.{FileNames, Utils} +import io.delta.kernel.internal.util.FileNames +import io.delta.kernel.internal.util.Utils import io.delta.kernel.types.{DataType, StructType} import io.delta.kernel.utils.{CloseableIterator, DataFileStatus, FileStatus} diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultParquetHandler.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultParquetHandler.java index c58085d4e95..ffb25b7aea8 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultParquetHandler.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultParquetHandler.java @@ -23,6 +23,8 @@ import io.delta.kernel.engine.ParquetHandler; import io.delta.kernel.expressions.Column; import io.delta.kernel.expressions.Predicate; +import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.internal.util.Utils; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.*; @@ -123,4 +125,60 @@ public void writeParquetFileAtomically( Utils.closeCloseables(data); } } + + @Override + public CloseableIterator> readParquetFiles2( + CloseableIterator fileIter, + StructType physicalSchema, + Optional predicate) + throws IOException { + return new CloseableIterator>() { + private final ParquetFileReader batchReader = new ParquetFileReader(fileIO); + private CloseableIterator currentFileReader; + private String currentFileName; + + @Override + public void close() throws IOException { + Utils.closeCloseables(currentFileReader, fileIter); + } + + @Override + public boolean hasNext() { + + if (currentFileReader != null && currentFileReader.hasNext()) { + + // System.out.println("in has next: current file still has batches" + currentFileName); + return true; + } else { + // There is no file in reading or the current file being read has no more data. + // Initialize the next file reader or return false if there are no more files to + // read. + Utils.closeCloseables(currentFileReader); + currentFileReader = null; + currentFileName = null; + if (fileIter.hasNext()) { + FileStatus fileStatus = fileIter.next(); + currentFileName = new Path(fileStatus.getPath()).getName(); + // System.out.println("current file name: " + currentFileName); + currentFileReader = + batchReader.read( + fileStatus, physicalSchema, predicate); // where real reading happens + return hasNext(); // recurse since it's possible the loaded file is empty + } else { + // System.out.println("no files are left"); + return false; + } + } + } + + @Override + public Tuple2 next() { + boolean isLastBatchFile = false; + if (!currentFileReader.hasNext()) { + isLastBatchFile = true; + } + return new Tuple2<>(currentFileName, currentFileReader.next()); + } + }; + } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ScanSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ScanSuite.scala index 271809f54c5..564ef824d65 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ScanSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ScanSuite.scala @@ -35,12 +35,15 @@ import io.delta.kernel.engine.{Engine, JsonHandler, ParquetHandler} import io.delta.kernel.expressions._ import io.delta.kernel.expressions.Literal._ import io.delta.kernel.internal.{InternalScanFileUtils, ScanImpl, TableConfig} -import io.delta.kernel.internal.util.InternalUtils +import io.delta.kernel.internal.replay.PaginatedAddFilesIterator +import io.delta.kernel.internal.util.{InternalUtils, Tuple2} import io.delta.kernel.types._ import io.delta.kernel.types.IntegerType.INTEGER import io.delta.kernel.types.StringType.STRING import io.delta.kernel.utils.{CloseableIterator, FileStatus} import io.delta.kernel.utils.CloseableIterable.emptyIterable +import io.delta.kernel.internal.util.FileNames +import io.delta.kernel.internal.util import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog} @@ -50,6 +53,7 @@ import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.types.{IntegerType => SparkIntegerType, StructField => SparkStructField, StructType => SparkStructType} import org.scalatest.funsuite.AnyFunSuite + class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with SQLHelper with DeltaTableWriteSuiteBase { @@ -1635,6 +1639,426 @@ class ScanSuite extends AnyFunSuite with TestUtils } } } + + ////////////////////////////////////////////////////////////////////////////////// + // Pagination tests for PaginatedScan + ////////////////////////////////////////////////////////////////////////////////// + + /** + * test I can read the second page -> use page token returned. + * werid boundary: where json file = page size ; where json file size > page size + * second file + * test the prototype can inject page token. + * What's the best representation of page token + * */ + + //TODO: test page size < JSON file size + //TODO: test page size = JSON file size (page size = 10) + test("getPaginatedScanFiles - basic pagination with single JSON file") { + withTempDir { tempDir => + /** + Creates a Delta table with 10 Parquet files + Each file contains 10 rows + _delta_log/00000000000000000000.json contains 10 AddFile actions — one per file + parquet 0: 0 - 9 + parquet 1: 10 -19 + parquet 9: 90 -99 + * */ + spark.range(0, 100, 1, 10).write.format("delta").save(tempDir.getCanonicalPath) + + val snapshot = latestSnapshot(tempDir.getCanonicalPath) + val scan = snapshot.getScanBuilder().build().asInstanceOf[ScanImpl] + + // Create a custom engine with batch size 5 + val hadoopConf = new org.apache.hadoop.conf.Configuration() + hadoopConf.set("delta.kernel.default.json.reader.batch-size", "5") + val customEngine = DefaultEngine.create(hadoopConf) + + // Try read first page (with size = 12) + val paginatedIter = scan.getPaginatedScanFiles(customEngine, 2, 6, + "00000000000000000000.json", -1) + val firstPageFiles = paginatedIter.asScala.toSeq + assert(firstPageFiles.nonEmpty, "First page should contain some files") + + // Verify we got at most 6 AddFiles across all batches + val fileCounts: Seq[Long] = firstPageFiles.map(_.getNumOfTrueRows.toLong) + val totalFileCountsReturned = fileCounts.sum + println(s"Num of Batches returned = ${fileCounts.length}") + + // Log additional pagination state info + val paginatedAddFilesIter = paginatedIter.asInstanceOf[PaginatedAddFilesIterator] + val nextFilesToSkip = paginatedAddFilesIter.getNewPageToken.getRowIndex() + val nextStartingFile = paginatedAddFilesIter.getNewPageToken.getStartingFileName() + + assert(totalFileCountsReturned <= 12, s"First page should contain at most 12 files, got $totalFileCountsReturned") + + println(s"nextFilesToSkip = ${nextFilesToSkip.toString}") + println(s"nextStartingFile = ${nextStartingFile.toString}") + println(s"Total Parquet Files fetched = ${totalFileCountsReturned.toString}") + + paginatedIter.close() + } + } + + + /** + * 1. check if files returned are within page limit + * 2. check if page token returned is correct. + * */ + + /** + * test I can read the second page -> use page token returned. + * werid boundary: where json file = page size ; where json file size > page size + * second file + * test the prototype can inject page token. + * What's the best representation of page token + * */ + //TODO: test page size < JSON file size + //TODO: test page size = JSON file size (page size = 10) + //TODO: test page size > JSON file size + test("getPaginatedScanFiles - basic pagination with multiple JSON files") { + withTempDir { tempDir => + val tablePath = tempDir.getCanonicalPath + + // Create multiple commits to generate multiple JSON files + // First commit: files 0-4 (5 files) + spark.range(0, 50, 1, 5).write.format("delta").save(tablePath) + + // Second commit: files 5-9 (5 more files) + spark.range(50, 100, 1, 5).write.format("delta").mode("append").save(tablePath) + + // Third commit: files 10-14 (5 more files) + spark.range(100, 150, 1, 5).write.format("delta").mode("append").save(tablePath) + + // This should create: 00000000000000000000.json, 00000000000000000001.json, 00000000000000000002.json + val snapshot = latestSnapshot(tablePath) + val scan = snapshot.getScanBuilder().build().asInstanceOf[ScanImpl] + + // Create a custom engine with batch size 4 + val hadoopConf = new org.apache.hadoop.conf.Configuration() + hadoopConf.set("delta.kernel.default.json.reader.batch-size", "4") + hadoopConf.set("delta.kernel.default.parquet.reader.batch-size", "4") + val customEngine = DefaultEngine.create(hadoopConf) + + // Test first page with page size 8, starting from first JSON file + val paginatedIter = scan.getPaginatedScanFiles(customEngine, 3, 9, + "00000000000000000001.json",-1) // start reading from the second json file + val firstPageFiles = paginatedIter.asScala.toSeq + + assert(firstPageFiles.nonEmpty, "First page should contain files") + + // Verify we got at most 8 AddFiles across all batches + val fileCounts: Seq[Long] = firstPageFiles.map(_.getNumOfTrueRows.toLong) + val totalAddFiles = fileCounts.sum + + // Get pagination state info + val paginatedAddFilesIter = paginatedIter.asInstanceOf[PaginatedAddFilesIterator] + val nextBatchesToSkip = paginatedAddFilesIter.getNewPageToken.getRowIndex() + val nextStartingFile = paginatedAddFilesIter.getNewPageToken.getStartingFileName() + + println(s"nextBatchesToSkip = ${nextBatchesToSkip.toString}") + println(s"nextStartingFile = ${nextStartingFile.toString}") + println(s"totalAddFiles = ${totalAddFiles.toString}") + + assert(totalAddFiles <= 8, s"First page should contain at most 8 files, got $totalAddFiles") + assert(totalAddFiles > 0, s"Should have some files, got $totalAddFiles") + + paginatedIter.close() + + // Verify that pagination spans multiple JSON files using a separate scan instance + val verificationScan = snapshot.getScanBuilder().build() + val allFiles = collectScanFileRows(verificationScan) + val totalFilesInTable = allFiles.length + assert(totalFilesInTable == 15, s"Should have 15 total files, got $totalFilesInTable") + } + } + + test("getPaginatedScanFiles - basic pagination with one checkpoint file " + + "and multiple JSON files") { + withTempDir { tempDir => + val tablePath = tempDir.getCanonicalPath + + // Create many commits to trigger checkpoint creation (checkpoints usually happen every 10 commits) + // First, create 10 commits to trigger a checkpoint + for (i <- 0 until 10) { + val mode = if (i == 0) "overwrite" else "append" + spark.range(i * 10, (i + 1) * 10, 1, 2) + .write.format("delta").mode(mode).save(tablePath) + } + + // Force checkpoint creation + val deltaLog = DeltaLog.forTable(spark, tablePath) + deltaLog.checkpoint() + + // Add a few more commits after checkpoint to create additional JSON files + for (i <- 10 until 13) { + spark.range(i * 10, (i + 1) * 10, 1, 2) + .write.format("delta").mode("append").save(tablePath) + } + + // This should create: 00000000000000000010.checkpoint.parquet(checkpoint file), 00000000000000000011.json, 00000000000000000012.json + val snapshot = latestSnapshot(tablePath) + val scan = snapshot.getScanBuilder().build().asInstanceOf[ScanImpl] + + // Create a custom engine with batch size 5 + val hadoopConf = new org.apache.hadoop.conf.Configuration() + hadoopConf.set("delta.kernel.default.json.reader.batch-size", "5") + hadoopConf.set("delta.kernel.default.parquet.reader.batch-size", "5") + val customEngine = DefaultEngine.create(hadoopConf) + + // Test pagination starting from the checkpoint (should be processed first) + val paginatedIter = scan.getPaginatedScanFiles(customEngine, 5, 10, + "00000000000000000010.checkpoint.parquet", -1) // Start from first JSON file after checkpoint + val firstPageFiles = paginatedIter.asScala.toSeq + + assert(firstPageFiles.nonEmpty, "First page should contain files") + + // Verify we got at most 10 AddFiles across all batches + val fileCounts: Seq[Long] = firstPageFiles.map(_.getNumOfTrueRows.toLong) + val totalAddFiles = fileCounts.sum + + // Get pagination state info + val paginatedAddFilesIter = paginatedIter.asInstanceOf[PaginatedAddFilesIterator] + val nextBatchesToSkip = paginatedAddFilesIter.getNewPageToken.getRowIndex() + val nextStartingFile = paginatedAddFilesIter.getNewPageToken.getStartingFileName() + + assert(totalAddFiles <= 10, s"First page should contain at most 10 files, got $totalAddFiles") + assert(totalAddFiles > 0, s"Should have some files, got $totalAddFiles") + + paginatedIter.close() + + // Verify we have both checkpoint and JSON files in the log using a separate scan instance + val verificationScan = snapshot.getScanBuilder().build() + val allFiles = collectScanFileRows(verificationScan) + val totalFilesInTable = allFiles.length + assert(totalFilesInTable == 26, s"Should have 26 total files (13 commits * 2 files each), got $totalFilesInTable") + } + } + + test("getPaginatedScanFiles - basic pagination with side car checkpoint files and " + + "multiple JSON files") { + // val tablePath = "/home/ada.ma/delta/.bloop/goldenTables/bloop-bsp-clients-classes/classes-Metals-zFK6dCvKR3y5fbr7diWpFQ==/golden/v2-checkpoint-parquet" + val tablePath = goldenTablePath("v2-checkpoint-parquet") + // Create table with deletion vectors to trigger sidecar checkpoint creation + + // Add a few more commits after checkpoint to create additional JSON files + // Disable automatic checkpointing to prevent superseding our multi-part checkpoint + /* + withSQLConf( + "spark.databricks.delta.checkpointInterval" -> "1000" // Very high interval to disable auto-checkpointing + ) { + for (i <- 10 until 13) { + spark.range(i * 40, (i + 1) * 40, 1, 4) + .write.format("delta").mode("append").save(tablePath) + } + } + */ + + // Check what checkpoint files were actually created + val logDir = new java.io.File(s"$tablePath/_delta_log") + val sidecarLogDir = new java.io.File(s"$tablePath/_delta_log/_sidecars") + val checkpointFiles = logDir.listFiles().filter(_.getName.contains("checkpoint")).sortBy(_.getName) + val sidecarFiles = sidecarLogDir.listFiles().filter(_.getName.contains("checkpoint")).sortBy(_.getName) + val jsonFiles = logDir.listFiles().filter(_.getName.endsWith(".json")).sortBy(_.getName) + + /** + * V2-checkpoint file + * 00000000000000000002.checkpoint.e8fa2696-9728-4e9c-b285-634743fdd4fb.parquet + * Sidecar files + * 00000000000000000002.checkpoint.0000000001.0000000002.055454d8-329c-4e0e-864d-7f867075af33.parquet + * 00000000000000000002.checkpoint.0000000002.0000000002.33321cc1-9c55-4d1f-8511-fafe6d2e1133.parquet + * */ + println(s"Final checkpoint files: ${checkpointFiles.map(_.getName).mkString(", ")}") + println(s"Final sidecar checkpoint files: ${sidecarFiles.map(_.getName).mkString(", ")}") + println(s"JSON files: ${jsonFiles.map(_.getName).mkString(", ")}") + + val snapshot = latestSnapshot(tablePath) + val scan = snapshot.getScanBuilder().build().asInstanceOf[ScanImpl] + + // Create a custom engine with batch size 8 + val hadoopConf = new org.apache.hadoop.conf.Configuration() + hadoopConf.set("delta.kernel.default.json.reader.batch-size", "8") + hadoopConf.set("delta.kernel.default.parquet.reader.batch-size", "8") + val customEngine = DefaultEngine.create(hadoopConf) + + // Find the first JSON file after checkpoint for starting pagination + val checkpointVersions = checkpointFiles.map(_.getName) + .filter(_.matches(".*checkpoint.*")) + .filter(!_.endsWith(".crc")) // Filter out CRC files + .filter(_.matches(".*\\d+.*")) // Only include files that contain numbers + .map(name => name.replaceAll(".*?(\\d+).*", "$1").toLong) + .toSeq + val lastCheckpointVersion = if (checkpointVersions.nonEmpty) checkpointVersions.max else 0L + + val startingJsonFile = f"${lastCheckpointVersion + 1}%020d.json" + + // Test pagination starting from first JSON file after checkpoint + // batch skipping logic is problematic for sidecars + val paginatedIter = scan.getPaginatedScanFiles(customEngine, 3, 200, + "00000000000000000002.checkpoint.0000000001.0000000002.055454d8-329c-4e0e-864d-7f867075af33.parquet", 1) + val firstPageFiles = paginatedIter.asScala.toSeq + + assert(firstPageFiles.nonEmpty, "First page should contain files") + + // Verify we got at most 15 AddFiles across all batches + val fileCounts: Seq[Long] = firstPageFiles.map(_.getNumOfTrueRows.toLong) + val totalAddFiles = fileCounts.sum + + // Get pagination state info + val paginatedAddFilesIter = paginatedIter.asInstanceOf[PaginatedAddFilesIterator] + val nextBatchesToSkip = paginatedAddFilesIter.getNewPageToken.getRowIndex() + val nextStartingFile = paginatedAddFilesIter.getNewPageToken.getStartingFileName() + + println(s"Multi-part checkpoint test - nextBatchesToSkip = $nextBatchesToSkip") + println(s"Multi-part checkpoint test - nextStartingFile = $nextStartingFile") + println(s"Multi-part checkpoint test - totalAddFiles = $totalAddFiles") + + assert(totalAddFiles <= 15, s"First page should contain at most 15 files, got $totalAddFiles") + assert(totalAddFiles > 0, s"Should have some files, got $totalAddFiles") + + paginatedIter.close() + + // Verify checkpoint behavior using a separate scan instance + val verificationScan = snapshot.getScanBuilder().build() + val allFiles = collectScanFileRows(verificationScan) + val totalFilesInTable = allFiles.length + + // The exact number depends on how many commits we made, but should be substantial + assert(totalFilesInTable > 100, + s"Should have many files from checkpoint + JSON files, got $totalFilesInTable") + + // Verify checkpoint files exist - if we couldn't create multi-part, at least verify single checkpoint works + assert(checkpointFiles.length >= 1, + s"Should have at least one checkpoint file, found ${checkpointFiles.length}") + + if (checkpointFiles.length > 1) { + println(s"SUCCESS: Created multi-part checkpoint with ${checkpointFiles.length} parts") + } else { + println(s"WARNING: Only created single checkpoint file, multi-part checkpoint creation may need different approach") + } + + } + + test("getPaginatedScanFiles - basic pagination with multi part checkpoint files and " + + "multiple JSON files") { + withTempDir { tempDir => + val tablePath = tempDir.getCanonicalPath + + // Create 10 commits to trigger checkpoint creation + for (i <- 0 until 10) { + val mode = if (i == 0) "overwrite" else "append" + // Create 4 files per commit = 40 total AddFile actions + spark.range(i * 40, (i + 1) * 40, 1, 4) + .write.format("delta").mode(mode).save(tablePath) + } + + // Force multi-part checkpoint creation (3-5 parts) + withSQLConf( + "spark.databricks.delta.checkpoint.partSize" -> "10" // 40 AddFiles ÷ 10 per part = 4 parts + ) { + val deltaLog = DeltaLog.forTable(spark, tablePath) + deltaLog.checkpoint() + } + + // Add a few more commits after checkpoint to create additional JSON files + // Disable automatic checkpointing to prevent superseding our multi-part checkpoint + /* + withSQLConf( + "spark.databricks.delta.checkpointInterval" -> "1000" // Very high interval to disable auto-checkpointing + ) { + for (i <- 10 until 13) { + spark.range(i * 40, (i + 1) * 40, 1, 4) + .write.format("delta").mode("append").save(tablePath) + } + } + */ + + // Check what checkpoint files were actually created + val logDir = new java.io.File(s"$tablePath/_delta_log") + val checkpointFiles = logDir.listFiles().filter(_.getName.contains("checkpoint")).sortBy(_.getName) + val jsonFiles = logDir.listFiles().filter(_.getName.endsWith(".json")).sortBy(_.getName) + + /** + * 00000000000000000009.checkpoint.0000000001.0000000004.parquet, + * 00000000000000000009.checkpoint.0000000002.0000000004.parquet, + * 00000000000000000009.checkpoint.0000000003.0000000004.parquet, + * 00000000000000000009.checkpoint.0000000004.0000000004.parquet, + * 00000000000000000010.checkpoint.parquet + * JSON files: from 0 to 12 + * */ + println(s"Final checkpoint files: ${checkpointFiles.map(_.getName).mkString(", ")}") + println(s"JSON files: ${jsonFiles.map(_.getName).mkString(", ")}") + + val snapshot = latestSnapshot(tablePath) + val scan = snapshot.getScanBuilder().build().asInstanceOf[ScanImpl] + + // Create a custom engine with batch size 8 + val hadoopConf = new org.apache.hadoop.conf.Configuration() + hadoopConf.set("delta.kernel.default.json.reader.batch-size", "8") + hadoopConf.set("delta.kernel.default.parquet.reader.batch-size", "8") + val customEngine = DefaultEngine.create(hadoopConf) + + // Find the first JSON file after checkpoint for starting pagination + val checkpointVersions = checkpointFiles.map(_.getName) + .filter(_.matches(".*checkpoint.*")) + .filter(!_.endsWith(".crc")) // Filter out CRC files + .filter(_.matches(".*\\d+.*")) // Only include files that contain numbers + .map(name => name.replaceAll(".*?(\\d+).*", "$1").toLong) + .toSeq + val lastCheckpointVersion = if (checkpointVersions.nonEmpty) checkpointVersions.max else 0L + + val startingJsonFile = f"${lastCheckpointVersion + 1}%020d.json" + + // Test pagination starting from first JSON file after checkpoint + val paginatedIter = scan.getPaginatedScanFiles(customEngine, 8, 40, "00000000000000000009.checkpoint.0000000002.0000000004.parquet", -1) + val firstPageFiles = paginatedIter.asScala.toSeq + + assert(firstPageFiles.nonEmpty, "First page should contain files") + + // Verify we got at most 15 AddFiles across all batches + val fileCounts: Seq[Long] = firstPageFiles.map(_.getNumOfTrueRows.toLong) + val totalAddFiles = fileCounts.sum + + // Get pagination state info + val paginatedAddFilesIter = paginatedIter.asInstanceOf[PaginatedAddFilesIterator] + val nextBatchesToSkip = paginatedAddFilesIter.getNewPageToken.getRowIndex() + val nextStartingFile = paginatedAddFilesIter.getNewPageToken.getStartingFileName() + + println(s"Multi-part checkpoint test - nextBatchesToSkip = $nextBatchesToSkip") + println(s"Multi-part checkpoint test - nextStartingFile = $nextStartingFile") + println(s"Multi-part checkpoint test - totalAddFiles = $totalAddFiles") + + assert(totalAddFiles <= 15, s"First page should contain at most 15 files, got $totalAddFiles") + assert(totalAddFiles > 0, s"Should have some files, got $totalAddFiles") + + paginatedIter.close() + + // Verify checkpoint behavior using a separate scan instance + val verificationScan = snapshot.getScanBuilder().build() + val allFiles = collectScanFileRows(verificationScan) + val totalFilesInTable = allFiles.length + + // The exact number depends on how many commits we made, but should be substantial + assert(totalFilesInTable > 100, + s"Should have many files from checkpoint + JSON files, got $totalFilesInTable") + + // Verify checkpoint files exist - if we couldn't create multi-part, at least verify single checkpoint works + assert(checkpointFiles.length >= 1, + s"Should have at least one checkpoint file, found ${checkpointFiles.length}") + + if (checkpointFiles.length > 1) { + println(s"SUCCESS: Created multi-part checkpoint with ${checkpointFiles.length} parts") + } else { + println(s"WARNING: Only created single checkpoint file, multi-part checkpoint creation may need different approach") + } + } + } + + + test("getPaginatedScanFiles - basic pagination with a log compaction file " + + "and multiple JSON files") { + + } } object ScanSuite {