Skip to content

[Kernel] [Pagination] New Page Token Class #4848

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.replay;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.data.Row;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.types.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/** Page Token Class for Pagination Support */
public class PageToken {

public static PageToken fromRow(Row row) {
requireNonNull(row);

// Check #1: Correct schema
checkArgument(
PAGE_TOKEN_SCHEMA.equals(row.getSchema()),
String.format(
"Invalid Page Token: input row schema does not match expected PageToken schema."
+ "\nExpected: %s\nGot: %s",
PAGE_TOKEN_SCHEMA, row.getSchema()));

// Check #2: All required fields are present
for (int i = 0; i < PAGE_TOKEN_SCHEMA.length(); i++) {
if (PAGE_TOKEN_SCHEMA.at(i).getName().equals("lastReadSidecarFileIdx")) continue;
checkArgument(
!row.isNullAt(i),
String.format(
"Invalid Page Token: required field '%s' is null at index %d",
PAGE_TOKEN_SCHEMA.at(i).getName(), i));
}

return new PageToken(
row.getString(0), // lastReadLogFileName
row.getLong(1), // lastReturnedRowIndex
Optional.ofNullable(row.isNullAt(2) ? null : row.getLong(2)), // lastReadSidecarFileIdx
row.getString(3), // kernelVersion
row.getString(4), // tablePath
row.getLong(5), // tableVersion
row.getLong(6), // predicateHash
row.getLong(7)); // logSegmentHash
}

public static final StructType PAGE_TOKEN_SCHEMA =
new StructType()
.add("lastReadLogFileName", StringType.STRING, false /* nullable */)
.add("lastReturnedRowIndex", LongType.LONG, false /* nullable */)
.add("lastReadSidecarFileIdx", LongType.LONG, true /* nullable */)
.add("kernelVersion", StringType.STRING, false /* nullable */)
.add("tablePath", StringType.STRING, false /* nullable */)
.add("tableVersion", LongType.LONG, false /* nullable */)
.add("predicateHash", LongType.LONG, false /* nullable */)
.add("logSegmentHash", LongType.LONG, false /* nullable */);

// ===== Variables to mark where the last page ended (and the current page starts) =====

/** The last log file read in the previous page. */
private final String lastReadLogFileName;

/**
* The index of the last row that was returned from the last read log file during the previous
* page. This row index is relative to the file. The current page should begin from the row
* immediately after this row index.
*/
private final long lastReturnedRowIndex;

/**
* Optional index of the last sidecar checkpoint file read in the previous page. This index is
* based on the ordering of sidecar files in the V2 manifest checkpoint file. If present, it must
* represent the final sidecar file that was read and must correspond to the same file as
* `lastReadLogFileName`.
*/
private final Optional<Long> lastReadSidecarFileIdx;

// ===== Variables for validating query params and detecting changes in log segment =====
private final String kernelVersion;
private final String tablePath;
private final long tableVersion;
private final long predicateHash;
private final long logSegmentHash;

public PageToken(
String lastReadLogFileName,
long lastReturnedRowIndex,
Optional<Long> lastReadSidecarFileIdx,
String kernelVersion,
String tablePath,
long tableVersion,
long predicateHash,
long logSegmentHash) {
this.lastReadLogFileName = requireNonNull(lastReadLogFileName, "lastReadLogFileName is null");
this.lastReturnedRowIndex = lastReturnedRowIndex;
this.lastReadSidecarFileIdx = lastReadSidecarFileIdx;
this.kernelVersion = requireNonNull(kernelVersion, "kernelVersion is null");
this.tablePath = requireNonNull(tablePath, "tablePath is null");
this.tableVersion = tableVersion;
this.predicateHash = predicateHash;
this.logSegmentHash = logSegmentHash;
}

public Row toRow() {
Map<Integer, Object> pageTokenMap = new HashMap<>();
pageTokenMap.put(0, lastReadLogFileName);
pageTokenMap.put(1, lastReturnedRowIndex);
pageTokenMap.put(2, lastReadSidecarFileIdx.orElse(null));
pageTokenMap.put(3, kernelVersion);
pageTokenMap.put(4, tablePath);
pageTokenMap.put(5, tableVersion);
pageTokenMap.put(6, predicateHash);
pageTokenMap.put(7, logSegmentHash);

return new GenericRow(PAGE_TOKEN_SCHEMA, pageTokenMap);
}

public String getLastReadLogFileName() {
return lastReadLogFileName;
}

public long getLastReturnedRowIndex() {
return lastReturnedRowIndex;
}

public Optional<Long> getLastReadSidecarFileIdx() {
return lastReadSidecarFileIdx;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}

PageToken other = (PageToken) obj;

return lastReturnedRowIndex == other.lastReturnedRowIndex
&& tableVersion == other.tableVersion
&& predicateHash == other.predicateHash
&& logSegmentHash == other.logSegmentHash
&& Objects.equals(lastReadSidecarFileIdx, other.lastReadSidecarFileIdx)
&& Objects.equals(lastReadLogFileName, other.lastReadLogFileName)
&& Objects.equals(kernelVersion, other.kernelVersion)
&& Objects.equals(tablePath, other.tablePath);
}

