Skip to content

Commit 0c51ea6

Browse files
committed
Refactor page submission to reduce code duplication
Extract common functionality from inner_submit_pages_concurrent and inner_submit_pages_chunked into a shared submit_pages_batch helper function. This reduces code duplication while preserving the distinct behavior of each submission strategy: - Concurrent: Submit all pages in one batch - Chunked: Submit pages in multiple batches, waiting for each batch to complete before starting the next The refactoring improves maintainability and makes the code easier to understand without changing the underlying functionality or performance characteristics.
1 parent 1cdf8b5 commit 0c51ea6

File tree

2 files changed

+92
-73
lines changed

2 files changed

+92
-73
lines changed

src/commands/multi_block/monitor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ where
4949
.fetch(&runtime::storage().system().account(signer.account_id()))
5050
.await?
5151
.ok_or(Error::AccountDoesNotExists)?;
52-
5352
prometheus::set_balance(account_info.data.free as f64);
5453

5554
log::info!(

src/dynamic/multi_block.rs

Lines changed: 92 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -430,28 +430,28 @@ where
430430
}
431431
}
432432

433-
/// Submit all solution pages concurrently.
434-
pub(crate) async fn inner_submit_pages_concurrent<T: MinerConfig + Send + Sync + 'static>(
433+
/// Helper function to submit a batch of pages and wait for their inclusion in blocks
434+
async fn submit_pages_batch<T: MinerConfig + Send + Sync + 'static>(
435435
client: &Client,
436436
signer: &Signer,
437-
paged_raw_solution: Vec<(u32, T::Solution)>,
437+
pages_to_submit: Vec<(u32, T::Solution)>,
438438
listen: Listen,
439-
) -> Result<Vec<u32>, Error>
439+
) -> Result<(Vec<u32>, HashSet<u32>), Error>
440440
where
441441
T::Solution: Send + Sync + 'static,
442442
T::Pages: Send + Sync + 'static,
443443
{
444444
let mut txs = FuturesUnordered::new();
445-
446445
let mut nonce = client
447446
.rpc()
448447
.system_account_next_index(signer.account_id())
449448
.await?;
450449

451-
let len = paged_raw_solution.len();
450+
// Collect expected pages before consuming the vector
451+
let expected_pages: HashSet<u32> = pages_to_submit.iter().map(|(page, _)| *page).collect();
452452

453-
// 1. Submit all solution pages.
454-
for (page, solution) in paged_raw_solution.into_iter() {
453+
// 1. Submit all pages in the batch
454+
for (page, solution) in pages_to_submit.into_iter() {
455455
let tx_status = submit_inner(
456456
client,
457457
signer.clone(),
@@ -470,17 +470,18 @@ where
470470
nonce += 1;
471471
}
472472

473-
// 2. Wait for all pages to be included in a block.
474-
let mut failed_pages = Vec::new();
473+
// 2. Wait for all pages in the batch to be included in a block
474+
let mut failed_pages_set = HashSet::new();
475475
let mut submitted_pages = HashSet::new();
476476

477+
// 3. Process all transactions
477478
while let Some(page) = txs.next().await {
478479
match page {
479480
Ok(tx) => {
480481
let hash = tx.block_hash();
481-
482-
// NOTE: It's slow to iterate over the events and that's we are
483-
// submitting all pages "at once" and several pages are submitted in the same block.
482+
// NOTE: It's slow to iterate over the events and that's the main reason why
483+
// submitting all pages "at once" with several pages submitted in the same block
484+
// is faster than a sequential or chuncked submission.
484485
let events = tx.wait_for_success().await?;
485486
for event in events.iter() {
486487
let event = event?;
@@ -500,14 +501,59 @@ where
500501
}
501502
}
502503
}
504+
// Transaction failed to be included in a block.
505+
// This happens when the transaction itself was rejected or failed
503506
Err(p) => {
504-
failed_pages.push(p);
507+
failed_pages_set.insert(p);
505508
}
506509
}
510+
}
507511

508-
if submitted_pages.len() == len {
509-
return Ok(vec![]);
510-
}
512+
// 4. Check if all expected pages were included.
513+
// This handles cases where the transaction was submitted but we didn't get confirmation.
514+
let missing_pages: HashSet<u32> = expected_pages
515+
.difference(&submitted_pages)
516+
.cloned()
517+
.collect();
518+
519+
// 5. Add missing pages to failed pages set.
520+
// This combines both types of failures (transactions not included in a block and transactions not confirmed)
521+
// into a single set of failed pages.
522+
failed_pages_set.extend(missing_pages);
523+
524+
if !failed_pages_set.is_empty() {
525+
log::warn!(
526+
target: LOG_TARGET,
527+
"Some pages were not included in blocks: {:?}",
528+
failed_pages_set
529+
);
530+
}
531+
532+
let failed_pages: Vec<u32> = failed_pages_set.into_iter().collect();
533+
534+
Ok((failed_pages, submitted_pages))
535+
}
536+
537+
/// Submit all solution pages concurrently.
538+
pub(crate) async fn inner_submit_pages_concurrent<T: MinerConfig + Send + Sync + 'static>(
539+
client: &Client,
540+
signer: &Signer,
541+
paged_raw_solution: Vec<(u32, T::Solution)>,
542+
listen: Listen,
543+
) -> Result<Vec<u32>, Error>
544+
where
545+
T::Solution: Send + Sync + 'static,
546+
T::Pages: Send + Sync + 'static,
547+
{
548+
let len = paged_raw_solution.len();
549+
550+
// Submit all pages in a single batch
551+
let (failed_pages, submitted_pages) =
552+
submit_pages_batch::<T>(client, signer, paged_raw_solution, listen).await?;
553+
554+
// If all pages were submitted successfully, we're done
555+
if submitted_pages.len() == len {
556+
return Ok(vec![]);
511557
}
512558

513559
Ok(failed_pages)
@@ -526,79 +572,53 @@ where
526572
T::Solution: Send + Sync + 'static,
527573
T::Pages: Send + Sync + 'static,
528574
{
575+
assert!(chunk_size > 0, "Chunk size must be greater than 0");
576+
529577
let mut failed_pages = Vec::new();
530578
let mut submitted_pages = HashSet::new();
531579
let total_pages = paged_raw_solution.len();
532580

533581
// Process pages in chunks
534582
for chunk_start in (0..total_pages).step_by(chunk_size) {
535583
let chunk_end = std::cmp::min(chunk_start + chunk_size, total_pages);
536-
let chunk = &paged_raw_solution[chunk_start..chunk_end];
584+
let chunk = paged_raw_solution[chunk_start..chunk_end].to_vec();
585+
586+
// Get the actual page numbers in this chunk for logging
587+
let chunk_page_numbers: Vec<u32> = chunk.iter().map(|(page, _)| *page).collect();
537588

538589
log::info!(
539590
target: LOG_TARGET,
540-
"Submitting chunk of pages {}-{} of {}",
541-
chunk_start,
542-
chunk_end - 1,
591+
"Submitting pages {:?} (out of {})",
592+
chunk_page_numbers,
543593
total_pages
544594
);
545595

546-
let mut txs = FuturesUnordered::new();
547-
let mut nonce = client
548-
.rpc()
549-
.system_account_next_index(signer.account_id())
550-
.await?;
596+
// Submit the current chunk
597+
let (chunk_failed_pages, chunk_submitted_pages) =
598+
submit_pages_batch::<T>(client, signer, chunk, listen).await?;
551599

552-
// Submit all pages in the current chunk
553-
for (page, solution) in chunk.iter() {
554-
let tx_status = submit_inner(
555-
client,
556-
signer.clone(),
557-
MultiBlockTransaction::submit_page::<T>(*page, Some(solution.clone()))?,
558-
nonce,
559-
)
560-
.await?;
600+
// Check if we have failed pages before extending the overall lists
601+
if !chunk_failed_pages.is_empty() {
602+
log::warn!(
603+
target: LOG_TARGET,
604+
"Pages {:?} failed to be included in blocks",
605+
chunk_failed_pages
606+
);
561607

562-
txs.push(async move {
563-
match utils::wait_tx_in_block_for_strategy(tx_status, listen).await {
564-
Ok(tx) => Ok(tx),
565-
Err(_) => Err(*page),
566-
}
567-
});
608+
// Add the failed pages from this chunk to the overall list
609+
failed_pages.extend(chunk_failed_pages);
568610

569-
nonce += 1;
611+
return Ok(failed_pages);
570612
}
571613

572-
// Wait for all pages in the current chunk to be included in a block
573-
while let Some(page) = txs.next().await {
574-
match page {
575-
Ok(tx) => {
576-
let hash = tx.block_hash();
577-
578-
let events = tx.wait_for_success().await?;
579-
for event in events.iter() {
580-
let event = event?;
581-
582-
if let Some(solution_stored) =
583-
event.as_event::<runtime::multi_block_signed::events::Stored>()?
584-
{
585-
let page = solution_stored.2;
586-
587-
log::info!(
588-
target: LOG_TARGET,
589-
"Page {page} included in block {:?}",
590-
hash
591-
);
592-
593-
submitted_pages.insert(solution_stored.2);
594-
}
595-
}
596-
}
597-
Err(p) => {
598-
failed_pages.push(p);
599-
}
600-
}
601-
}
614+
// Add submitted pages to the overall set
615+
submitted_pages.extend(chunk_submitted_pages);
616+
617+
log::info!(
618+
target: LOG_TARGET,
619+
"All pages {:?} were successfully included in blocks",
620+
chunk_page_numbers
621+
);
602622

603623
// If all pages have been submitted, we're done
604624
if submitted_pages.len() == total_pages {

0 commit comments

Comments
 (0)