[Kernel] [Prototype] Kernel Pagination Support #4786

Draft: wants to merge 7 commits into base: master
17 changes: 17 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/PaginatedScan.java
@@ -0,0 +1,17 @@
package io.delta.kernel;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.utils.CloseableIterator;

/** A {@link Scan} that returns the scan files one page at a time. */
public interface PaginatedScan extends Scan {

@Override
CloseableIterator<FilteredColumnarBatch> getScanFiles(Engine engine);

/** @return a {@link Row} encoding where the next page of scan files should resume. */
Row getNewPageToken();

/** @return the tombstone hash sets cached while serving this page, intended to be passed back when building the next page. */
ColumnarBatch getTombStoneHashsets();
}
kernel/kernel-api/src/main/java/io/delta/kernel/ScanBuilder.java
@@ -17,6 +17,8 @@
package io.delta.kernel;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;
Expand Down Expand Up @@ -65,4 +67,10 @@ public interface ScanBuilder {

/** @return Build the {@link Scan instance} */
Scan build();

/** Build a {@link PaginatedScan} that resumes from {@code pageToken} and bounds each page by {@code pageSize}. */
PaginatedScan buildPaginatedScan(Row pageToken, long pageSize);

/** Build a {@link PaginatedScan} with optional cached tombstone hash sets injected. */
PaginatedScan buildPaginatedScan(Row pageToken, long pageSize, ColumnarBatch tombstone);
}
kernel/kernel-api/src/main/java/io/delta/kernel/data/FilteredColumnarBatch.java
@@ -37,10 +37,25 @@
public class FilteredColumnarBatch {
private final ColumnarBatch data;
private final Optional<ColumnVector> selectionVector;
private final Optional<Long> numOfTrueRows;
private final Optional<String> fileName; // which file this batch belongs to

public FilteredColumnarBatch(
ColumnarBatch data,
Optional<ColumnVector> selectionVector,
long numOfTrueRows,
String fileName) {
this.data = data;
this.selectionVector = selectionVector;
this.numOfTrueRows = Optional.of(numOfTrueRows);
this.fileName = Optional.ofNullable(fileName);
}

public FilteredColumnarBatch(ColumnarBatch data, Optional<ColumnVector> selectionVector) {
this.data = data;
this.selectionVector = selectionVector;
this.numOfTrueRows = Optional.empty();
this.fileName = Optional.empty();
}

/**
@@ -64,6 +79,14 @@ public Optional<ColumnVector> getSelectionVector() {
return selectionVector;
}

/** @return the number of rows that passed the filter; only present when the pagination-aware constructor was used, otherwise this throws {@code NoSuchElementException}. */
public Long getNumOfTrueRows() {
return this.numOfTrueRows.get();
}

/** @return the name of the file this batch was read from; only present when the pagination-aware constructor was used, otherwise this throws {@code NoSuchElementException}. */
public String getFileName() {
return this.fileName.get();
}

/**
* Iterator of rows that survived the filter.
*
kernel/kernel-api/src/main/java/io/delta/kernel/engine/ParquetHandler.java
@@ -20,6 +20,7 @@
import io.delta.kernel.data.*;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.*;
@@ -56,9 +57,9 @@ public interface ParquetHandler {
* @param predicate Optional predicate which the Parquet reader can optionally use to prune rows
* that don't satisfy the predicate. Because pruning is optional and may be incomplete, the
* caller is still responsible for applying the predicate on the data returned by this method.
* @return an iterator of {@link ColumnarBatch}s containing the data in columnar format. It is the
* responsibility of the caller to close the iterator. The data is returned in the same order
* as the files given in {@code scanFileIter}.
* @return an iterator of {@link Tuple2} containing the filename and {@link ColumnarBatch} pairs
* in columnar format. It is the responsibility of the caller to close the iterator. The data
* is returned in the same order as the files given in {@code scanFileIter}.
* @throws IOException if an I/O error occurs during the read.
*/
CloseableIterator<ColumnarBatch> readParquetFiles(
@@ -67,6 +68,43 @@ CloseableIterator<ColumnarBatch> readParquetFiles(
Optional<Predicate> predicate)
throws IOException;

/**
* Read the Parquet format files at the given locations and return the data as tuples of filename
* and {@link ColumnarBatch} with the columns requested by {@code physicalSchema}.
*
* <p>This method is similar to {@link #readParquetFiles(CloseableIterator, StructType, Optional)}
* but returns tuples where the first element is the filename and the second element is the
* columnar batch from that file.
*
* <p>If {@code physicalSchema} has a {@link StructField} with column name {@link
* StructField#METADATA_ROW_INDEX_COLUMN_NAME} and the field is a metadata column {@link
* StructField#isMetadataColumn()} the column must be populated with the file row index.
*
* <p>How does a column in {@code physicalSchema} match to the column in the Parquet file? If the
* {@link StructField} has a field id in the {@code metadata} with key `parquet.field.id`, the
* column is first matched by id. If the column is not found by id, the column is matched by
* name. When matching by name, a case-sensitive match is attempted first; if none is found, a
* case-insensitive match is attempted.
*
* @param fileIter Iterator of files to read data from.
* @param physicalSchema Select list of columns to read from the Parquet file.
* @param predicate Optional predicate which the Parquet reader can optionally use to prune rows
* that don't satisfy the predicate. Because pruning is optional and may be incomplete, the
* caller is still responsible for applying the predicate on the data returned by this method.
* @return an iterator of {@link Tuple2} containing filename and {@link ColumnarBatch} pairs in
* columnar format. It is the responsibility of the caller to close the iterator. The data is
* returned in the same order as the files given in {@code fileIter}.
* @throws IOException if an I/O error occurs during the read.
*/
default CloseableIterator<Tuple2<String, ColumnarBatch>> readParquetFiles2(
CloseableIterator<FileStatus> fileIter,
StructType physicalSchema,
Optional<Predicate> predicate)
throws IOException {
throw new UnsupportedOperationException(
"readParquetFiles2 is not supported by this implementation");
}

/**
* Write the given data batches to a Parquet files. Try to keep the Parquet file size to given
* size. If the current file exceeds this size close the current file and start writing to a new
kernel/kernel-api/src/main/java/io/delta/kernel/internal/PaginatedScanImpl.java
@@ -0,0 +1,107 @@
package io.delta.kernel.internal;

import io.delta.kernel.PaginatedScan;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.*;
import io.delta.kernel.metrics.SnapshotReport;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import java.util.HashSet;
import java.util.Optional;

public class PaginatedScanImpl implements PaginatedScan {

private final long pageSize;
private final PageToken pageToken;
private final ScanImpl baseScan;
private PaginatedAddFilesIterator paginatedIter;

public PaginatedScanImpl(
StructType snapshotSchema,
StructType readSchema,
Protocol protocol,
Metadata metadata,
LogReplay logReplay,
Optional<Predicate> filter,
Path dataPath,
SnapshotReport snapshotReport,
Row pageTokenInRow,
long pageSize) {
baseScan =
new ScanImpl(
snapshotSchema,
readSchema,
protocol,
metadata,
logReplay,
filter,
dataPath,
snapshotReport);
assert pageTokenInRow != null;
this.pageToken = decodePageToken(pageTokenInRow);
// TODO: validation 1. LogReplay.getLogSegment() 2. data path 3. snapshotReport.getVersion()
// [maybe can use snapshotReport.getCheckpointVersion()]
// TODO: not sure how to check: 1. predicate (filter) 2. Kernel version ID
this.pageSize = pageSize;
}

@Override
public CloseableIterator<FilteredColumnarBatch> getScanFiles(Engine engine) {
return this.getScanFiles(engine, false);
}

@Override
public Optional<Predicate> getRemainingFilter() {
return baseScan.getRemainingFilter();
}

@Override
public Row getScanState(Engine engine) {
return baseScan.getScanState(engine);
}

public CloseableIterator<FilteredColumnarBatch> getScanFiles(
Engine engine, boolean includeStates) {
// TODO: fetch cached tombstone/add-file hash sets instead of starting with empty ones
boolean isHashSetCached = false;
HashSet<LogReplayUtils.UniqueFileActionTuple> tombstonesFromJson = new HashSet<>();
HashSet<LogReplayUtils.UniqueFileActionTuple> addFilesFromJson = new HashSet<>();

PaginationContext paginationContext =
new PaginationContext(
pageToken.getStartingFileName(),
pageToken.getRowIndex(),
pageToken.getSidecarIdx(),
pageSize,
isHashSetCached,
tombstonesFromJson,
addFilesFromJson);

CloseableIterator<FilteredColumnarBatch> scanFileIter =
baseScan.getScanFiles(engine, includeStates, paginationContext);
this.paginatedIter = new PaginatedAddFilesIterator(scanFileIter, paginationContext);
return paginatedIter;
}

// TODO: implement the following methods
private PageToken decodePageToken(Row pageTokenInRow) {
return PageToken.fromRow(pageTokenInRow);
}

@Override
public Row getNewPageToken() {
return paginatedIter.getNewPageToken().getRow();
}

@Override
public ColumnarBatch getTombStoneHashsets() {
// TODO: return the tombstone hash sets cached during this page's log replay
return null;
}
}
kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanBuilderImpl.java
@@ -16,8 +16,11 @@

package io.delta.kernel.internal;

import io.delta.kernel.PaginatedScan;
import io.delta.kernel.Scan;
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
@@ -85,4 +88,24 @@ public Scan build() {
dataPath,
snapshotReport);
}

@Override
public PaginatedScan buildPaginatedScan(Row pageToken, long pageSize) {
return new PaginatedScanImpl(
snapshotSchema,
readSchema,
protocol,
metadata,
logReplay,
predicate,
dataPath,
snapshotReport,
pageToken,
pageSize);
}

@Override
public PaginatedScan buildPaginatedScan(Row pageToken, long pageSize, ColumnarBatch tombstones) {
// TODO: thread the cached tombstone hash sets into PaginatedScanImpl
return null;
}
}
kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java
@@ -28,12 +28,16 @@
import io.delta.kernel.expressions.*;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.annotation.VisibleForTesting;
import io.delta.kernel.internal.data.ScanStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.ScanMetrics;
import io.delta.kernel.internal.metrics.ScanReportImpl;
import io.delta.kernel.internal.metrics.Timer;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.replay.LogReplayUtils;
import io.delta.kernel.internal.replay.PaginatedAddFilesIterator;
import io.delta.kernel.internal.replay.PaginationContext;
import io.delta.kernel.internal.skipping.DataSkippingPredicate;
import io.delta.kernel.internal.skipping.DataSkippingUtils;
import io.delta.kernel.internal.util.*;
@@ -105,6 +109,11 @@ public CloseableIterator<FilteredColumnarBatch> getScanFiles(Engine engine) {
return getScanFiles(engine, false);
}

public CloseableIterator<FilteredColumnarBatch> getScanFiles(
Engine engine, boolean includeStats) {
return getScanFiles(engine, includeStats, null);
}

/**
* Get an iterator of data files in this version of scan that survived the predicate pruning.
*
@@ -117,8 +126,11 @@ public CloseableIterator<FilteredColumnarBatch> getScanFiles(Engine engine) {
* @param includeStats whether to read and include the JSON statistics
* @return the surviving scan files as {@link FilteredColumnarBatch}s
*/
public CloseableIterator<FilteredColumnarBatch> getScanFiles(
Engine engine, boolean includeStats) {
protected CloseableIterator<FilteredColumnarBatch> getScanFiles(
Engine engine,
boolean includeStats,
PaginationContext paginationContext) { // carries page bounds and the cached tombstone/add-file hash sets

if (accessedScanFiles) {
throw new IllegalStateException("Scan files are already fetched from this instance");
}
@@ -165,7 +177,8 @@ public CloseableIterator<FilteredColumnarBatch> getScanFiles(
predicate ->
rewritePartitionPredicateOnCheckpointFileSchema(
predicate, partitionColToStructFieldMap.get())),
scanMetrics);
scanMetrics,
paginationContext);

// Apply partition pruning
scanFileIter = applyPartitionPruning(engine, scanFileIter);
@@ -185,6 +198,35 @@ public CloseableIterator<FilteredColumnarBatch> getScanFiles(
}
}

/** Only used for testing */
@VisibleForTesting
public CloseableIterator<FilteredColumnarBatch> getPaginatedScanFiles(
Engine engine,
long numOfAddFilesToSkip,
long pageSize,
String startingLogFileName,
long sidecarIdx) {
// TODO: fetch the cached tombstone/add-file hash sets here
boolean isHashSetCached = false;
HashSet<LogReplayUtils.UniqueFileActionTuple> tombstonesFromJson = new HashSet<>();
HashSet<LogReplayUtils.UniqueFileActionTuple> addFilesFromJson = new HashSet<>();
PaginationContext paginationContext =
new PaginationContext(
startingLogFileName,
numOfAddFilesToSkip,
sidecarIdx,
pageSize,
isHashSetCached,
tombstonesFromJson,
addFilesFromJson);
CloseableIterator<FilteredColumnarBatch> scanFileIter =
getScanFiles(engine, false, paginationContext);
CloseableIterator<FilteredColumnarBatch> paginatedIter =
new PaginatedAddFilesIterator(scanFileIter, paginationContext);
return paginatedIter;
}

@Override
public Row getScanState(Engine engine) {
// Physical equivalent of the logical read schema.