Skip to content

Commit 38035c9

Browse files
committed
Move logic into the thread::scope call so it doesn't hang
1 parent 4a13e74 commit 38035c9

File tree

1 file changed

+35
-35
lines changed
  • kernel/examples/read-table-multi-threaded/src

1 file changed

+35
-35
lines changed

kernel/examples/read-table-multi-threaded/src/main.rs

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,8 @@ fn try_main() -> DeltaResult<()> {
174174
// scan_file_[t/r]x are used to send each scan file from the iterator out to the waiting threads
175175
let (mut scan_file_tx, scan_file_rx) = spmc::channel();
176176

177-
// fire up each thread. we don't need the handles as we rely on the channels to indicate when
178-
// things are done
179-
thread::scope(|s| {
177+
// fire up each thread. they will be automatically joined at the end due to the scope
178+
thread::scope(|s| -> DeltaResult<()> {
180179
(0..cli.thread_count).for_each(|_| {
181180
// items that we need to send to the other thread
182181
let scan_state = Arc::new(ScanState {
@@ -190,43 +189,44 @@ fn try_main() -> DeltaResult<()> {
190189
do_work(&engine, scan_state, rb_tx, scan_file_rx);
191190
});
192191
});
193-
});
194192

195-
// have handed out all copies needed, drop so record_batch_rx will exit when the last thread is
196-
// done sending
197-
drop(record_batch_tx);
193+
// have handed out all copies needed, drop so record_batch_rx will exit when the last thread is
194+
// done sending
195+
drop(record_batch_tx);
198196

199-
for res in scan_metadata {
200-
let scan_metadata = res?;
201-
scan_file_tx = scan_metadata.visit_scan_files(scan_file_tx, send_scan_file)?;
202-
}
197+
for res in scan_metadata {
198+
let scan_metadata = res?;
199+
scan_file_tx = scan_metadata.visit_scan_files(scan_file_tx, send_scan_file)?;
200+
}
203201

204-
// have sent all scan files, drop this so threads will exit when there's no more work
205-
drop(scan_file_tx);
206-
207-
let batches = if let Some(limit) = cli.limit {
208-
// gather batches while we need
209-
let mut batches = vec![];
210-
let mut rows_so_far = 0;
211-
for mut batch in record_batch_rx.iter() {
212-
let batch_rows = batch.num_rows();
213-
if rows_so_far < limit {
214-
if rows_so_far + batch_rows > limit {
215-
// truncate this batch
216-
batch = truncate_batch(batch, limit - rows_so_far);
202+
// have sent all scan files, drop this so threads will exit when there's no more work
203+
drop(scan_file_tx);
204+
205+
let batches = if let Some(limit) = cli.limit {
206+
// gather batches while we need
207+
let mut batches = vec![];
208+
let mut rows_so_far = 0;
209+
for mut batch in record_batch_rx.iter() {
210+
let batch_rows = batch.num_rows();
211+
if rows_so_far < limit {
212+
if rows_so_far + batch_rows > limit {
213+
// truncate this batch
214+
batch = truncate_batch(batch, limit - rows_so_far);
215+
}
216+
batches.push(batch);
217217
}
218-
batches.push(batch);
218+
rows_so_far += batch_rows;
219219
}
220-
rows_so_far += batch_rows;
221-
}
222-
println!("Printing first {limit} rows of {rows_so_far} total rows");
223-
batches
224-
} else {
225-
// simply gather up all batches
226-
record_batch_rx.iter().collect()
227-
};
228-
print_batches(&batches)?;
229-
Ok(())
220+
println!("Printing first {limit} rows of {rows_so_far} total rows");
221+
batches
222+
} else {
223+
// simply gather up all batches
224+
record_batch_rx.iter().collect()
225+
};
226+
227+
print_batches(&batches)?;
228+
Ok(())
229+
})
230230
}
231231

232232
// this is the work each thread does

0 commit comments

Comments
 (0)