Skip to content
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
7d0c13d
statement-store: defer statement protocol connections during major sync
DenzelPenzel Mar 24, 2026
c8a9ed7
statement-store: major sync tests
DenzelPenzel Mar 26, 2026
a382525
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Mar 26, 2026
7cedec4
statement-store: clippy
DenzelPenzel Mar 27, 2026
5a8e3f1
statement-store: fix spawn_network to support single-collator tests,
DenzelPenzel Mar 27, 2026
b87cd17
Update from github-actions[bot] running command 'prdoc --audience nod…
github-actions[bot] Mar 27, 2026
8db0b1a
statement-store: use BTreeMap for deferred_peers
DenzelPenzel Mar 31, 2026
2120c5d
statement-store: clippy
DenzelPenzel Mar 31, 2026
038e8e5
Merge branch 'master' into denzelpenzel/statement-store-loss-during-m…
DenzelPenzel Mar 31, 2026
0576365
statement-store: fix deferred peer race and extract sync transition c…
DenzelPenzel Apr 1, 2026
b0a6db0
Merge remote-tracking branch 'origin/master' into denzelpenzel/statem…
DenzelPenzel Apr 2, 2026
2934407
statement-store: address review feedback
DenzelPenzel Apr 2, 2026
acf6280
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Apr 2, 2026
6e62495
statement-store: reconnect peers after major sync for statement recovery
DenzelPenzel Apr 7, 2026
20a4386
statement-store: update prdoc to reflect simplified approach
DenzelPenzel Apr 7, 2026
037cc4a
Merge remote-tracking branch 'origin/master' into denzelpenzel/statem…
DenzelPenzel Apr 10, 2026
6ffe8e7
Merge remote-tracking branch 'origin/master' into denzelpenzel/statem…
DenzelPenzel Apr 10, 2026
39dc866
statement-store: fix conflicts
DenzelPenzel Apr 10, 2026
4903081
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Apr 10, 2026
85aecdc
statement-store: added the new ci matrix test
DenzelPenzel Apr 10, 2026
07e6433
Merge remote-tracking branch 'origin/denzelpenzel/statement-store-los…
DenzelPenzel Apr 10, 2026
fe16cee
statement-store: improve reconnect_statement_peers robustness and tes…
DenzelPenzel Apr 10, 2026
79890d7
statement-store: recover statements lost during major sync
DenzelPenzel Apr 13, 2026
e2118dd
statement-store: check statements
DenzelPenzel Apr 13, 2026
a8d7939
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Apr 13, 2026
b69e515
statement-store: incr timeout
DenzelPenzel Apr 13, 2026
b076543
Merge branch 'denzelpenzel/statement-store-loss-during-major-sync' of…
DenzelPenzel Apr 13, 2026
fc46117
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Apr 13, 2026
2af5257
statement-store: improve naming
DenzelPenzel Apr 14, 2026
c96de87
statement-store: buffer peers during major sync instead of remove+add…
DenzelPenzel Apr 16, 2026
922309e
Merge remote-tracking branch 'origin/master' into denzelpenzel/statem…
DenzelPenzel Apr 16, 2026
637f1b0
statement-store: unit test with_syncing
DenzelPenzel Apr 16, 2026
1803dab
statement-store: update prdoc
DenzelPenzel Apr 16, 2026
e09bd4c
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Apr 16, 2026
0a86fbf
statement-store: recover statements dropped during major sync
DenzelPenzel Apr 16, 2026
76f2487
Merge remote-tracking branch 'origin/denzelpenzel/statement-store-los…
DenzelPenzel Apr 16, 2026
6e86175
statement-store: improve recovery integration test coverage
DenzelPenzel Apr 16, 2026
6a90c6b
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Apr 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/zombienet-tests/zombienet_cumulus_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,9 @@
runner-type: "default"
cumulus-image: "test-parachain"
use-zombienet-sdk: true

