Skip to content
Open
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
7d0c13d
statement-store: defer statement protocol connections during major sync
DenzelPenzel Mar 24, 2026
c8a9ed7
statement-store: major sync tests
DenzelPenzel Mar 26, 2026
a382525
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Mar 26, 2026
7cedec4
statement-store: clippy
DenzelPenzel Mar 27, 2026
5a8e3f1
statement-store: fix spawn_network to support single-collator tests,
DenzelPenzel Mar 27, 2026
b87cd17
Update from github-actions[bot] running command 'prdoc --audience nod…
github-actions[bot] Mar 27, 2026
8db0b1a
statement-store: use BTreeMap for deferred_peers
DenzelPenzel Mar 31, 2026
2120c5d
statement-store: clippy
DenzelPenzel Mar 31, 2026
038e8e5
Merge branch 'master' into denzelpenzel/statement-store-loss-during-m…
DenzelPenzel Mar 31, 2026
0576365
statement-store: fix deferred peer race and extract sync transition c…
DenzelPenzel Apr 1, 2026
b0a6db0
Merge remote-tracking branch 'origin/master' into denzelpenzel/statem…
DenzelPenzel Apr 2, 2026
2934407
statement-store: address review feedback
DenzelPenzel Apr 2, 2026
acf6280
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Apr 2, 2026
6e62495
statement-store: reconnect peers after major sync for statement recovery
DenzelPenzel Apr 7, 2026
20a4386
statement-store: update prdoc to reflect simplified approach
DenzelPenzel Apr 7, 2026
037cc4a
Merge remote-tracking branch 'origin/master' into denzelpenzel/statem…
DenzelPenzel Apr 10, 2026
6ffe8e7
Merge remote-tracking branch 'origin/master' into denzelpenzel/statem…
DenzelPenzel Apr 10, 2026
39dc866
statement-store: fix conflicts
DenzelPenzel Apr 10, 2026
4903081
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Apr 10, 2026
85aecdc
statement-store: added the new ci matrix test
DenzelPenzel Apr 10, 2026
07e6433
Merge remote-tracking branch 'origin/denzelpenzel/statement-store-los…
DenzelPenzel Apr 10, 2026
fe16cee
statement-store: improve reconnect_statement_peers robustness and tes…
DenzelPenzel Apr 10, 2026
79890d7
statement-store: recover statements lost during major sync
DenzelPenzel Apr 13, 2026
e2118dd
statement-store: check statements
DenzelPenzel Apr 13, 2026
a8d7939
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Apr 13, 2026
b69e515
statement-store: incr timeout
DenzelPenzel Apr 13, 2026
b076543
Merge branch 'denzelpenzel/statement-store-loss-during-major-sync' of…
DenzelPenzel Apr 13, 2026
fc46117
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Apr 13, 2026
2af5257
statement-store: improve naming
DenzelPenzel Apr 14, 2026
c96de87
statement-store: buffer peers during major sync instead of remove+add…
DenzelPenzel Apr 16, 2026
922309e
Merge remote-tracking branch 'origin/master' into denzelpenzel/statem…
DenzelPenzel Apr 16, 2026
637f1b0
statement-store: unit test with_syncing
DenzelPenzel Apr 16, 2026
1803dab
statement-store: update prdoc
DenzelPenzel Apr 16, 2026
e09bd4c
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Apr 16, 2026
0a86fbf
statement-store: recover statements dropped during major sync
DenzelPenzel Apr 16, 2026
76f2487
Merge remote-tracking branch 'origin/denzelpenzel/statement-store-los…
DenzelPenzel Apr 16, 2026
6e86175
statement-store: improve recovery integration test coverage
DenzelPenzel Apr 16, 2026
6a90c6b
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Apr 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/zombienet-tests/zombienet_cumulus_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,9 @@
runner-type: "default"
cumulus-image: "test-parachain"
use-zombienet-sdk: true

