Skip to content

Commit 79031de

Browse files
committed
Refactor page submission to reduce code duplication
Extract common functionality from inner_submit_pages_concurrent and inner_submit_pages_chunked into a shared submit_pages_batch helper function. This reduces code duplication while preserving the distinct behavior of each submission strategy: - Concurrent: Submit all pages in one batch - Chunked: Submit pages in multiple batches, waiting for each batch to complete before starting the next The refactoring improves maintainability and makes the code easier to understand without changing the underlying functionality or performance characteristics.
1 parent 1cdf8b5 commit 79031de

File tree

2 files changed

+92
-75
lines changed

2 files changed

+92
-75
lines changed

src/commands/multi_block/monitor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ where
4949
.fetch(&runtime::storage().system().account(signer.account_id()))
5050
.await?
5151
.ok_or(Error::AccountDoesNotExists)?;
52-
5352
prometheus::set_balance(account_info.data.free as f64);
5453

5554
log::info!(

src/dynamic/multi_block.rs

Lines changed: 92 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -430,57 +430,56 @@ where
430430
}
431431
}
432432

433-
/// Submit all solution pages concurrently.
434-
pub(crate) async fn inner_submit_pages_concurrent<T: MinerConfig + Send + Sync + 'static>(
433+
/// Helper function to submit a batch of pages and wait for their inclusion in blocks
434+
async fn submit_pages_batch<T: MinerConfig + Send + Sync + 'static>(
435435
client: &Client,
436436
signer: &Signer,
437-
paged_raw_solution: Vec<(u32, T::Solution)>,
437+
pages_to_submit: &[(u32, T::Solution)],
438438
listen: Listen,
439-
) -> Result<Vec<u32>, Error>
439+
) -> Result<(Vec<u32>, HashSet<u32>), Error>
440440
where
441441
T::Solution: Send + Sync + 'static,
442442
T::Pages: Send + Sync + 'static,
443443
{
444444
let mut txs = FuturesUnordered::new();
445-
446445
let mut nonce = client
447446
.rpc()
448447
.system_account_next_index(signer.account_id())
449448
.await?;
450449

451-
let len = paged_raw_solution.len();
452-
453-
// 1. Submit all solution pages.
454-
for (page, solution) in paged_raw_solution.into_iter() {
450+
// 1. Submit all pages in the batch
451+
for (page, solution) in pages_to_submit.iter() {
455452
let tx_status = submit_inner(
456453
client,
457454
signer.clone(),
458-
MultiBlockTransaction::submit_page::<T>(page, Some(solution))?,
455+
MultiBlockTransaction::submit_page::<T>(*page, Some(solution.clone()))?,
459456
nonce,
460457
)
461458
.await?;
462459

463460
txs.push(async move {
464461
match utils::wait_tx_in_block_for_strategy(tx_status, listen).await {
465462
Ok(tx) => Ok(tx),
466-
Err(_) => Err(page),
463+
Err(_) => Err(*page),
467464
}
468465
});
469466

470467
nonce += 1;
471468
}
472469

473-
// 2. Wait for all pages to be included in a block.
474-
let mut failed_pages = Vec::new();
470+
// 2. Wait for all pages in the batch to be included in a block
471+
let mut failed_pages_set = HashSet::new();
475472
let mut submitted_pages = HashSet::new();
473+
let expected_pages: HashSet<u32> = pages_to_submit.iter().map(|(page, _)| *page).collect();
476474

475+
// 3. Process all transactions
477476
while let Some(page) = txs.next().await {
478477
match page {
479478
Ok(tx) => {
480479
let hash = tx.block_hash();
481-
482-
// NOTE: It's slow to iterate over the events and that's we are
483-
// submitting all pages "at once" and several pages are submitted in the same block.
480+
// NOTE: It's slow to iterate over the events and that's the main reason why
481+
// submitting all pages "at once" with several pages submitted in the same block
482+
// is faster than a sequential or chuncked submission.
484483
let events = tx.wait_for_success().await?;
485484
for event in events.iter() {
486485
let event = event?;
@@ -500,14 +499,59 @@ where
500499
}
501500
}
502501
}
502+
// Transaction failed to be included in a block.
503+
// This happens when the transaction itself was rejected or failed
503504
Err(p) => {
504-
failed_pages.push(p);
505+
failed_pages_set.insert(p);
505506
}
506507
}
508+
}
507509

508-
if submitted_pages.len() == len {
509-
return Ok(vec![]);
510-
}
510+
// 4. Check if all expected pages were included.
511+
// This handles cases where the transaction was submitted but we didn't get confirmation.
512+
let missing_pages: HashSet<u32> = expected_pages
513+
.difference(&submitted_pages)
514+
.cloned()
515+
.collect();
516+
517+
// 5. Add missing pages to failed pages set.
518+
// This combines both kind of failures (transation not included ina block or transaction not confirmed)
519+
// into a single set of failed pages.
520+
failed_pages_set.extend(missing_pages);
521+
522+
if !failed_pages_set.is_empty() {
523+
log::warn!(
524+
target: LOG_TARGET,
525+
"Some pages were not included in blocks: {:?}",
526+
failed_pages_set
527+
);
528+
}
529+
530+
let failed_pages: Vec<u32> = failed_pages_set.into_iter().collect();
531+
532+
Ok((failed_pages, submitted_pages))
533+
}
534+
535+
/// Submit all solution pages concurrently.
536+
pub(crate) async fn inner_submit_pages_concurrent<T: MinerConfig + Send + Sync + 'static>(
537+
client: &Client,
538+
signer: &Signer,
539+
paged_raw_solution: Vec<(u32, T::Solution)>,
540+
listen: Listen,
541+
) -> Result<Vec<u32>, Error>
542+
where
543+
T::Solution: Send + Sync + 'static,
544+
T::Pages: Send + Sync + 'static,
545+
{
546+
let len = paged_raw_solution.len();
547+
548+
// Submit all pages in a single batch
549+
let (failed_pages, submitted_pages) =
550+
submit_pages_batch::<T>(client, signer, &paged_raw_solution, listen).await?;
551+
552+
// If all pages were submitted successfully, we're done
553+
if submitted_pages.len() == len {
554+
return Ok(vec![]);
511555
}
512556

513557
Ok(failed_pages)
@@ -526,6 +570,8 @@ where
526570
T::Solution: Send + Sync + 'static,
527571
T::Pages: Send + Sync + 'static,
528572
{
573+
assert!(chunk_size > 0, "Chunk size must be greater than 0");
574+
529575
let mut failed_pages = Vec::new();
530576
let mut submitted_pages = HashSet::new();
531577
let total_pages = paged_raw_solution.len();
@@ -535,70 +581,42 @@ where
535581
let chunk_end = std::cmp::min(chunk_start + chunk_size, total_pages);
536582
let chunk = &paged_raw_solution[chunk_start..chunk_end];
537583

584+
// Get the actual page numbers in this chunk for logging
585+
let chunk_page_numbers: Vec<u32> = chunk.iter().map(|(page, _)| *page).collect();
586+
538587
log::info!(
539588
target: LOG_TARGET,
540-
"Submitting chunk of pages {}-{} of {}",
541-
chunk_start,
542-
chunk_end - 1,
589+
"Submitting pages {:?} (out of {})",
590+
chunk_page_numbers,
543591
total_pages
544592
);
545593

546-
let mut txs = FuturesUnordered::new();
547-
let mut nonce = client
548-
.rpc()
549-
.system_account_next_index(signer.account_id())
550-
.await?;
594+
// Submit the current chunk
595+
let (chunk_failed_pages, chunk_submitted_pages) =
596+
submit_pages_batch::<T>(client, signer, chunk, listen).await?;
551597

552-
// Submit all pages in the current chunk
553-
for (page, solution) in chunk.iter() {
554-
let tx_status = submit_inner(
555-
client,
556-
signer.clone(),
557-
MultiBlockTransaction::submit_page::<T>(*page, Some(solution.clone()))?,
558-
nonce,
559-
)
560-
.await?;
598+
// Check if we have failed pages before extending the overall lists
599+
if !chunk_failed_pages.is_empty() {
600+
log::warn!(
601+
target: LOG_TARGET,
602+
"Pages {:?} failed to be included in blocks",
603+
chunk_failed_pages
604+
);
561605

562-
txs.push(async move {
563-
match utils::wait_tx_in_block_for_strategy(tx_status, listen).await {
564-
Ok(tx) => Ok(tx),
565-
Err(_) => Err(*page),
566-
}
567-
});
606+
// Add the failed pages from this chunk to the overall list
607+
failed_pages.extend(chunk_failed_pages);
568608

569-
nonce += 1;
609+
return Ok(failed_pages);
570610
}
571611

572-
// Wait for all pages in the current chunk to be included in a block
573-
while let Some(page) = txs.next().await {
574-
match page {
575-
Ok(tx) => {
576-
let hash = tx.block_hash();
577-
578-
let events = tx.wait_for_success().await?;
579-
for event in events.iter() {
580-
let event = event?;
581-
582-
if let Some(solution_stored) =
583-
event.as_event::<runtime::multi_block_signed::events::Stored>()?
584-
{
585-
let page = solution_stored.2;
586-
587-
log::info!(
588-
target: LOG_TARGET,
589-
"Page {page} included in block {:?}",
590-
hash
591-
);
592-
593-
submitted_pages.insert(solution_stored.2);
594-
}
595-
}
596-
}
597-
Err(p) => {
598-
failed_pages.push(p);
599-
}
600-
}
601-
}
612+
// Add submitted pages to the overall set
613+
submitted_pages.extend(chunk_submitted_pages);
614+
615+
log::info!(
616+
target: LOG_TARGET,
617+
"All pages {:?} were successfully included in blocks",
618+
chunk_page_numbers
619+
);
602620

603621
// If all pages have been submitted, we're done
604622
if submitted_pages.len() == total_pages {

0 commit comments

Comments
 (0)