Skip to content

Commit 89ad96e

Browse files
eskimorlexnv
andauthored
Enforce current relay parent to be available (#11621)
I would suggest to encode our intention directly instead of working around a symptom. This makes it much easier to reason about the code in my opinion and should have less edge cases. In particular this change will also wait for the current relay parent mid-parachain slot, which is useful as otherwise we would build on an older than expected relay parent, which could then again affect block confidence as the relay parent might be out of scope already before the collation can land on chain. Also worth mentioning: As the original PR already shows, we have the implicit assumption that the current relay parent has arrived after 1s into the relay chain slot. This seems to be the case most of the time, but not always - triggering the issue this PR is fixing. For best performance we should consider bumping the slot offset some more. If I understand correctly the error case we found was by a relay parent coming late by only a couple of milliseconds - thus 1.5s might already be plenty for the needed wait to almost never happen, but ideally we should find a good value based on data from prod. --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: Alexandru Vasile <alexandru.vasile@parity.io>
1 parent de42646 commit 89ad96e

File tree

4 files changed

+401
-46
lines changed

4 files changed

+401
-46
lines changed

cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs

Lines changed: 195 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,7 @@ use cumulus_primitives_core::{
3939
};
4040
use cumulus_relay_chain_interface::RelayChainInterface;
4141
use futures::prelude::*;
42-
use polkadot_primitives::{
43-
Block as RelayBlock, CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId,
44-
};
42+
use polkadot_primitives::{Block as RelayBlock, CoreIndex, Header as RelayHeader, Id as ParaId};
4543
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider};
4644
use sc_consensus::BlockImport;
4745
use sc_consensus_aura::SlotDuration;
@@ -188,6 +186,18 @@ where
188186
collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
189187
};
190188

189+
let mut best_notifications = match relay_client.new_best_notification_stream().await {
190+
Ok(s) => s,
191+
Err(err) => {
192+
tracing::error!(
193+
target: LOG_TARGET,
194+
?err,
195+
"Failed to initialize consensus: no relay chain best block notification stream"
196+
);
197+
return;
198+
},
199+
};
200+
191201
let mut relay_chain_data_cache = RelayChainDataCache::new(relay_client.clone(), para_id);
192202
let mut connection_helper = BackingGroupConnectionHelper::new(
193203
keystore.clone(),
@@ -205,7 +215,19 @@ where
205215
return;
206216
};
207217