- job-name: "zombienet-cumulus-0019-statement_store_recovery_after_major_sync"
test-filter: "zombie_ci::statement_store::integration::statement_store_recovery_after_major_sync"
runner-type: "default"
cumulus-image: "test-parachain"
use-zombienet-sdk: true
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ pub(super) async fn spawn_network_with_injected_allowances(
collators: &[&str],
participant_count: u32,
) -> Result<Network<LocalFileSystem>, anyhow::Error> {
assert!(collators.len() >= 2);
assert!(!collators.is_empty());
let images = zombienet_sdk::environment::get_images_from_env();

let base_dir = std::env::var("ZOMBIENET_SDK_BASE_DIR")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashSet;
use std::{cell::Cell, collections::HashSet};

use codec::Encode;
use log::{debug, info};
Expand Down Expand Up @@ -381,3 +381,120 @@ async fn statement_store_crash_mid_sync() -> Result<(), anyhow::Error> {
info!("Node crash recovery test passed");
Ok(())
}

/// Test that verifies statement recovery after major sync completes
///
/// Scenario:
/// 1. Spawn charlie only and let the relay chain advance ~10 blocks
/// 2. Submit multiple statements to charlie
/// 3. Add dave as a late joiner — dave will enter major sync because the chain has already
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have anything to prove that step 3 actually happens?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this condition confirms it:

dave_logs.lines().any(|l| l.contains("Major sync complete, reconnecting")),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But ... this only confirms that a major sync happened — not that statements were received (and dropped) while the node was in major sync.

/// progressed. While dave is major syncing, its statement handler ignores incoming
/// notifications, so the initial sync batch from charlie is dropped
/// 4. Wait for dave to exit major sync
/// 5. Subscribe to statements on dave AFTER sync has ended — any arrival is therefore caused
/// exclusively by the reconnect triggered at sync completion
/// 6. Assert the statement arrives, proving that `reconnect_statement_peers` fired and triggered a
/// fresh initial sync with charlie
///
/// Without the fix (reconnect_statement_peers on sync-end), the subscription would time
/// out because nothing re-triggers the initial sync after major sync completes
#[tokio::test(flavor = "multi_thread")]
async fn statement_store_recovery_after_major_sync() -> Result<(), anyhow::Error> {
// Best-effort logger init; ignore the error if another test initialized it first
let _ = env_logger::try_init_from_env(
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
);

const STATEMENT_COUNT: usize = 5;
// Grant account 0 an allowance covering all test statements so every submission
// below is accepted (semantics per the create_allowance_items helper)
let items = create_allowance_items(&[(
0,
StatementAllowance { max_count: STATEMENT_COUNT as u32, max_size: 1_000_000 },
)]);
let mut network = spawn_network_sudo(&["charlie", "alice"], items).await?;
Comment thread
DenzelPenzel marked this conversation as resolved.

let charlie = network.get_node("charlie")?;
let charlie_rpc = charlie.rpc().await?;

// Wait for at least 10 parachain blocks before dave joins.
// More blocks means the relay chain has also advanced further, giving dave a wider sync
// window and making it reliably enter major-sync mode for long enough that the statement
// handler's 100ms poll observes the true → false transition
let charlie_height = {
// Cell captures the last metric value observed inside the Fn closure, which
// cannot mutate captured state directly
let h = Cell::new(0.0f64);
charlie
.wait_metric_with_timeout(
"block_height{status=\"best\"}",
|v| {
h.set(v);
v >= 10.0
},
180u64,
)
.await
.map_err(|_| anyhow::anyhow!("Charlie did not reach block 10 within 180s"))?;
h.get()
};
info!("Charlie is at block height {:.0} before dave joins", charlie_height);

// Build STATEMENT_COUNT distinct statements on one shared topic. Their encoded,
// sorted forms are the oracle we later compare dave's subscription output against
let topic: Topic = [0u8; 32].into();
let keypair = get_keypair(0);
let statements: Vec<_> = (0..STATEMENT_COUNT as u32)
.map(|seq| create_test_statement(&keypair, &[topic], None, vec![seq as u8], u32::MAX, seq))
.collect();
let mut expected: Vec<Vec<u8>> = statements.iter().map(|s| s.encode()).collect();
expected.sort();

// Every statement must be newly accepted by charlie's store before dave joins
for stmt in &statements {
let result = submit_statement(&charlie_rpc, stmt).await?;
assert_eq!(result, SubmitResult::New);
}
info!("{} statements submitted to charlie", STATEMENT_COUNT);

// Add dave as a late-joining collator.
// Dave will enter major sync because the chain already advanced. During that window
// dave's statement handler ignores incoming notifications (is_major_syncing guard),
// so charlie's 100ms initial-sync burst fires and is silently dropped.
// When sync ends, reconnect_statement_peers removes and re-adds all peers, causing
// charlie to perform a fresh initial sync and recover the lost statements
info!("Adding dave as late-joining collator");
let dave_join_time = std::time::Instant::now();
network.add_collator("dave", Default::default(), 1004).await?;

let dave = network.get_node("dave")?;
let dave_rpc = dave.rpc().await?;

// Wait for dave to reach charlie's block height
dave.wait_metric_with_timeout("block_height{status=\"best\"}", |h| h >= charlie_height, 120u64)
.await
.map_err(|_| {
anyhow::anyhow!("Dave did not reach block height {:.0} within 120s", charlie_height)
})?;
// Elapsed time since dave joined — used below to report recovery latency
let sync_end = dave_join_time.elapsed();
info!("Dave reached block height {:.0} after {:.1}s", charlie_height, sync_end.as_secs_f64());

// Subscribe after sync — any statements arriving are exclusively due to
// reconnect_statement_peers triggering a fresh initial sync from charlie. The subscription
// also returns statements already in dave's store as an initial batch, so we capture all
// recovered statements regardless of whether they arrive before or after the subscribe call
let mut subscription = subscribe_topic(&dave_rpc, topic).await?;
let received = expect_statements_unordered(&mut subscription, STATEMENT_COUNT, 30).await?;
// Sort to compare order-independently against the pre-sorted expected oracle
let mut received_bytes: Vec<Vec<u8>> = received.into_iter().map(|b| b.to_vec()).collect();
received_bytes.sort();
assert_eq!(received_bytes, expected);
info!(
"All {} statements arrived {:.1}s after dave finished syncing",
STATEMENT_COUNT,
dave_join_time.elapsed().as_secs_f64() - sync_end.as_secs_f64()
);

// By the time all statements have arrived, reconnect_statement_peers must have already fired
// and been logged (it is what triggered the initial sync from charlie that delivered them).
// Checking logs here — after statement delivery — avoids the race where the handler hasn't
// polled yet at the moment we read logs
let dave_logs = dave.logs().await?;
assert!(
dave_logs.lines().any(|l| l.contains("Major sync complete, reconnecting")),
"reconnect_statement_peers did not fire — dave may not have entered major sync"
);

Ok(())
}
19 changes: 19 additions & 0 deletions prdoc/pr_11487.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
title: 'statement-store: reconnect peers after major sync for statement recovery'
Comment thread
DenzelPenzel marked this conversation as resolved.
Outdated
doc:
- audience: Node Dev
description: |-
# Description

Nodes drop all statements received from peers during major sync because of the `is_major_syncing()` guard.
After major sync completes, those statements are permanently lost because peers will not re-send them.

This fix detects when major sync ends and reconnects all statement protocol peers by closing and
reopening their notification substreams. This triggers bidirectional initial sync on both sides,
recovering any statements that were missed during the sync period.

The approach is minimal: only reacts to the sync-end transition with a single `was_major_syncing`
flag. No peer tracking or debounce needed since the reconnect only closes notification substreams
(not TCP connections) and is inexpensive.
crates:
- name: sc-network-statement
bump: patch
51 changes: 51 additions & 0 deletions substrate/client/network/statement/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ impl StatementHandlerPrototype {
);
}