- job-name: "zombienet-cumulus-0019-statement_store_peer_disconnect_during_major_sync"
Comment thread
DenzelPenzel marked this conversation as resolved.
Outdated
test-filter: "zombie_ci::statement_store::integration::statement_store_peer_disconnect_during_major_sync"
runner-type: "default"
cumulus-image: "test-parachain"
use-zombienet-sdk: true
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ pub(super) async fn spawn_network_with_injected_allowances(
collators: &[&str],
participant_count: u32,
) -> Result<Network<LocalFileSystem>, anyhow::Error> {
assert!(collators.len() >= 2);
assert!(!collators.is_empty());
let images = zombienet_sdk::environment::get_images_from_env();

let base_dir = std::env::var("ZOMBIENET_SDK_BASE_DIR")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashSet;
use std::{collections::HashSet, time::Duration};

use codec::Encode;
use log::{debug, info};
use sp_core::Bytes;
use sp_statement_store::{
RejectionReason, Statement, StatementAllowance, SubmitResult, Topic, TopicFilter,
RejectionReason, Statement, StatementAllowance, StatementEvent, SubmitResult, Topic,
TopicFilter,
};
use zombienet_sdk::subxt::ext::subxt_rpcs::rpc_params;

use sc_statement_store::test_utils::{create_allowance_items, create_test_statement, get_keypair};

Expand Down Expand Up @@ -381,3 +383,148 @@ async fn statement_store_crash_mid_sync() -> Result<(), anyhow::Error> {
info!("Node crash recovery test passed");
Ok(())
}

/// Test that verifies peer connectivity and statement propagation timing during major sync
///
/// Scenario:
/// 1. Spawn charlie only, let relay chain advance ~10 blocks
/// 2. Submit a statement to charlie
/// 3. Add dave as a late joiner (will enter major sync)
/// 4. Poll system_peers on dave every 2s to track when dave connects to charlie
/// 5. Simultaneously wait for the statement to arrive on dave
/// 6. Compare timing: the statement should arrive AFTER dave finishes major sync
///
/// During major sync, peers are added to the reserved set immediately on PeerConnected,
/// but statement substreams are only effective once sync completes. When major sync ends,
/// reconnect_statement_peers removes and re-adds all peers to trigger bidirectional
/// initial sync, recovering any statements missed during the sync period
#[tokio::test(flavor = "multi_thread")]
async fn statement_store_peer_disconnect_during_major_sync() -> Result<(), anyhow::Error> {
Comment thread
DenzelPenzel marked this conversation as resolved.
Outdated
let _ = env_logger::try_init_from_env(
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
);

let mut network = spawn_network_with_injected_allowances(&["charlie"], 8).await?;

let charlie = network.get_node("charlie")?;
let charlie_rpc = charlie.rpc().await?;

// Wait for relay chain to advance so dave will enter major sync
// ~60s gives ~10 relay blocks at 6s block time, enough for major sync trigger
log::info!("Waiting 60s for relay chain to advance");
tokio::time::sleep(Duration::from_secs(60)).await;

log::info!("Submitting statement to charlie");
let topic: Topic = [0u8; 32].into();
let mut statement = sp_statement_store::Statement::new();
statement.set_plain_data(vec![1, 2, 3]);
statement.set_topic(0, topic);
statement.set_expiry_from_parts(u32::MAX, 0);
let keypair = get_keypair(0);
statement.sign_sr25519_private(&keypair);
let statement_bytes: Bytes = statement.encode().into();

let result: SubmitResult = charlie_rpc
.request("statement_submit", rpc_params![statement_bytes.clone()])
.await?;
assert_eq!(result, SubmitResult::New, "Statement should be accepted by charlie");
log::info!("Statement submitted to charlie");

// Add dave as a late-joining collator
// Dave will enter major sync because the chain advanced ~10 blocks while dave was offline.
// From dave's perspective, statement substreams with charlie are established on
// PeerConnected but are not productive until major sync ends. When sync completes,
// reconnect_statement_peers triggers bidirectional initial sync to recover statements
log::info!("Adding dave as late-joining collator");
let dave_join_time = std::time::Instant::now();
network.add_collator("dave", Default::default(), 1004).await?;

let dave = network.get_node("dave")?;
let dave_rpc = dave.rpc().await?;

log::info!("Subscribing to statements on dave");
let mut subscription = dave_rpc
.subscribe::<StatementEvent>(
"statement_subscribeStatement",
rpc_params![TopicFilter::MatchAll(vec![topic].try_into().expect("Single topic"))],
"statement_unsubscribeStatement",
)
.await?;

// Wait for dave to sync and receive the statement
// Poll system_peers every second to build a peer count timeline, while also
// waiting for the statement subscription to fire
let mut peer_counts: Vec<(f64, usize)> = Vec::new();
let mut statement_received_at: Option<Duration> = None;
let max_wait = Duration::from_secs(120);

loop {
let elapsed = dave_join_time.elapsed();
if elapsed > max_wait {
panic!(
"Timed out after {:.0}s waiting for statement on dave. \
statement_received={}",
elapsed.as_secs_f64(),
statement_received_at.is_some()
);
}

// Poll system_peers on dave
let peers: Vec<serde_json::Value> =
dave_rpc.request("system_peers", rpc_params![]).await.unwrap_or_default();
let t = elapsed.as_secs_f64();
log::info!("[{:>5.1}s] dave system_peers: {} peer(s)", t, peers.len());
peer_counts.push((t, peers.len()));

if statement_received_at.is_some() {
if peer_counts.len() > 3 && peer_counts.iter().rev().take(3).all(|(_, c)| *c > 0) {
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}

// Try to receive the statement with a 1s timeout
match tokio::time::timeout(Duration::from_secs(1), subscription.next()).await {
Ok(Some(Ok(StatementEvent::NewStatements { statements: batch, .. })))
if !batch.is_empty() =>
{
assert_eq!(batch.len(), 1, "Expected exactly one statement in batch");
assert_eq!(batch[0], statement_bytes, "Statement content mismatch");
statement_received_at = Some(elapsed);
log::info!(
">>> Statement received at {:.1}s after dave joined",
elapsed.as_secs_f64()
);
},
Ok(Some(Err(e))) => {
log::warn!("Subscription error on dave: {e}");
},
Ok(None) => {
panic!("Subscription stream closed unexpectedly on dave");
},
_ => {},
}
}

let stmt_t = statement_received_at.expect("Statement should have been received");
let peer_first_seen = peer_counts.iter().find(|(_, c)| *c > 0);

log::info!("Peer count timeline:");
for (t, count) in &peer_counts {
let marker = if stmt_t.as_secs_f64() >= *t && stmt_t.as_secs_f64() < *t + 1.5 {
" <-- statement received"
} else {
""
};
log::info!(" [{:>5.1}s] {} peer(s){}", t, count, marker);
}

if let Some((peer_t, _)) = peer_first_seen {
log::info!("First peer visible in system_peers: {:.1}s", peer_t);
} else {
log::info!("WARNING: system_peers never showed any peers (statement arrived via notification substream before system_peers poll caught it)");
}

Ok(())
}
19 changes: 19 additions & 0 deletions prdoc/pr_11487.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
title: 'statement-store: reconnect peers after major sync for statement recovery'
Comment thread
DenzelPenzel marked this conversation as resolved.
Outdated
doc:
- audience: Node Dev
description: |-
# Description

Nodes drop all statements received from peers during major sync because of the `is_major_syncing()` guard.
After major sync completes, those statements are permanently lost because peers will not re-send them.

This fix detects when major sync ends and reconnects all statement protocol peers by closing and
reopening their notification substreams. This triggers bidirectional initial sync on both sides,
recovering any statements that were missed during the sync period.

The approach is minimal: it reacts only to the sync-end transition, using a single `had_major_syncing`
flag. No peer tracking or debouncing is needed, since the reconnect only closes notification
substreams (not TCP connections) and is inexpensive.
crates:
- name: sc-network-statement
bump: patch
52 changes: 52 additions & 0 deletions substrate/client/network/statement/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ impl StatementHandlerPrototype {
);
}

let is_major_syncing = sync.is_major_syncing();
let handler = StatementHandler {
protocol_name: self.protocol_name,
notification_service: self.notification_service,
Expand All @@ -379,6 +380,7 @@ impl StatementHandlerPrototype {
initial_sync_timeout: Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse()),
pending_initial_syncs: HashMap::new(),
initial_sync_peer_queue: VecDeque::new(),
had_major_syncing: is_major_syncing,
};

Ok(handler)
Expand Down Expand Up @@ -423,6 +425,8 @@ pub struct StatementHandler<
pending_initial_syncs: HashMap<PeerId, PendingInitialSync>,
/// Queue for round-robin processing of initial syncs.
initial_sync_peer_queue: VecDeque<PeerId>,
/// Whether the node was major syncing in the previous loop iteration
Comment thread
DenzelPenzel marked this conversation as resolved.
Outdated
had_major_syncing: bool,
Comment thread
DenzelPenzel marked this conversation as resolved.
Outdated
}

