Skip to content

Fix invalid messages relay delivery transactions #1940

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 96 additions & 14 deletions relays/messages/src/message_race_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ struct MessageDeliveryStrategy<P: MessageLane, SC, TC> {
/// Latest confirmed nonces at the source client + the header id where we have first met this
/// nonce.
latest_confirmed_nonces_at_source: VecDeque<(SourceHeaderIdOf<P>, MessageNonce)>,
/// Target nonces from the source client.
/// Target nonces available at the **best** block of the target chain.
target_nonces: Option<TargetClientNonces<DeliveryRaceTargetNoncesData>>,
/// Basic delivery strategy.
strategy: MessageDeliveryStrategyBase<P>,
Expand Down Expand Up @@ -387,13 +387,11 @@ where
race_state: &mut RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
) {
// best target nonces must always be ge than finalized target nonces
let mut target_nonces = self.target_nonces.take().unwrap_or_else(|| nonces.clone());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change here and similar change in the other file comes from the #719, but IIUC this (cmp::max) should only happen in the finalized version of those methods.

target_nonces.nonces_data = nonces.nonces_data.clone();
target_nonces.latest_nonce = std::cmp::max(target_nonces.latest_nonce, nonces.latest_nonce);
self.target_nonces = Some(target_nonces);
let latest_nonce = nonces.latest_nonce;
self.target_nonces = Some(nonces);

self.strategy.best_target_nonces_updated(
TargetClientNonces { latest_nonce: nonces.latest_nonce, nonces_data: () },
TargetClientNonces { latest_nonce, nonces_data: () },
race_state,
)
}
Expand Down Expand Up @@ -432,14 +430,16 @@ where
&self,
race_state: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> {
let best_target_nonce = self.strategy.best_at_target()?;
let best_finalized_source_header_id_at_best_target =
race_state.best_finalized_source_header_id_at_best_target.clone()?;
let latest_confirmed_nonce_at_source = self
.latest_confirmed_nonces_at_source
.iter()
.take_while(|(id, _)| id.0 <= best_finalized_source_header_id_at_best_target.0)
.last()
.map(|(_, nonce)| *nonce)?;
.map(|(_, nonce)| *nonce)
.unwrap_or(best_target_nonce);
let target_nonces = self.target_nonces.as_ref()?;

// There's additional condition in the message delivery race: target would reject messages
Expand Down Expand Up @@ -529,8 +529,8 @@ where
let lane_source_client = self.lane_source_client.clone();
let lane_target_client = self.lane_target_client.clone();

let maximal_source_queue_index =
self.strategy.maximal_available_source_queue_index(race_state)?;
let available_source_queue_indices =
self.strategy.available_source_queue_indices(race_state)?;
let source_queue = self.strategy.source_queue();

let reference = RelayMessagesBatchReference {
Expand All @@ -539,15 +539,13 @@ where
max_messages_size_in_single_batch,
lane_source_client: lane_source_client.clone(),
lane_target_client: lane_target_client.clone(),
best_target_nonce,
nonces_queue: source_queue.clone(),
nonces_queue_range: 0..maximal_source_queue_index + 1,
nonces_queue_range: available_source_queue_indices,
metrics: self.metrics_msg.clone(),
};

let range_end = MessageRaceLimits::decide(reference).await?;

let range_begin = source_queue[0].1.begin();
let selected_nonces = range_begin..=range_end;
let selected_nonces = MessageRaceLimits::decide(reference).await?;
let dispatch_weight = self.dispatch_weight_for_range(&selected_nonces);

Some((
Expand Down Expand Up @@ -1057,4 +1055,88 @@ mod tests {
);
assert_eq!(strategy.required_source_header_at_target(&source_header_id), None);
}

#[async_std::test]
async fn previous_nonces_are_selected_if_reorg_happens_at_target_chain() {
// this is the copy of the similar test in the `mesage_race_strategy.rs`, but it also tests
// that the `MessageDeliveryStrategy` acts properly in the similar scenario

// tune parameters to allow 5 nonces per delivery transaction
let (mut state, mut strategy) = prepare_strategy();
strategy.max_unrewarded_relayer_entries_at_target = 5;
strategy.max_unconfirmed_nonces_at_target = 5;
strategy.max_messages_in_single_batch = 5;
strategy.max_messages_weight_in_single_batch = Weight::from_parts(5, 0);
strategy.max_messages_size_in_single_batch = 5;

// in this state we have 4 available nonces for delivery
assert_eq!(
strategy.select_nonces_to_deliver(state.clone()).await,
Some((
20..=23,
MessageProofParameters {
outbound_state_proof_required: false,
dispatch_weight: Weight::from_parts(4, 0),
}
)),
);

// let's say we have submitted 20..=23
state.nonces_submitted = Some(20..=23);

// then new nonce 24 appear at the source block 2
let new_nonce_24 = vec![(
24,
MessageDetails { dispatch_weight: Weight::from_parts(1, 0), size: 0, reward: 0 },
)]
.into_iter()
.collect();
let source_header_2 = header_id(2);
state.best_finalized_source_header_id_at_source = Some(source_header_2);
strategy.source_nonces_updated(
source_header_2,
SourceClientNonces { new_nonces: new_nonce_24, confirmed_nonce: None },
);
// and nonce 23 appear at the best block of the target node (best finalized still has 0
// nonces)
let target_nonces_data = DeliveryRaceTargetNoncesData {
confirmed_nonce: 19,
unrewarded_relayers: UnrewardedRelayersState::default(),
};
let target_header_2 = header_id(2);
state.best_target_header_id = Some(target_header_2);
strategy.best_target_nonces_updated(
TargetClientNonces { latest_nonce: 23, nonces_data: target_nonces_data.clone() },
&mut state,
);

// then best target header is retracted
strategy.best_target_nonces_updated(
TargetClientNonces { latest_nonce: 19, nonces_data: target_nonces_data.clone() },
&mut state,
);

// ... and some fork with 19 delivered nonces is finalized
let target_header_2_fork = header_id(2_1);
state.best_finalized_source_header_id_at_source = Some(source_header_2);
state.best_finalized_source_header_id_at_best_target = Some(source_header_2);
state.best_target_header_id = Some(target_header_2_fork);
state.best_finalized_target_header_id = Some(target_header_2_fork);
strategy.finalized_target_nonces_updated(
TargetClientNonces { latest_nonce: 19, nonces_data: target_nonces_data.clone() },
&mut state,
);

// now we have to select nonces 20..=23 for delivery again
assert_eq!(
strategy.select_nonces_to_deliver(state.clone()).await,
Some((
20..=24,
MessageProofParameters {
outbound_state_proof_required: false,
dispatch_weight: Weight::from_parts(5, 0),
}
)),
);
}
}
20 changes: 13 additions & 7 deletions relays/messages/src/message_race_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! enforcement strategy

use num_traits::Zero;
use std::ops::Range;
use std::ops::RangeInclusive;

use bp_messages::{MessageNonce, Weight};

Expand Down Expand Up @@ -76,14 +76,17 @@ pub struct RelayMessagesBatchReference<
pub lane_target_client: TargetClient,
/// Metrics reference.
pub metrics: Option<MessageLaneLoopMetrics>,
/// Best available nonce at the **best** target block. We do not want to deliver nonces
/// less than this nonce, even though the block may be retracted.
pub best_target_nonce: MessageNonce,
/// Source queue.
pub nonces_queue: SourceRangesQueue<
P::SourceHeaderHash,
P::SourceHeaderNumber,
MessageDetailsMap<P::SourceChainBalance>,
>,
/// Source queue range
pub nonces_queue_range: Range<usize>,
/// Range of indices within the `nonces_queue` that are available for selection.
pub nonces_queue_range: RangeInclusive<usize>,
}

/// Limits of the message race transactions.
Expand All @@ -97,14 +100,16 @@ impl MessageRaceLimits {
TargetClient: MessageLaneTargetClient<P>,
>(
reference: RelayMessagesBatchReference<P, SourceClient, TargetClient>,
) -> Option<MessageNonce> {
) -> Option<RangeInclusive<MessageNonce>> {
let mut hard_selected_count = 0;

let mut selected_weight = Weight::zero();
let mut selected_count: MessageNonce = 0;

let hard_selected_begin_nonce =
reference.nonces_queue[reference.nonces_queue_range.start].1.begin();
let hard_selected_begin_nonce = std::cmp::max(
reference.best_target_nonce + 1,
reference.nonces_queue[*reference.nonces_queue_range.start()].1.begin(),
);

// relay reference
let mut relay_reference = RelayReference {
Expand All @@ -129,6 +134,7 @@ impl MessageRaceLimits {
.nonces_queue
.range(reference.nonces_queue_range.clone())
.flat_map(|(_, ready_nonces)| ready_nonces.iter())
.filter(|(nonce, _)| **nonce >= hard_selected_begin_nonce)
.enumerate();
for (index, (nonce, details)) in all_ready_nonces {
relay_reference.index = index;
Expand Down Expand Up @@ -192,7 +198,7 @@ impl MessageRaceLimits {
if hard_selected_count != 0 {
let selected_max_nonce =
hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1;
Some(selected_max_nonce)
Some(hard_selected_begin_nonce..=selected_max_nonce)
} else {
None
}
Expand Down
Loading