@Override
public int hashCode() {
return Objects.hash(
lastReadLogFileName,
lastReturnedRowIndex,
lastReadSidecarFileIdx,
kernelVersion,
tablePath,
tableVersion,
predicateHash,
logSegmentHash);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal

import java.util
import java.util.{HashMap, Map}
import java.util.Optional

import scala.collection.JavaConverters._

import io.delta.kernel.data.Row
import io.delta.kernel.internal.annotation.VisibleForTesting
import io.delta.kernel.internal.data.GenericRow
import io.delta.kernel.internal.replay.PageToken
import io.delta.kernel.test.MockFileSystemClientUtils
import io.delta.kernel.types._

import org.scalatest.funsuite.AnyFunSuite

class PageTokenSuite extends AnyFunSuite with MockFileSystemClientUtils {

private val TEST_FILE_NAME = "test_file.json"
private val TEST_ROW_INDEX = 42L
private val TEST_SIDECAR_INDEX = Optional.of(java.lang.Long.valueOf(5L))
private val TEST_KERNEL_VERSION = "4.0.0"
private val TEST_TABLE_PATH = "/path/to/table"
private val TEST_TABLE_VERSION = 5L
private val TEST_PREDICATE_HASH = 123L
private val TEST_LOG_SEGMENT_HASH = 456L

private val expectedPageToken = new PageToken(
TEST_FILE_NAME,
TEST_ROW_INDEX,
TEST_SIDECAR_INDEX,
TEST_KERNEL_VERSION,
TEST_TABLE_PATH,
TEST_TABLE_VERSION,
TEST_PREDICATE_HASH,
TEST_LOG_SEGMENT_HASH)

private val rowData: Map[Integer, Object] = new HashMap()
rowData.put(0, TEST_FILE_NAME)
rowData.put(1, TEST_ROW_INDEX.asInstanceOf[Object])
rowData.put(2, TEST_SIDECAR_INDEX.get())
rowData.put(3, TEST_KERNEL_VERSION)
rowData.put(4, TEST_TABLE_PATH)
rowData.put(5, TEST_TABLE_VERSION.asInstanceOf[Object])
rowData.put(6, TEST_PREDICATE_HASH.asInstanceOf[Object])
rowData.put(7, TEST_LOG_SEGMENT_HASH.asInstanceOf[Object])

val expectedRow = new GenericRow(PageToken.PAGE_TOKEN_SCHEMA, rowData)

test("PageToken.fromRow with valid data") {
val pageToken = PageToken.fromRow(expectedRow)
assert(pageToken.equals(expectedPageToken))
}

test("PageToken.toRow with valid data") {
val row = expectedPageToken.toRow
assert(row.getSchema.equals(PageToken.PAGE_TOKEN_SCHEMA))

assert(row.getString(0) == TEST_FILE_NAME)
assert(row.getLong(1) == TEST_ROW_INDEX)
assert(Optional.of(row.getLong(2)) == TEST_SIDECAR_INDEX)
assert(row.getString(3) == TEST_KERNEL_VERSION)
assert(row.getString(4) == TEST_TABLE_PATH)
assert(row.getLong(5) == TEST_TABLE_VERSION)
assert(row.getLong(6) == TEST_PREDICATE_HASH)
assert(row.getLong(7) == TEST_LOG_SEGMENT_HASH)
}

test("E2E: PageToken round-trip: toRow -> fromRow") {
val row = expectedPageToken.toRow
val reconstructedPageToken = PageToken.fromRow(row)
assert(reconstructedPageToken.equals(expectedPageToken))
}

test("PageToken.fromRow throws exception when input row schema has invalid field name") {
val invalidSchema = new StructType()
.add("wrongFieldName", StringType.STRING)
.add("lastReturnedRowIndex", LongType.LONG)
.add("lastReadSidecarFileIdx", LongType.LONG)
.add("kernelVersion", StringType.STRING)
.add("tablePath", StringType.STRING)
.add("tableVersion", LongType.LONG)
.add("predicateHash", LongType.LONG)
.add("logSegmentHash", LongType.LONG)

val invalidRowData: Map[Integer, Object] = new HashMap()
invalidRowData.put(0, TEST_FILE_NAME)
invalidRowData.put(1, TEST_ROW_INDEX.asInstanceOf[Object])
invalidRowData.put(2, TEST_SIDECAR_INDEX)
invalidRowData.put(3, TEST_KERNEL_VERSION)
invalidRowData.put(4, TEST_TABLE_PATH)
invalidRowData.put(5, TEST_TABLE_VERSION.asInstanceOf[Object])
invalidRowData.put(6, TEST_PREDICATE_HASH.asInstanceOf[Object])
invalidRowData.put(7, TEST_LOG_SEGMENT_HASH.asInstanceOf[Object])

val row = new GenericRow(invalidSchema, invalidRowData)
val exception = intercept[IllegalArgumentException] {
PageToken.fromRow(row)
}
assert(exception.getMessage.contains(
"Invalid Page Token: input row schema does not match expected PageToken schema"))
}

test("PageToken.fromRow throws exception when input row schema has wrong data type") {
val invalidSchema = new StructType()
.add("lastReadLogFileName", StringType.STRING)
.add("lastReturnedRowIndex", LongType.LONG)
.add("lastReadSidecarFileIdx", StringType.STRING) // should be long type
.add("kernelVersion", StringType.STRING)
.add("tablePath", StringType.STRING)
.add("tableVersion", LongType.LONG)
.add("predicateHash", LongType.LONG)
.add("logSegmentHash", LongType.LONG)

val invalidRowData: Map[Integer, Object] = new HashMap()
invalidRowData.put(0, TEST_FILE_NAME)
invalidRowData.put(1, TEST_ROW_INDEX.asInstanceOf[Object])
invalidRowData.put(2, TEST_SIDECAR_INDEX)
invalidRowData.put(3, TEST_KERNEL_VERSION)
invalidRowData.put(4, TEST_TABLE_PATH)
invalidRowData.put(5, TEST_TABLE_VERSION.asInstanceOf[Object])
invalidRowData.put(6, TEST_PREDICATE_HASH.asInstanceOf[Object])
invalidRowData.put(7, TEST_LOG_SEGMENT_HASH.asInstanceOf[Object])

val row = new GenericRow(invalidSchema, invalidRowData)
val exception = intercept[IllegalArgumentException] {
PageToken.fromRow(row)
}
assert(exception.getMessage.contains(
"Invalid Page Token: input row schema does not match expected PageToken schema"))
}

test("PageToken.fromRow accepts the case sidecar field is null") {
val nullSidecarData: Map[Integer, Object] = new HashMap(rowData)
nullSidecarData.put(2, null)
val nullSidecarRow = new GenericRow(PageToken.PAGE_TOKEN_SCHEMA, nullSidecarData)
val pageToken = PageToken.fromRow(nullSidecarRow)
assert(pageToken.getLastReadSidecarFileIdx == Optional.empty())
}

test("PageToken.fromRow throws exception when required field is null") {
val invalidData: Map[Integer, Object] = new HashMap(rowData)
invalidData.put(3, null)
val invalidRow = new GenericRow(PageToken.PAGE_TOKEN_SCHEMA, invalidData)
val exception = intercept[IllegalArgumentException] {
PageToken.fromRow(invalidRow)
}
assert(exception.getMessage.contains(
"Invalid Page Token: required field"))
}
}
Loading