diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs index 05ec2ef76..0d62a0da7 100644 --- a/kernel/examples/read-table-multi-threaded/src/main.rs +++ b/kernel/examples/read-table-multi-threaded/src/main.rs @@ -174,8 +174,7 @@ fn try_main() -> DeltaResult<()> { // scan_file_[t/r]x are used to send each scan file from the iterator out to the waiting threads let (mut scan_file_tx, scan_file_rx) = spmc::channel(); - // fire up each thread. we don't need the handles as we rely on the channels to indicate when - // things are done + // fire up each thread. they will be automatically joined at the end due to the scope thread::scope(|s| { (0..cli.thread_count).for_each(|_| { // items that we need to send to the other thread @@ -190,43 +189,43 @@ fn try_main() -> DeltaResult<()> { do_work(&engine, scan_state, rb_tx, scan_file_rx); }); }); - }); - // have handed out all copies needed, drop so record_batch_rx will exit when the last thread is - // done sending - drop(record_batch_tx); + // have handed out all copies needed, drop so record_batch_rx will exit when the last thread is + // done sending + drop(record_batch_tx); - for res in scan_metadata { - let scan_metadata = res?; - scan_file_tx = scan_metadata.visit_scan_files(scan_file_tx, send_scan_file)?; - } + for res in scan_metadata { + let scan_metadata = res?; + scan_file_tx = scan_metadata.visit_scan_files(scan_file_tx, send_scan_file)?; + } - // have sent all scan files, drop this so threads will exit when there's no more work - drop(scan_file_tx); - - let batches = if let Some(limit) = cli.limit { - // gather batches while we need - let mut batches = vec![]; - let mut rows_so_far = 0; - for mut batch in record_batch_rx.iter() { - let batch_rows = batch.num_rows(); - if rows_so_far < limit { - if rows_so_far + batch_rows > limit { - // truncate this batch - batch = truncate_batch(batch, limit - rows_so_far); + // have sent all scan files, drop this so threads will exit when there's no more work + drop(scan_file_tx); + + let batches = if let Some(limit) = cli.limit { + // gather batches while we need + let mut batches = vec![]; + let mut rows_so_far = 0; + for mut batch in record_batch_rx.iter() { + let batch_rows = batch.num_rows(); + if rows_so_far < limit { + if rows_so_far + batch_rows > limit { + // truncate this batch + batch = truncate_batch(batch, limit - rows_so_far); + } + batches.push(batch); } - batches.push(batch); + rows_so_far += batch_rows; } - rows_so_far += batch_rows; - } - println!("Printing first {limit} rows of {rows_so_far} total rows"); - batches - } else { - // simply gather up all batches - record_batch_rx.iter().collect() - }; - print_batches(&batches)?; - Ok(()) + println!("Printing first {limit} rows of {rows_so_far} total rows"); + batches + } else { + // simply gather up all batches + record_batch_rx.iter().collect() + }; + print_batches(&batches)?; + Ok(()) + }) } // this is the work each thread does