let is_major_syncing = sync.is_major_syncing();
let handler = StatementHandler {
protocol_name: self.protocol_name,
notification_service: self.notification_service,
Expand All @@ -379,6 +380,7 @@ impl StatementHandlerPrototype {
initial_sync_timeout: Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse()),
pending_initial_syncs: HashMap::new(),
initial_sync_peer_queue: VecDeque::new(),
was_major_syncing: is_major_syncing,
};

Ok(handler)
Expand Down Expand Up @@ -423,6 +425,8 @@ pub struct StatementHandler<
pending_initial_syncs: HashMap<PeerId, PendingInitialSync>,
/// Queue for round-robin processing of initial syncs.
initial_sync_peer_queue: VecDeque<PeerId>,
/// Whether the node was major syncing on the last `run()` loop poll
was_major_syncing: bool,
}

/// Per-peer rate limiter using a token bucket algorithm.
Expand Down Expand Up @@ -585,6 +589,7 @@ where
initial_sync_timeout: Box::pin(pending().fuse()),
pending_initial_syncs: HashMap::new(),
initial_sync_peer_queue: VecDeque::new(),
was_major_syncing: false,
}
}

Expand Down Expand Up @@ -639,6 +644,14 @@ where
Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse());
},
}

// When major sync ends, reconnect all statement peers to trigger
// bidirectional initial sync and recover statements missed during sync
let currently_syncing = self.sync.is_major_syncing();
Comment thread
DenzelPenzel marked this conversation as resolved.
if self.was_major_syncing && !currently_syncing {
self.reconnect_statement_peers();
}
self.was_major_syncing = currently_syncing;
}
}