/// Per-peer rate limiter using a token bucket algorithm.
Expand Down Expand Up @@ -585,6 +589,7 @@ where
initial_sync_timeout: Box::pin(pending().fuse()),
pending_initial_syncs: HashMap::new(),
initial_sync_peer_queue: VecDeque::new(),
had_major_syncing: false,
}
}

Expand Down Expand Up @@ -639,6 +644,14 @@ where
Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse());
},
}

// When major sync ends, reconnect all statement peers to trigger
// bidirectional initial sync and recover statements missed during sync
let currently_syncing = self.sync.is_major_syncing();
Comment thread
DenzelPenzel marked this conversation as resolved.
if self.had_major_syncing && !currently_syncing {
self.reconnect_statement_peers();
}
self.had_major_syncing = currently_syncing;
}
}

Expand Down Expand Up @@ -687,6 +700,44 @@ where
}
}

/// Close and reopen notification substreams with all connected statement peers.
/// This triggers bidirectional initial sync so both sides exchange any statements
/// that were missed during major sync
fn reconnect_statement_peers(&mut self) {
Comment thread
DenzelPenzel marked this conversation as resolved.
Outdated
let peer_ids: Vec<PeerId> = self.peers.keys().copied().collect();
if peer_ids.is_empty() {
return;
}

log::debug!(
target: LOG_TARGET,
"Major sync complete, reconnecting {} statement peers for initial sync",
peer_ids.len(),
);

let result = self
.network
.remove_peers_from_reserved_set(self.protocol_name.clone(), peer_ids.clone());
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Failed to remove reserved peers: {err}");
return;
}

// Re-add all peers to trigger substream reopening and bidirectional initial sync.
// The multiaddrs are constructed from PeerIds in self.peers, which guarantees valid
// /p2p/ addresses and excludes the local peer ID, so the batch add cannot partially fail
let addrs: HashSet<multiaddr::Multiaddr> = peer_ids
.into_iter()
.map(|p| {
iter::once(multiaddr::Protocol::P2p(p.into())).collect::<multiaddr::Multiaddr>()
})
.collect();
let result = self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Failed to re-add reserved peers: {err}");
}
}

fn handle_sync_event(&mut self, event: SyncEvent) {
match event {
SyncEvent::PeerConnected(remote) => {
Expand Down Expand Up @@ -1500,6 +1551,7 @@ mod tests {
initial_sync_timeout: Box::pin(futures::future::pending()),
pending_initial_syncs: HashMap::new(),
initial_sync_peer_queue: VecDeque::new(),
had_major_syncing: false,
};
(handler, statement_store, network, notification_service, queue_receiver, peer_ids)
}
Expand Down
Loading