-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Statement-store: defer statement protocol connections during major sync #11487
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 28 commits
7d0c13d
c8a9ed7
a382525
7cedec4
5a8e3f1
b87cd17
8db0b1a
2120c5d
038e8e5
0576365
b0a6db0
2934407
acf6280
6e62495
20a4386
037cc4a
6ffe8e7
39dc866
4903081
85aecdc
07e6433
fe16cee
79890d7
e2118dd
a8d7939
b69e515
b076543
fc46117
2af5257
c96de87
922309e
637f1b0
1803dab
e09bd4c
0a86fbf
76f2487
6e86175
6a90c6b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
| @@ -1,7 +1,7 @@ | ||||
| // Copyright (C) Parity Technologies (UK) Ltd. | ||||
| // SPDX-License-Identifier: Apache-2.0 | ||||
|
|
||||
| use std::collections::HashSet; | ||||
| use std::{cell::Cell, collections::HashSet}; | ||||
|
|
||||
| use codec::Encode; | ||||
| use log::{debug, info}; | ||||
|
|
@@ -381,3 +381,120 @@ async fn statement_store_crash_mid_sync() -> Result<(), anyhow::Error> { | |||
| info!("Node crash recovery test passed"); | ||||
| Ok(()) | ||||
| } | ||||
|
|
||||
| /// Test that verifies statement recovery after major sync completes | ||||
| /// | ||||
| /// Scenario: | ||||
| /// 1. Spawn charlie only and let the relay chain advance ~10 blocks | ||||
| /// 2. Submit multiple statements to charlie | ||||
| /// 3. Add dave as a late joiner — dave will enter major sync because the chain has already | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do you have anything to prove that step 3 happens?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, this condition confirms it: polkadot-sdk/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store/integration.rs Line 495 in fc46117
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. But… this only confirms that a major sync happened, not that statements were received while the node was in major sync. |
||||
| /// progressed. While dave is major syncing, its statement handler ignores incoming | ||||
| /// notifications, so the initial sync batch from charlie is dropped | ||||
| /// 4. Wait for dave to exit major sync | ||||
| /// 5. Subscribe to statements on dave AFTER sync has ended — any arrival is therefore caused | ||||
| /// exclusively by the reconnect triggered at sync completion | ||||
| /// 6. Assert the statement arrives, proving that `reconnect_statement_peers` fired and triggered a | ||||
| /// fresh initial sync with charlie | ||||
| /// | ||||
| /// Without the fix (reconnect_statement_peers on sync-end), the subscription would time | ||||
| /// out because nothing re-triggers the initial sync after major sync completes | ||||
| #[tokio::test(flavor = "multi_thread")] | ||||
| async fn statement_store_peer_disconnect_during_major_sync() -> Result<(), anyhow::Error> { | ||||
|
DenzelPenzel marked this conversation as resolved.
Outdated
|
||||
| let _ = env_logger::try_init_from_env( | ||||
| env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), | ||||
| ); | ||||
|
|
||||
| const STATEMENT_COUNT: usize = 5; | ||||
| let items = create_allowance_items(&[( | ||||
| 0, | ||||
| StatementAllowance { max_count: STATEMENT_COUNT as u32, max_size: 1_000_000 }, | ||||
| )]); | ||||
| let mut network = spawn_network_sudo(&["charlie", "alice"], items).await?; | ||||
|
DenzelPenzel marked this conversation as resolved.
|
||||
|
|
||||
| let charlie = network.get_node("charlie")?; | ||||
| let charlie_rpc = charlie.rpc().await?; | ||||
|
|
||||
| // Wait for at least 10 parachain blocks before dave joins. | ||||
| // More blocks means the relay chain has also advanced further, giving dave a wider sync | ||||
| // window and making it reliably enter major-sync mode for long enough that the statement | ||||
| // handler's 100ms poll observes the true → false transition | ||||
| let charlie_height = { | ||||
| let h = Cell::new(0.0f64); | ||||
| charlie | ||||
| .wait_metric_with_timeout( | ||||
| "block_height{status=\"best\"}", | ||||
| |v| { | ||||
| h.set(v); | ||||
| v >= 10.0 | ||||
| }, | ||||
| 180u64, | ||||
| ) | ||||
| .await | ||||
| .map_err(|_| anyhow::anyhow!("Charlie did not reach block 10 within 180s"))?; | ||||
| h.get() | ||||
| }; | ||||
| info!("Charlie is at block height {:.0} before dave joins", charlie_height); | ||||
|
|
||||
| let topic: Topic = [0u8; 32].into(); | ||||
| let keypair = get_keypair(0); | ||||
| let statements: Vec<_> = (0..STATEMENT_COUNT as u32) | ||||
| .map(|seq| create_test_statement(&keypair, &[topic], None, vec![seq as u8], u32::MAX, seq)) | ||||
| .collect(); | ||||
| let mut expected: Vec<Vec<u8>> = statements.iter().map(|s| s.encode()).collect(); | ||||
| expected.sort(); | ||||
|
|
||||
| for stmt in &statements { | ||||
| let result = submit_statement(&charlie_rpc, stmt).await?; | ||||
| assert_eq!(result, SubmitResult::New); | ||||
| } | ||||
| info!("{} statements submitted to charlie", STATEMENT_COUNT); | ||||
|
|
||||
| // Add dave as a late-joining collator. | ||||
| // Dave will enter major sync because the chain already advanced. During that window | ||||
| // dave's statement handler ignores incoming notifications (is_major_syncing guard), | ||||
| // so charlie's 100ms initial-sync burst fires and is silently dropped. | ||||
| // When sync ends, reconnect_statement_peers removes and re-adds all peers, causing | ||||
| // charlie to perform a fresh initial sync and recover the lost statements | ||||
| info!("Adding dave as late-joining collator"); | ||||
| let dave_join_time = std::time::Instant::now(); | ||||
| network.add_collator("dave", Default::default(), 1004).await?; | ||||
|
|
||||
| let dave = network.get_node("dave")?; | ||||
| let dave_rpc = dave.rpc().await?; | ||||
|
|
||||
| // Wait for dave to reach charlie's block height | ||||
| dave.wait_metric_with_timeout("block_height{status=\"best\"}", |h| h >= charlie_height, 120u64) | ||||
| .await | ||||
| .map_err(|_| { | ||||
| anyhow::anyhow!("Dave did not reach block height {:.0} within 120s", charlie_height) | ||||
| })?; | ||||
| let sync_end = dave_join_time.elapsed(); | ||||
| info!("Dave reached block height {:.0} after {:.1}s", charlie_height, sync_end.as_secs_f64()); | ||||
|
|
||||
| // Subscribe after sync — any statements arriving are exclusively due to | ||||
| // reconnect_statement_peers triggering a fresh initial sync from charlie. The subscription | ||||
| // also returns statements already in dave's store as an initial batch, so we capture all | ||||
| // recovered statements regardless of whether they arrive before or after the subscribe call | ||||
| let mut subscription = subscribe_topic(&dave_rpc, topic).await?; | ||||
| let received = expect_statements_unordered(&mut subscription, STATEMENT_COUNT, 30).await?; | ||||
| let mut received_bytes: Vec<Vec<u8>> = received.into_iter().map(|b| b.to_vec()).collect(); | ||||
| received_bytes.sort(); | ||||
| assert_eq!(received_bytes, expected); | ||||
| info!( | ||||
| "All {} statements arrived {:.1}s after dave finished syncing", | ||||
| STATEMENT_COUNT, | ||||
| dave_join_time.elapsed().as_secs_f64() - sync_end.as_secs_f64() | ||||
| ); | ||||
|
|
||||
| // By the time all statements have arrived, reconnect_statement_peers must have already fired | ||||
| // and been logged (it is what triggered the initial sync from charlie that delivered them). | ||||
| // Checking logs here — after statement delivery — avoids the race where the handler hasn't | ||||
| // polled yet at the moment we read logs | ||||
| let dave_logs = dave.logs().await?; | ||||
| assert!( | ||||
| dave_logs.lines().any(|l| l.contains("Major sync complete, reconnecting")), | ||||
| "reconnect_statement_peers did not fire — dave may not have entered major sync" | ||||
| ); | ||||
|
|
||||
| Ok(()) | ||||
| } | ||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| title: 'statement-store: reconnect peers after major sync for statement recovery' | ||
|
DenzelPenzel marked this conversation as resolved.
Outdated
|
||
| doc: | ||
| - audience: Node Dev | ||
| description: |- | ||
| # Description | ||
|
|
||
| Nodes drop all statements received from peers during major sync because of the `is_major_syncing()` guard. | ||
| After major sync completes, those statements are permanently lost because peers will not re-send them. | ||
|
|
||
| This fix detects when major sync ends and reconnects all statement protocol peers by closing and | ||
| reopening their notification substreams. This triggers bidirectional initial sync on both sides, | ||
| recovering any statements that were missed during the sync period. | ||
|
|
||
| The approach is minimal: only reacts to the sync-end transition with a single `had_major_syncing` | ||
| flag. No peer tracking or debounce needed since the reconnect only closes notification substreams | ||
| (not TCP connections) and is inexpensive. | ||
| crates: | ||
| - name: sc-network-statement | ||
| bump: patch | ||
Uh oh!
There was an error while loading. Please reload this page.