208-
let Ok(relay_best_hash) = relay_client.best_block_hash().await else {
218+
// Wait for the best relay block to be from the current relay
219+
// chain slot. If propagation exceeded `slot_offset`, this
220+
// blocks until a new-best notification arrives.
221+
// See: https://github.com/paritytech/polkadot-sdk/pull/11453
222+
let Some(relay_best_header) = wait_for_current_relay_block(
223+
&relay_client,
224+
&mut relay_chain_data_cache,
225+
&mut best_notifications,
226+
slot_offset,
227+
relay_chain_slot_duration,
228+
)
229+
.await
230+
else {
209231
tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block hash.");
210232
continue;
211233
};
@@ -221,7 +243,7 @@ where
221243

222244
let Ok(Some(rp_data)) = offset_relay_parent_find_descendants(
223245
&mut relay_chain_data_cache,
224-
relay_best_hash,
246+
relay_best_header,
225247
relay_parent_offset,
226248
)
227249
.await
@@ -507,6 +529,84 @@ fn adjust_para_to_relay_parent_slot(
507529
Some(para_slot)
508530
}
509531

532+
/// Returns `true` if the best relay chain block is from the current relay chain
533+
/// slot. Uses the wall clock adjusted by `slot_offset`.
534+
fn is_best_relay_block_current(
535+
best_relay_slot: u64,
536+
slot_offset: Duration,
537+
relay_chain_slot_duration: Duration,
538+
) -> bool {
539+
let now = super::slot_timer::duration_now().saturating_sub(slot_offset);
540+
is_best_relay_block_current_at(best_relay_slot, now, relay_chain_slot_duration)
541+
}
542+
543+
/// Pure logic for the relay block freshness check, taking the current time as
544+
/// a parameter for testability.
545+
fn is_best_relay_block_current_at(
546+
best_relay_slot: u64,
547+
now: Duration,
548+
relay_chain_slot_duration: Duration,
549+
) -> bool {
550+
let current_relay_slot = now.as_millis() as u64 / relay_chain_slot_duration.as_millis() as u64;
551+
best_relay_slot >= current_relay_slot
552+
}
553+
554+
/// Wait until the best relay chain block is from the current relay chain slot.
555+
///
556+
/// If the current best block is already current, returns its hash immediately.
557+
/// Otherwise waits for a new-best notification and re-checks. This ensures
558+
/// the collator doesn't build on a stale relay parent when relay block
559+
/// propagation exceeds `slot_offset` at a slot boundary.
560+
///
561+
/// Returns the best relay block hash, or `None` on error.
562+
pub(crate) async fn wait_for_current_relay_block<RelayClient>(
563+
relay_client: &RelayClient,
564+
relay_chain_data_cache: &mut RelayChainDataCache<RelayClient>,
565+
best_notifications: &mut (impl Stream<Item = RelayHeader> + Unpin),
566+
slot_offset: Duration,
567+
relay_chain_slot_duration: Duration,
568+
) -> Option<RelayHeader>
569+
where
570+
RelayClient: RelayChainInterface + Clone + 'static,
571+
{
572+
let relay_best_hash = relay_client.best_block_hash().await.ok()?;
573+
let mut first_best_header = Some(
574+
relay_chain_data_cache
575+
.get_mut_relay_chain_data(relay_best_hash)
576+
.await
577+
.ok()
578+
.map(|d| d.relay_parent_header.clone())?,
579+
);
580+
581+
loop {
582+
// Drain buffered notifications.
583+
while let Some(maybe_header) = best_notifications.next().now_or_never() {
584+
first_best_header = Some(maybe_header?);
585+
}
586+
587+
let best_header = match first_best_header.take() {
588+
Some(h) => h,
589+
None => best_notifications.next().await?, // Block until one arrives.
590+
};
591+
592+
let best_slot = sc_consensus_babe::find_pre_digest::<RelayBlock>(&best_header)
593+
.map(|d| d.slot())
594+
.ok()?;
595+
596+
if is_best_relay_block_current(*best_slot, slot_offset, relay_chain_slot_duration) {
597+
return Some(best_header);
598+
}
599+
600+
tracing::debug!(
601+
target: LOG_TARGET,
602+
?relay_best_hash,
603+
relay_best_num = %best_header.number(),
604+
?best_slot,
605+
"Best relay block is stale, waiting for fresh one."
606+
);
607+
}
608+
}
609+
510610
/// Finds a relay chain parent block at a specified offset from the best block, collecting its
511611
/// descendants.
512612
///
@@ -518,21 +618,13 @@ fn adjust_para_to_relay_parent_slot(
518618
/// offset, collecting all blocks in between to maintain the chain of ancestry.
519619
pub(crate) async fn offset_relay_parent_find_descendants<RelayClient>(
520620
relay_chain_data_cache: &mut RelayChainDataCache<RelayClient>,
521-
relay_best_block: RelayHash,
621+
mut relay_header: RelayHeader,
522622
relay_parent_offset: u32,
523623
) -> Result<Option<RelayParentData>, ()>
524624
where
525625
RelayClient: RelayChainInterface + Clone + 'static,
526626
{
527-
let Ok(mut relay_header) = relay_chain_data_cache
528-
.get_mut_relay_chain_data(relay_best_block)
529-
.await
530-
.map(|d| d.relay_parent_header.clone())
531-
else {
532-
tracing::error!(target: LOG_TARGET, ?relay_best_block, "Unable to fetch best relay chain block header.");
533-
return Err(());
534-
};
535-
627+
let relay_best_block = relay_header.hash();
536628
if relay_parent_offset == 0 {
537629
return Ok(Some(RelayParentData::new(relay_header)));
538630
}
@@ -668,3 +760,91 @@ pub(crate) async fn determine_core<H: HeaderT, RI: RelayChainInterface + 'static
668760
number_of_cores: cores_at_offset.len() as u16,
669761
}))
670762
}
763+
764+
#[cfg(test)]
765+
mod tests {
766+
use super::*;
767+
768+
const RELAY_SLOT_DURATION: Duration = Duration::from_secs(6);
769+
770+
/// Simulate the wall clock at a specific point within a relay slot.
771+
///
772+
/// `relay_slot` is the current relay chain slot number, `ms_into_slot` is
773+
/// how far into that slot we are (0..6000).
774+
fn now_at(relay_slot: u64, ms_into_slot: u64) -> Duration {
775+
Duration::from_millis(relay_slot * 6000 + ms_into_slot)
776+
}
777+
778+
// ---------------------------------------------------------------
779+
// Tests for `is_best_relay_block_current_at`
780+
// ---------------------------------------------------------------
781+
782+
#[test]
783+
fn best_block_in_current_slot_is_current() {
784+
// Wall clock in slot 804, best block from slot 804 → current.
785+
assert!(is_best_relay_block_current_at(804, now_at(804, 500), RELAY_SLOT_DURATION));
786+
}
787+
788+
#[test]
789+
fn best_block_in_previous_slot_is_stale() {
790+
// Wall clock in slot 805, best block from slot 804 → stale.
791+
assert!(!is_best_relay_block_current_at(804, now_at(805, 500), RELAY_SLOT_DURATION));
792+
}
793+
794+
#[test]
795+
fn the_bug_scenario_best_block_stale_at_slot_boundary() {
796+
// THE BUG: wall clock just crossed into slot 805 (17ms in),
797+
// but best relay block is still from slot 804. Stale.
798+
assert!(!is_best_relay_block_current_at(804, now_at(805, 17), RELAY_SLOT_DURATION));
799+
}
800+
801+
#[test]
802+
fn best_block_current_after_new_relay_block_arrives() {
803+
// New relay block (slot 805) arrives. Wall clock in slot 805.
804+
assert!(is_best_relay_block_current_at(805, now_at(805, 500), RELAY_SLOT_DURATION));
805+
}
806+
807+
#[test]
808+
fn best_block_from_future_slot_is_current() {
809+
// Should not happen, but must not panic.
810+
assert!(is_best_relay_block_current_at(810, now_at(805, 0), RELAY_SLOT_DURATION));
811+
}
812+
813+
#[test]
814+
fn stale_at_exact_slot_boundary() {
815+
// Exactly at the start of slot 805.
816+
// Best from 804 → stale (804 < 805).
817+
assert!(!is_best_relay_block_current_at(804, now_at(805, 0), RELAY_SLOT_DURATION));
818+
// Best from 805 → current.
819+
assert!(is_best_relay_block_current_at(805, now_at(805, 0), RELAY_SLOT_DURATION));
820+
}
821+
822+
#[test]
823+
fn current_at_end_of_slot() {
824+
// 5999ms into slot 804 — still in slot 804.
825+
// Best from 804 → current.
826+
assert!(is_best_relay_block_current_at(804, now_at(804, 5999), RELAY_SLOT_DURATION));
827+
}
828+
829+
#[test]
830+
fn no_wait_needed_during_normal_building() {
831+
// During elastic scaling in slot 804: best is from 804,
832+
// wall clock is mid-slot 804. No wait needed.
833+
for ms in (0..6000).step_by(500) {
834+
assert!(
835+
is_best_relay_block_current_at(804, now_at(804, ms), RELAY_SLOT_DURATION),
836+
"Should be current at {}ms into slot 804",
837+
ms
838+
);
839+
}
840+
}
841+
842+
#[test]
843+
fn wait_needed_when_slot_advances() {
844+
// Wall clock moves to slot 805, best still from 804.
845+
// This is the race condition — must detect as stale.
846+
assert!(!is_best_relay_block_current_at(804, now_at(805, 0), RELAY_SLOT_DURATION));
847+
assert!(!is_best_relay_block_current_at(804, now_at(805, 17), RELAY_SLOT_DURATION));
848+
assert!(!is_best_relay_block_current_at(804, now_at(805, 500), RELAY_SLOT_DURATION));
849+
}
850+
}

cumulus/client/consensus/aura/src/collators/slot_based/slot_timer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ fn compute_time_until_next_slot_change(
140140
}
141141

142142
/// Returns current duration since Unix epoch.
143-
fn duration_now() -> Duration {
143+
pub(super) fn duration_now() -> Duration {
144144
use std::time::SystemTime;
145145
let now = SystemTime::now();
146146
now.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_else(|e| {

0 commit comments

Comments
 (0)