Commit 32f85fd

fix: Move logic into the thread::scope call so it doesn't hang (#1040)
## What changes are proposed in this pull request?

Previously in #957 we moved the code to use [`thread::scope`](https://doc.rust-lang.org/std/thread/fn.scope.html). This avoids having to clone an engine arc, but it also means that all the threads are implicitly joined at the end of the `scope` call. As a result, we waited for the threads to exit before ever sending them any work, so the version on main hangs 😱. Instead, we need to move the rest of the logic inside the scope call and only let the implicit joining happen at the end.

## How was this change tested?

Running the program on a few tables.
1 parent 54d1994 commit 32f85fd
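
For context on the fix: `std::thread::scope` implicitly joins every spawned thread when its closure returns, so any work those threads are waiting on has to be produced inside that closure. Below is a minimal sketch of the pattern, using a plain std `mpsc` channel and a single hypothetical worker rather than the example's actual spmc setup:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();

    thread::scope(|s| {
        // One worker thread for illustration; the real example fans scan files
        // out to several workers over an spmc channel.
        s.spawn(move || {
            for item in rx {
                println!("processed {item}");
            }
        });

        // The work has to be produced, and the sender dropped, *inside* the
        // scope. If this loop sat after `thread::scope(..)` instead, the
        // implicit join at the end of the scope would wait forever on a
        // worker that never receives any work or a channel disconnect.
        for i in 0..10 {
            tx.send(i).expect("worker hung up");
        }
        drop(tx); // lets the worker's loop end so the join can complete
    }); // every scoped thread is implicitly joined here
}
```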

File tree: 1 file changed (+33 −34)

  • kernel/examples/read-table-multi-threaded/src/main.rs


kernel/examples/read-table-multi-threaded/src/main.rs

Lines changed: 33 additions & 34 deletions

```diff
@@ -174,8 +174,7 @@ fn try_main() -> DeltaResult<()> {
     // scan_file_[t/r]x are used to send each scan file from the iterator out to the waiting threads
     let (mut scan_file_tx, scan_file_rx) = spmc::channel();
 
-    // fire up each thread. we don't need the handles as we rely on the channels to indicate when
-    // things are done
+    // fire up each thread. they will be automatically joined at the end due to the scope
     thread::scope(|s| {
         (0..cli.thread_count).for_each(|_| {
             // items that we need to send to the other thread
@@ -190,43 +189,43 @@ fn try_main() -> DeltaResult<()> {
             do_work(&engine, scan_state, rb_tx, scan_file_rx);
         });
     });
-    });
 
-    // have handed out all copies needed, drop so record_batch_rx will exit when the last thread is
-    // done sending
-    drop(record_batch_tx);
+        // have handed out all copies needed, drop so record_batch_rx will exit when the last thread is
+        // done sending
+        drop(record_batch_tx);
 
-    for res in scan_metadata {
-        let scan_metadata = res?;
-        scan_file_tx = scan_metadata.visit_scan_files(scan_file_tx, send_scan_file)?;
-    }
+        for res in scan_metadata {
+            let scan_metadata = res?;
+            scan_file_tx = scan_metadata.visit_scan_files(scan_file_tx, send_scan_file)?;
+        }
 
-    // have sent all scan files, drop this so threads will exit when there's no more work
-    drop(scan_file_tx);
-
-    let batches = if let Some(limit) = cli.limit {
-        // gather batches while we need
-        let mut batches = vec![];
-        let mut rows_so_far = 0;
-        for mut batch in record_batch_rx.iter() {
-            let batch_rows = batch.num_rows();
-            if rows_so_far < limit {
-                if rows_so_far + batch_rows > limit {
-                    // truncate this batch
-                    batch = truncate_batch(batch, limit - rows_so_far);
+        // have sent all scan files, drop this so threads will exit when there's no more work
+        drop(scan_file_tx);
+
+        let batches = if let Some(limit) = cli.limit {
+            // gather batches while we need
+            let mut batches = vec![];
+            let mut rows_so_far = 0;
+            for mut batch in record_batch_rx.iter() {
+                let batch_rows = batch.num_rows();
+                if rows_so_far < limit {
+                    if rows_so_far + batch_rows > limit {
+                        // truncate this batch
+                        batch = truncate_batch(batch, limit - rows_so_far);
+                    }
+                    batches.push(batch);
                 }
-                batches.push(batch);
+                rows_so_far += batch_rows;
             }
-            rows_so_far += batch_rows;
-        }
-        println!("Printing first {limit} rows of {rows_so_far} total rows");
-        batches
-    } else {
-        // simply gather up all batches
-        record_batch_rx.iter().collect()
-    };
-    print_batches(&batches)?;
-    Ok(())
+            println!("Printing first {limit} rows of {rows_so_far} total rows");
+            batches
+        } else {
+            // simply gather up all batches
+            record_batch_rx.iter().collect()
+        };
+        print_batches(&batches)?;
+        Ok(())
+    })
 }
 
 // this is the work each thread does
```
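
The two `drop` calls carry the shutdown protocol: dropping `scan_file_tx` tells the worker threads there is no more work, and dropping `record_batch_tx` lets the record-batch receiver's iterator finish once the last worker is done sending. A minimal sketch of that protocol, using std's `mpsc` channel in place of the example's `spmc` crate and with made-up worker/result names:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // Every worker gets its own clone of the results sender; the receiver's
    // iterator only finishes once all clones (the original included) are gone.
    let (result_tx, result_rx) = mpsc::channel::<String>();

    thread::scope(|s| {
        for worker in 0..4 {
            let tx = result_tx.clone();
            s.spawn(move || {
                tx.send(format!("result from worker {worker}")).unwrap();
                // this worker's clone of the sender is dropped on return
            });
        }

        // Drop the original sender; otherwise the loop below would block
        // forever even after every worker has exited.
        drop(result_tx);

        for result in result_rx {
            println!("{result}");
        }
    });
}
```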