Expand Down Expand Up @@ -687,6 +700,43 @@ where
}
}

/// Close and reopen notification substreams with all connected statement peers.
/// This triggers bidirectional initial sync so both sides exchange any statements
/// that were missed during major sync
fn reconnect_statement_peers(&mut self) {
Comment thread
DenzelPenzel marked this conversation as resolved.
Outdated
let peer_ids: Vec<PeerId> = self.peers.keys().copied().collect();
if peer_ids.is_empty() {
return;
}

log::info!(
target: LOG_TARGET,
"Major sync complete, reconnecting {} statement peers for initial sync",
peer_ids.len(),
);

if let Err(err) = self
Comment thread
DenzelPenzel marked this conversation as resolved.
Outdated
.network
.remove_peers_from_reserved_set(self.protocol_name.clone(), peer_ids.clone())
{
log::warn!(target: LOG_TARGET, "Failed to remove reserved peers: {err}");
}

// Re-add all peers to trigger substream reopening and bidirectional initial sync.
// The multiaddrs are constructed from PeerIds in self.peers, which guarantees valid
// /p2p/ addresses and excludes the local peer ID
let addrs: HashSet<multiaddr::Multiaddr> = peer_ids
.into_iter()
.map(|p| {
iter::once(multiaddr::Protocol::P2p(p.into())).collect::<multiaddr::Multiaddr>()
})
.collect();
if let Err(err) = self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs)
{
log::warn!(target: LOG_TARGET, "Failed to re-add reserved peers: {err}");
}
}

fn handle_sync_event(&mut self, event: SyncEvent) {
match event {
SyncEvent::PeerConnected(remote) => {
Expand Down Expand Up @@ -1500,6 +1550,7 @@ mod tests {
initial_sync_timeout: Box::pin(futures::future::pending()),
pending_initial_syncs: HashMap::new(),
initial_sync_peer_queue: VecDeque::new(),
was_major_syncing: false,
};
(handler, statement_store, network, notification_service, queue_receiver, peer_ids)
}
Expand Down
Loading