Skip to content

fix: Move logic into the thread::scope call so it doesn't hang #1040

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 33 additions & 34 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ fn try_main() -> DeltaResult<()> {
// scan_file_[t/r]x are used to send each scan file from the iterator out to the waiting threads
let (mut scan_file_tx, scan_file_rx) = spmc::channel();

// fire up each thread. we don't need the handles as we rely on the channels to indicate when
// things are done
// fire up each thread. they will be automatically joined at the end due to the scope
thread::scope(|s| {
(0..cli.thread_count).for_each(|_| {
// items that we need to send to the other thread
Expand All @@ -190,43 +189,43 @@ fn try_main() -> DeltaResult<()> {
do_work(&engine, scan_state, rb_tx, scan_file_rx);
});
});
});

// have handed out all copies needed, drop so record_batch_rx will exit when the last thread is
// done sending
drop(record_batch_tx);
// have handed out all copies needed, drop so record_batch_rx will exit when the last thread is
Copy link
Collaborator Author

@nicklan nicklan Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're seeing this comment as being on a change, turn on "Hide Whitespace"

// done sending
drop(record_batch_tx);

for res in scan_metadata {
let scan_metadata = res?;
scan_file_tx = scan_metadata.visit_scan_files(scan_file_tx, send_scan_file)?;
}
for res in scan_metadata {
let scan_metadata = res?;
scan_file_tx = scan_metadata.visit_scan_files(scan_file_tx, send_scan_file)?;
}

// have sent all scan files, drop this so threads will exit when there's no more work
drop(scan_file_tx);

let batches = if let Some(limit) = cli.limit {
// gather batches while we need
let mut batches = vec![];
let mut rows_so_far = 0;
for mut batch in record_batch_rx.iter() {
let batch_rows = batch.num_rows();
if rows_so_far < limit {
if rows_so_far + batch_rows > limit {
// truncate this batch
batch = truncate_batch(batch, limit - rows_so_far);
// have sent all scan files, drop this so threads will exit when there's no more work
drop(scan_file_tx);

let batches = if let Some(limit) = cli.limit {
// gather batches while we need
let mut batches = vec![];
let mut rows_so_far = 0;
for mut batch in record_batch_rx.iter() {
let batch_rows = batch.num_rows();
if rows_so_far < limit {
if rows_so_far + batch_rows > limit {
// truncate this batch
batch = truncate_batch(batch, limit - rows_so_far);
}
batches.push(batch);
}
batches.push(batch);
rows_so_far += batch_rows;
}
rows_so_far += batch_rows;
}
println!("Printing first {limit} rows of {rows_so_far} total rows");
batches
} else {
// simply gather up all batches
record_batch_rx.iter().collect()
};
print_batches(&batches)?;
Ok(())
println!("Printing first {limit} rows of {rows_so_far} total rows");
batches
} else {
// simply gather up all batches
record_batch_rx.iter().collect()
};
print_batches(&batches)?;
Ok(())
})
}

// this is the work each thread does
Expand Down
Loading