Skip to content

Commit 1cdf8b5

Browse files
committed
Add chunked submission strategy for multi-block solutions
The current implementation submits all pages concurrently, which can be fastest if all submissions succeed quickly and the network/node can handle the load efficiently. However, if an early transaction fails (e.g., nonce N+1), all subsequent transactions will fail due to the nonce gap, resulting in wasted submission attempts and potentially high load on the network/node due to retransmissions.

This commit adds support for a `chunked submission strategy` where the size of the chunk can be specified in the `experimental-monitor-multi-block` command. If the chunk size is not specified or is zero, we use the fully concurrent approach. Otherwise, we submit a chunk of N pages, wait for all of them to be successfully included in a block, and then move on to the next chunk. This offers a good balance between speed and robustness/load. Note that if the chunk size equals 1, we end up with a fully sequential solution (submit 1 page, wait, submit the next page, etc.).

Testing against a simple zombienet setup with an 8-page solution:

| Submission Strategy | Time (seconds) |
|---------------------|----------------|
| Fully concurrent    | ~77s           |
| Chunked (size=4)    | ~105s          |
1 parent dbf0090 commit 1cdf8b5

File tree

5 files changed

+200
-13
lines changed

5 files changed

+200
-13
lines changed

README.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ This is a re-write of the [staking miner](https://github.com/paritytech/polkadot
77
The miner can work with two kinds of nodes:
88

99
- **multi-phase (legacy)** , targeting the multi-phase, single paged election provider pallet
10-
- **multi-block (experimental) **, targeting the multi-phase, multi-block election provider pallet.
10+
- **multi-block (experimental)**, targeting the multi-phase, multi-block election provider pallet.
1111

1212
The legacy path is intended to be temporary until all chains are migrated to multi-page staking (`asset-hub`).
1313
Once that occurs, we can remove the legacy components and rename the experimental monitor command to `monitor`.
@@ -104,6 +104,18 @@ This command is similar to the stable `monitor command` but targets the new pall
104104
polkadot-staking-miner --uri ws://127.0.0.1:9966 experimental-monitor-multi-block --seed-or-path //Alice
105105
```
106106

107+
The `--chunk-size` option controls how many solution pages are submitted concurrently. When set to 0 (the default), all pages are submitted at once. When set to a positive number, pages are submitted in chunks of that size, waiting for each chunk to be included in a block before submitting the next chunk. This can help manage network load and improve reliability on congested networks, or if the number of pages per solution increases.
108+
109+
```bash
110+
polkadot-staking-miner --uri ws://127.0.0.1:9966 experimental-monitor-multi-block --seed-or-path //Alice --chunk-size 4
111+
```
112+
113+
The `--do-reduce` option (off by default) enables solution reduction, which prevents further trimming and makes submission more efficient.
114+
115+
```bash
116+
polkadot-staking-miner --uri ws://127.0.0.1:9966 experimental-monitor-multi-block --seed-or-path //Alice --do-reduce
117+
```
118+
107119
### Prepare your SEED
108120

109121
While you could pass your seed directly to the cli or Docker, this is highly **NOT** recommended. Instead, you should use an ENV variable.

src/commands/multi_block/monitor.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub async fn monitor_cmd<T>(
3131
) -> Result<(), Error>
3232
where
3333
T: MinerConfig<AccountId = AccountId> + Send + Sync + 'static,
34-
T::Solution: Send,
34+
T::Solution: Send + Sync + 'static,
3535
T::Pages: Send + Sync + 'static,
3636
T::TargetSnapshotPerBlock: Send + Sync + 'static,
3737
T::VoterSnapshotPerBlock: Send + Sync + 'static,
@@ -49,6 +49,7 @@ where
4949
.fetch(&runtime::storage().system().account(signer.account_id()))
5050
.await?
5151
.ok_or(Error::AccountDoesNotExists)?;
52+
5253
prometheus::set_balance(account_info.data.free as f64);
5354

5455
log::info!(
@@ -135,6 +136,7 @@ where
135136
submit_lock,
136137
config.submission_strategy,
137138
config.do_reduce,
139+
config.chunk_size,
138140
)
139141
.await
140142
{
@@ -165,14 +167,15 @@ async fn process_block<T>(
165167
submit_lock: Arc<Mutex<()>>,
166168
submission_strategy: SubmissionStrategy,
167169
do_reduce: bool,
170+
chunk_size: usize,
168171
) -> Result<(), Error>
169172
where
170173
T: MinerConfig<AccountId = AccountId> + Send + Sync + 'static,
171-
T::Solution: Send,
172-
T::Pages: Send,
174+
T::Solution: Send + Sync + 'static,
175+
T::Pages: Send + Sync + 'static,
173176
T::TargetSnapshotPerBlock: Send,
174177
T::VoterSnapshotPerBlock: Send,
175-
T::MaxVotesPerVoter: Send,
178+
T::MaxVotesPerVoter: Send + Sync + 'static,
176179
{
177180
let BlockDetails {
178181
storage,
@@ -268,7 +271,25 @@ where
268271
missing_pages.push((page, solution));
269272
}
270273

271-
dynamic::inner_submit_pages::<T>(&client, &signer, missing_pages, listen).await?;
274+
// Use the appropriate submission method based on chunk_size
275+
if chunk_size == 0 {
276+
dynamic::inner_submit_pages_concurrent::<T>(
277+
&client,
278+
&signer,
279+
missing_pages,
280+
listen,
281+
)
282+
.await?;
283+
} else {
284+
dynamic::inner_submit_pages_chunked::<T>(
285+
&client,
286+
&signer,
287+
missing_pages,
288+
listen,
289+
chunk_size,
290+
)
291+
.await?;
292+
}
272293
return Ok(());
273294
}
274295

@@ -301,7 +322,7 @@ where
301322
prometheus::set_score(paged_raw_solution.score);
302323

303324
// 7. Submit the score and solution to the chain.
304-
match dynamic::submit(&client, &signer, paged_raw_solution, listen)
325+
match dynamic::submit(&client, &signer, paged_raw_solution, listen, chunk_size)
305326
.timed()
306327
.await
307328
{

src/commands/types.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,4 +185,11 @@ pub struct ExperimentalMultiBlockMonitorConfig {
185185
/// Reduce the solution to prevent further trimming.
186186
#[clap(long, default_value_t = false)]
187187
pub do_reduce: bool,
188+
189+
/// Chunk size for submitting solution pages. If not specified or equal to zero,
190+
/// all pages will be submitted concurrently. Otherwise, pages will be submitted in chunks
191+
/// of the specified size, waiting for each chunk to be included in a block before
192+
/// submitting the next chunk.
193+
#[clap(long, default_value_t = 0)]
194+
pub chunk_size: usize,
188195
}

src/dynamic/multi_block.rs

Lines changed: 123 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,12 @@ pub(crate) async fn submit<T: MinerConfig + Send + Sync + 'static>(
335335
signer: &Signer,
336336
mut paged_raw_solution: PagedRawSolution<T>,
337337
listen: Listen,
338-
) -> Result<(), Error> {
338+
chunk_size: usize,
339+
) -> Result<(), Error>
340+
where
341+
T::Solution: Send + Sync + 'static,
342+
T::Pages: Send + Sync + 'static,
343+
{
339344
let mut i = 0;
340345
let tx_status = loop {
341346
let nonce = client
@@ -384,8 +389,14 @@ pub(crate) async fn submit<T: MinerConfig + Send + Sync + 'static>(
384389
.map(|(page, solution)| (page as u32, solution.clone()))
385390
.collect::<Vec<_>>();
386391

387-
// 2. Submit all solution pages.
388-
let failed_pages = inner_submit_pages::<T>(client, signer, solutions, listen).await?;
392+
// 2. Submit all solution pages using the appropriate strategy based on chunk_size
393+
let failed_pages = if chunk_size == 0 {
394+
// Use fully concurrent submission
395+
inner_submit_pages_concurrent::<T>(client, signer, solutions, listen).await?
396+
} else {
397+
// Use chunked concurrent submission
398+
inner_submit_pages_chunked::<T>(client, signer, solutions, listen, chunk_size).await?
399+
};
389400

390401
// 3. All pages were submitted successfully, we are done.
391402
if failed_pages.is_empty() {
@@ -405,7 +416,12 @@ pub(crate) async fn submit<T: MinerConfig + Send + Sync + 'static>(
405416
solutions.push((page, solution));
406417
}
407418

408-
let failed_pages = inner_submit_pages::<T>(client, signer, solutions, listen).await?;
419+
// Retry with the same strategy as the initial submission
420+
let failed_pages = if chunk_size == 0 {
421+
inner_submit_pages_concurrent::<T>(client, signer, solutions, listen).await?
422+
} else {
423+
inner_submit_pages_chunked::<T>(client, signer, solutions, listen, chunk_size).await?
424+
};
409425

410426
if failed_pages.is_empty() {
411427
Ok(())
@@ -414,12 +430,17 @@ pub(crate) async fn submit<T: MinerConfig + Send + Sync + 'static>(
414430
}
415431
}
416432

417-
pub(crate) async fn inner_submit_pages<T: MinerConfig + Send + Sync + 'static>(
433+
/// Submit all solution pages concurrently.
434+
pub(crate) async fn inner_submit_pages_concurrent<T: MinerConfig + Send + Sync + 'static>(
418435
client: &Client,
419436
signer: &Signer,
420437
paged_raw_solution: Vec<(u32, T::Solution)>,
421438
listen: Listen,
422-
) -> Result<Vec<u32>, Error> {
439+
) -> Result<Vec<u32>, Error>
440+
where
441+
T::Solution: Send + Sync + 'static,
442+
T::Pages: Send + Sync + 'static,
443+
{
423444
let mut txs = FuturesUnordered::new();
424445

425446
let mut nonce = client
@@ -491,3 +512,99 @@ pub(crate) async fn inner_submit_pages<T: MinerConfig + Send + Sync + 'static>(
491512

492513
Ok(failed_pages)
493514
}
515+
516+
/// Submit solution pages in chunks, waiting for each chunk to be included in a block
517+
/// before submitting the next chunk.
518+
pub(crate) async fn inner_submit_pages_chunked<T: MinerConfig + Send + Sync + 'static>(
519+
client: &Client,
520+
signer: &Signer,
521+
paged_raw_solution: Vec<(u32, T::Solution)>,
522+
listen: Listen,
523+
chunk_size: usize,
524+
) -> Result<Vec<u32>, Error>
525+
where
526+
T::Solution: Send + Sync + 'static,
527+
T::Pages: Send + Sync + 'static,
528+
{
529+
let mut failed_pages = Vec::new();
530+
let mut submitted_pages = HashSet::new();
531+
let total_pages = paged_raw_solution.len();
532+
533+
// Process pages in chunks
534+
for chunk_start in (0..total_pages).step_by(chunk_size) {
535+
let chunk_end = std::cmp::min(chunk_start + chunk_size, total_pages);
536+
let chunk = &paged_raw_solution[chunk_start..chunk_end];
537+
538+
log::info!(
539+
target: LOG_TARGET,
540+
"Submitting chunk of pages {}-{} of {}",
541+
chunk_start,
542+
chunk_end - 1,
543+
total_pages
544+
);
545+
546+
let mut txs = FuturesUnordered::new();
547+
let mut nonce = client
548+
.rpc()
549+
.system_account_next_index(signer.account_id())
550+
.await?;
551+
552+
// Submit all pages in the current chunk
553+
for (page, solution) in chunk.iter() {
554+
let tx_status = submit_inner(
555+
client,
556+
signer.clone(),
557+
MultiBlockTransaction::submit_page::<T>(*page, Some(solution.clone()))?,
558+
nonce,
559+
)
560+
.await?;
561+
562+
txs.push(async move {
563+
match utils::wait_tx_in_block_for_strategy(tx_status, listen).await {
564+
Ok(tx) => Ok(tx),
565+
Err(_) => Err(*page),
566+
}
567+
});
568+
569+
nonce += 1;
570+
}
571+
572+
// Wait for all pages in the current chunk to be included in a block
573+
while let Some(page) = txs.next().await {
574+
match page {
575+
Ok(tx) => {
576+
let hash = tx.block_hash();
577+
578+
let events = tx.wait_for_success().await?;
579+
for event in events.iter() {
580+
let event = event?;
581+
582+
if let Some(solution_stored) =
583+
event.as_event::<runtime::multi_block_signed::events::Stored>()?
584+
{
585+
let page = solution_stored.2;
586+
587+
log::info!(
588+
target: LOG_TARGET,
589+
"Page {page} included in block {:?}",
590+
hash
591+
);
592+
593+
submitted_pages.insert(solution_stored.2);
594+
}
595+
}
596+
}
597+
Err(p) => {
598+
failed_pages.push(p);
599+
}
600+
}
601+
}
602+
603+
// If all pages have been submitted, we're done
604+
if submitted_pages.len() == total_pages {
605+
return Ok(vec![]);
606+
}
607+
}
608+
609+
Ok(failed_pages)
610+
}

src/main.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,7 @@ mod tests {
397397
listen: Listen::Finalized, // Assuming default
398398
submission_strategy: SubmissionStrategy::IfLeading, // Assuming default
399399
do_reduce: true, // Expect true because flag was present
400+
chunk_size: 0, // Default value
400401
}
401402
),
402403
}
@@ -424,6 +425,35 @@ mod tests {
424425
listen: Listen::Finalized,
425426
submission_strategy: SubmissionStrategy::IfLeading,
426427
do_reduce: false, // Expect false (default)
428+
chunk_size: 0, // Default value
429+
}
430+
)
431+
);
432+
}
433+
434+
#[test]
435+
fn cli_experimental_monitor_multi_block_with_chunk_size_works() {
436+
let opt = Opt::try_parse_from([
437+
env!("CARGO_PKG_NAME"),
438+
"--uri",
439+
"hi",
440+
"experimental-monitor-multi-block",
441+
"--seed-or-path",
442+
"//Alice",
443+
"--chunk-size",
444+
"4",
445+
])
446+
.unwrap();
447+
448+
assert_eq!(
449+
opt.command,
450+
Command::ExperimentalMonitorMultiBlock(
451+
commands::types::ExperimentalMultiBlockMonitorConfig {
452+
seed_or_path: "//Alice".to_string(),
453+
listen: Listen::Finalized,
454+
submission_strategy: SubmissionStrategy::IfLeading,
455+
do_reduce: false, // Default value
456+
chunk_size: 4, // Explicitly set to 4
427457
}
428458
)
429459
);

0 commit comments

Comments
 (0)