Skip to content

[Kernel] [Pagination] New Page Token Class #4848

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

mmmyr
Copy link
Collaborator

@mmmyr mmmyr commented Jun 27, 2025

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Introduce a new page token class (for pagination).

How was this patch tested?

PageTokenSuite.scala

Does this PR introduce any user-facing changes?

No.

@mmmyr mmmyr marked this pull request as ready for review June 27, 2025 23:27
@mmmyr mmmyr requested review from scottsand-db and huan233usc June 27, 2025 23:28
Copy link
Collaborator

@huan233usc huan233usc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM with some minor

@mmmyr mmmyr requested review from nicklan and raveeram-db June 28, 2025 00:04
Copy link
Collaborator

@scottsand-db scottsand-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good tests! Left comments and questions on the semantics of PageToken

Comment on lines 21 to 24
+ "Expected: "
+ PAGE_TOKEN_SCHEMA
+ ", Got: "
+ row.getSchema());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all of this could fit on one line.
You could also use String.format here, too.
Did you want the different schemas printed on different lines? Are you missing some \n here?


// ===== Variables to mark where the last page ended (and the current page starts) =====
/**
* The name of the log file where the current page starts. This is the same as the last log file
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this always be the case?
what if we are able to detect that this columnar batch is the last columnar batch in the file?
will we then set this startingLogFileName to the next file?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now it's always the case. You are right, probably it's better to say: the last log file read in the previous page.

* page. This row index is relative to the file. The current page should begin from the row
* immediately after this row index.
*/
private final long lastReturnedRowIndex;
Copy link
Collaborator

@scottsand-db scottsand-db Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is it the last row index and not the next row index ?

if I read 1000 rows in json file 007.json, which is rows 0 to 999 inclusive, do we return 999 or 1000? I would think we should return 1000?

Copy link
Collaborator Author

@mmmyr mmmyr Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my prototype, I returned 999, and I believe this makes more sense than returning the next row index. For example, suppose 007.json contains 1,000 rows (indexed 0 to 999), and the last page returns rows 0 to 999. If we return "007.json" + row index 1000, it's a bit odd—row index 1000 doesn't actually exist in that file.

Conceptually, I feel like it's more accurate to return the last row index read, along with the last file name read. This way, we're recording the precise position where the last page ended, and we can unambiguously start the next page from the row that comes immediately after. I'll change the variable name "startingLogFileName" into "lastReadLogFileName", and also change "startingSidecarFileIdx" to "lastReadSidecarFileIdx".

Copy link
Collaborator Author

@mmmyr mmmyr Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can either record:

  1. last file & last row index of the previous page.
  2. starting file & starting row index of the current page.

But we won't be able to know the real file name of next batch to read until callers tries to get the next batch. If we want to return the start position of next page, we want an accurate starting file name + starting row index.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change the variable name "startingLogFileName" into "lastReadLogFileName", and also change "startingSidecarFileIdx" to "lastReadSidecarFileIdx".

SGTM. Thanks for being very clear on this detail and semantic!

Copy link
Collaborator

@scottsand-db scottsand-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Left some comments

row.getLong(7)); // logSegmentHash
}

/** Schema for PageToken Row representation */
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: superfluous comment :) you can delete this

/** Schema for PageToken Row representation */
public static final StructType PAGE_TOKEN_SCHEMA =
new StructType()
.add("logFileName", StringType.STRING)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please explicitly add the nullability here? , false /* nullable */) etc

and make the nullability for the sidecarIndex to true?

"Invalid Page Token: input row schema does not match expected PageToken schema.\nExpected: %s\nGot: %s",
PAGE_TOKEN_SCHEMA, row.getSchema()));

for (int i = 0; i < PAGE_TOKEN_SCHEMA.length(); i++) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a simple comment here would help:

above: // Check #1: Correct schema
here: // Check #2: All required fields are present

.add("logSegmentHash", LongType.LONG);

// ===== Variables to mark where the last page ended (and the current page starts) =====
/** The last log file read in the previous page. */
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline between the header block and here

* page. This row index is relative to the file. The current page should begin from the row
* immediately after this row index.
*/
private final long lastReturnedRowIndex;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change the variable name "startingLogFileName" into "lastReadLogFileName", and also change "startingSidecarFileIdx" to "lastReadSidecarFileIdx".

SGTM. Thanks for being very clear on this detail and semantic!

long predicateHash,
long logSegmentHash) {
this.lastReadLogFileName =
requireNonNull(lastReadLogFileName, "lastReadLogFileName must not be null");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of must not be you can just say x is null --> will this make our lines shorter and let this fit onto one line? much cleaner :)


val expectedRow = new GenericRow(PageToken.PAGE_TOKEN_SCHEMA, rowData)

test("Test PageToken.fromRow with valid data") {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you don't need to say test("Test --> we know this is a test :)

private val rowData: Map[Integer, Object] = new HashMap()
rowData.put(0, TEST_FILE_NAME)
rowData.put(1, TEST_ROW_INDEX.asInstanceOf[Object])
rowData.put(2, TEST_SIDECAR_INDEX.orElse(null))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not have business logic orElse(null) here.

is it null? make it null
is it not null? then just use that value


assert(row.getString(0) == TEST_FILE_NAME)
assert(row.getLong(1) == TEST_ROW_INDEX)
assert(Optional.of(if (row.isNullAt(2)) null else row.getLong(2)) == TEST_SIDECAR_INDEX)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not have business logic here isNullAt

if it is null, require and assert that it is null

if not, require and assert that it is the value we are expecting

assert(reconstructedPageToken.equals(expectedPageToken))
}

test("PageToken.fromRow throws exception when input row has invalid schema") {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we test:

  • wrong field name
  • wrong data type
  • also explicitly test the sidecar-can-be-null case

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants