Skip to content

Commit 883adc5

Browse files
authored
Keep multiple latest confirmed nonces at source in messages relay (paritytech#719)
* keep multiple latest confirmed nonces at source in messages relay * post-merge fix
1 parent 93d4d36 commit 883adc5

File tree

4 files changed

+275
-82
lines changed

4 files changed

+275
-82
lines changed

bridges/relays/messages-relay/src/message_race_delivery.rs

Lines changed: 142 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,12 @@ use async_trait::async_trait;
2929
use bp_message_lane::{MessageNonce, UnrewardedRelayersState, Weight};
3030
use futures::stream::FusedStream;
3131
use relay_utils::FailedClient;
32-
use std::{collections::BTreeMap, marker::PhantomData, ops::RangeInclusive, time::Duration};
32+
use std::{
33+
collections::{BTreeMap, VecDeque},
34+
marker::PhantomData,
35+
ops::RangeInclusive,
36+
time::Duration,
37+
};
3338

3439
/// Run message delivery race.
3540
pub async fn run<P: MessageLane>(
@@ -61,7 +66,7 @@ pub async fn run<P: MessageLane>(
6166
max_messages_in_single_batch: params.max_messages_in_single_batch,
6267
max_messages_weight_in_single_batch: params.max_messages_weight_in_single_batch,
6368
max_messages_size_in_single_batch: params.max_messages_size_in_single_batch,
64-
latest_confirmed_nonce_at_source: None,
69+
latest_confirmed_nonces_at_source: VecDeque::new(),
6570
target_nonces: None,
6671
strategy: BasicStrategy::new(),
6772
},
@@ -164,14 +169,17 @@ where
164169
async fn nonces(
165170
&self,
166171
at_block: TargetHeaderIdOf<P>,
172+
update_metrics: bool,
167173
) -> Result<(TargetHeaderIdOf<P>, TargetClientNonces<DeliveryRaceTargetNoncesData>), Self::Error> {
168174
let (at_block, latest_received_nonce) = self.client.latest_received_nonce(at_block).await?;
169175
let (at_block, latest_confirmed_nonce) = self.client.latest_confirmed_received_nonce(at_block).await?;
170176
let (at_block, unrewarded_relayers) = self.client.unrewarded_relayers_state(at_block).await?;
171177

172-
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
173-
metrics_msg.update_target_latest_received_nonce::<P>(latest_received_nonce);
174-
metrics_msg.update_target_latest_confirmed_nonce::<P>(latest_confirmed_nonce);
178+
if update_metrics {
179+
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
180+
metrics_msg.update_target_latest_received_nonce::<P>(latest_received_nonce);
181+
metrics_msg.update_target_latest_confirmed_nonce::<P>(latest_confirmed_nonce);
182+
}
175183
}
176184

177185
Ok((
@@ -221,8 +229,8 @@ struct MessageDeliveryStrategy<P: MessageLane> {
221229
max_messages_weight_in_single_batch: Weight,
222230
/// Maximal messages size in the single delivery transaction.
223231
max_messages_size_in_single_batch: usize,
224-
/// Latest confirmed nonce at the source client.
225-
latest_confirmed_nonce_at_source: Option<MessageNonce>,
232+
/// Latest confirmed nonces at the source client + the header id where we have first met this nonce.
233+
latest_confirmed_nonces_at_source: VecDeque<(SourceHeaderIdOf<P>, MessageNonce)>,
226234
/// Target nonces from the source client.
227235
target_nonces: Option<TargetClientNonces<DeliveryRaceTargetNoncesData>>,
228236
/// Basic delivery strategy.
@@ -259,8 +267,8 @@ impl<P: MessageLane> std::fmt::Debug for MessageDeliveryStrategy<P> {
259267
&self.max_messages_size_in_single_batch,
260268
)
261269
.field(
262-
"latest_confirmed_noncs_at_source",
263-
&self.latest_confirmed_nonce_at_source,
270+
"latest_confirmed_nonces_at_source",
271+
&self.latest_confirmed_nonces_at_source,
264272
)
265273
.field("target_nonces", &self.target_nonces)
266274
.field("strategy", &self.strategy)
@@ -292,17 +300,64 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M
292300
at_block: SourceHeaderIdOf<P>,
293301
nonces: SourceClientNonces<Self::SourceNoncesRange>,
294302
) {
295-
self.latest_confirmed_nonce_at_source = nonces.confirmed_nonce;
303+
if let Some(confirmed_nonce) = nonces.confirmed_nonce {
304+
let is_confirmed_nonce_updated = self
305+
.latest_confirmed_nonces_at_source
306+
.back()
307+
.map(|(_, prev_nonce)| *prev_nonce != confirmed_nonce)
308+
.unwrap_or(true);
309+
if is_confirmed_nonce_updated {
310+
self.latest_confirmed_nonces_at_source
311+
.push_back((at_block.clone(), confirmed_nonce));
312+
}
313+
}
296314
self.strategy.source_nonces_updated(at_block, nonces)
297315
}
298316

299-
fn target_nonces_updated(
317+
fn best_target_nonces_updated(
300318
&mut self,
301319
nonces: TargetClientNonces<DeliveryRaceTargetNoncesData>,
302320
race_state: &mut RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
303321
) {
304-
self.target_nonces = Some(nonces.clone());
305-
self.strategy.target_nonces_updated(
322+
// best target nonces must always be ge than finalized target nonces
323+
let mut target_nonces = self.target_nonces.take().unwrap_or_else(|| nonces.clone());
324+
target_nonces.nonces_data = nonces.nonces_data.clone();
325+
target_nonces.latest_nonce = std::cmp::max(target_nonces.latest_nonce, nonces.latest_nonce);
326+
self.target_nonces = Some(target_nonces);
327+
328+
self.strategy.best_target_nonces_updated(
329+
TargetClientNonces {
330+
latest_nonce: nonces.latest_nonce,
331+
nonces_data: (),
332+
},
333+
race_state,
334+
)
335+
}
336+
337+
fn finalized_target_nonces_updated(
338+
&mut self,
339+
nonces: TargetClientNonces<DeliveryRaceTargetNoncesData>,
340+
race_state: &mut RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
341+
) {
342+
if let Some(ref best_finalized_source_header_id_at_best_target) =
343+
race_state.best_finalized_source_header_id_at_best_target
344+
{
345+
let oldest_header_number_to_keep = best_finalized_source_header_id_at_best_target.0;
346+
while self
347+
.latest_confirmed_nonces_at_source
348+
.front()
349+
.map(|(id, _)| id.0 < oldest_header_number_to_keep)
350+
.unwrap_or(false)
351+
{
352+
self.latest_confirmed_nonces_at_source.pop_front();
353+
}
354+
}
355+
356+
if let Some(ref mut target_nonces) = self.target_nonces {
357+
target_nonces.latest_nonce = std::cmp::max(target_nonces.latest_nonce, nonces.latest_nonce);
358+
}
359+
360+
self.strategy.finalized_target_nonces_updated(
306361
TargetClientNonces {
307362
latest_nonce: nonces.latest_nonce,
308363
nonces_data: (),
@@ -315,7 +370,14 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M
315370
&mut self,
316371
race_state: &RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
317372
) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> {
318-
let latest_confirmed_nonce_at_source = self.latest_confirmed_nonce_at_source?;
373+
let best_finalized_source_header_id_at_best_target =
374+
race_state.best_finalized_source_header_id_at_best_target.clone()?;
375+
let latest_confirmed_nonce_at_source = self
376+
.latest_confirmed_nonces_at_source
377+
.iter()
378+
.take_while(|(id, _)| id.0 <= best_finalized_source_header_id_at_best_target.0)
379+
.last()
380+
.map(|(_, nonce)| *nonce)?;
319381
let target_nonces = self.target_nonces.as_ref()?;
320382

321383
// There's additional condition in the message delivery race: target would reject messages
@@ -509,6 +571,7 @@ mod tests {
509571
best_finalized_source_header_id_at_source: Some(header_id(1)),
510572
best_finalized_source_header_id_at_best_target: Some(header_id(1)),
511573
best_target_header_id: Some(header_id(1)),
574+
best_finalized_target_header_id: Some(header_id(1)),
512575
nonces_to_submit: None,
513576
nonces_submitted: None,
514577
};
@@ -519,7 +582,7 @@ mod tests {
519582
max_messages_in_single_batch: 4,
520583
max_messages_weight_in_single_batch: 4,
521584
max_messages_size_in_single_batch: 4,
522-
latest_confirmed_nonce_at_source: Some(19),
585+
latest_confirmed_nonces_at_source: vec![(header_id(1), 19)].into_iter().collect(),
523586
target_nonces: Some(TargetClientNonces {
524587
latest_nonce: 19,
525588
nonces_data: DeliveryRaceTargetNoncesData {
@@ -548,13 +611,17 @@ mod tests {
548611
confirmed_nonce: Some(19),
549612
},
550613
);
551-
race_strategy.strategy.target_nonces_updated(
552-
TargetClientNonces {
553-
latest_nonce: 19,
554-
nonces_data: (),
555-
},
556-
&mut race_state,
557-
);
614+
615+
let target_nonces = TargetClientNonces {
616+
latest_nonce: 19,
617+
nonces_data: (),
618+
};
619+
race_strategy
620+
.strategy
621+
.best_target_nonces_updated(target_nonces.clone(), &mut race_state);
622+
race_strategy
623+
.strategy
624+
.finalized_target_nonces_updated(target_nonces, &mut race_state);
558625

559626
(race_state, race_strategy)
560627
}
@@ -611,8 +678,12 @@ mod tests {
611678

612679
// if there are already `max_unconfirmed_nonces_at_target` messages on target,
613680
// we need to wait until confirmations will be delivered by receiving race
614-
strategy.latest_confirmed_nonce_at_source =
615-
Some(strategy.target_nonces.as_ref().unwrap().latest_nonce - strategy.max_unconfirmed_nonces_at_target);
681+
strategy.latest_confirmed_nonces_at_source = vec![(
682+
header_id(1),
683+
strategy.target_nonces.as_ref().unwrap().latest_nonce - strategy.max_unconfirmed_nonces_at_target,
684+
)]
685+
.into_iter()
686+
.collect();
616687
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
617688
}
618689

@@ -622,7 +693,7 @@ mod tests {
622693

623694
// if there are new confirmed nonces on source, we want to relay this information
624695
// to target to prune rewards queue
625-
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonce_at_source.unwrap();
696+
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
626697
strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
627698
assert_eq!(
628699
strategy.select_nonces_to_deliver(&state),
@@ -650,7 +721,7 @@ mod tests {
650721

651722
// if there are already `max_unrewarded_relayer_entries_at_target` entries at target,
652723
// we need to prove at least `messages_in_oldest_entry` rewards
653-
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonce_at_source.unwrap();
724+
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
654725
{
655726
let mut nonces_data = &mut strategy.target_nonces.as_mut().unwrap().nonces_data;
656727
nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
@@ -667,7 +738,7 @@ mod tests {
667738

668739
// if there are already `max_unrewarded_relayer_entries_at_target` entries at target,
669740
// we need to prove at least `messages_in_oldest_entry` rewards
670-
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonce_at_source.unwrap();
741+
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
671742
{
672743
let mut nonces_data = &mut strategy.target_nonces.as_mut().unwrap().nonces_data;
673744
nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 3;
@@ -747,12 +818,54 @@ mod tests {
747818

748819
// 1 delivery confirmation from target to source is still missing, so we may only
749820
// relay 3 new messages
750-
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonce_at_source.unwrap();
751-
strategy.latest_confirmed_nonce_at_source = Some(prev_confirmed_nonce_at_source - 1);
821+
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
822+
strategy.latest_confirmed_nonces_at_source = vec![(header_id(1), prev_confirmed_nonce_at_source - 1)]
823+
.into_iter()
824+
.collect();
825+
strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
826+
assert_eq!(
827+
strategy.select_nonces_to_deliver(&state),
828+
Some(((20..=22), proof_parameters(false, 3)))
829+
);
830+
}
831+
832+
#[test]
833+
fn message_delivery_strategy_waits_for_confirmed_nonce_header_to_appear_on_target() {
834+
// 1 delivery confirmation from target to source is still missing, so we may deliver
835+
// reward confirmation with our message delivery transaction. But the problem is that
836+
// the reward has been paid at header 2 && this header is still unknown to target node.
837+
//
838+
// => so we can't deliver more than 3 messages
839+
let (mut state, mut strategy) = prepare_strategy();
840+
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
841+
strategy.latest_confirmed_nonces_at_source = vec![
842+
(header_id(1), prev_confirmed_nonce_at_source - 1),
843+
(header_id(2), prev_confirmed_nonce_at_source),
844+
]
845+
.into_iter()
846+
.collect();
752847
strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
848+
state.best_finalized_source_header_id_at_best_target = Some(header_id(1));
753849
assert_eq!(
754850
strategy.select_nonces_to_deliver(&state),
755851
Some(((20..=22), proof_parameters(false, 3)))
756852
);
853+
854+
// the same situation, but the header 2 is known to the target node, so we may deliver reward confirmation
855+
let (mut state, mut strategy) = prepare_strategy();
856+
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
857+
strategy.latest_confirmed_nonces_at_source = vec![
858+
(header_id(1), prev_confirmed_nonce_at_source - 1),
859+
(header_id(2), prev_confirmed_nonce_at_source),
860+
]
861+
.into_iter()
862+
.collect();
863+
strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
864+
state.best_finalized_source_header_id_at_source = Some(header_id(2));
865+
state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
866+
assert_eq!(
867+
strategy.select_nonces_to_deliver(&state),
868+
Some(((20..=23), proof_parameters(true, 4)))
869+
);
757870
}
758871
}

0 commit comments

Comments
 (0)