@@ -29,7 +29,12 @@ use async_trait::async_trait;
29
29
use bp_message_lane:: { MessageNonce , UnrewardedRelayersState , Weight } ;
30
30
use futures:: stream:: FusedStream ;
31
31
use relay_utils:: FailedClient ;
32
- use std:: { collections:: BTreeMap , marker:: PhantomData , ops:: RangeInclusive , time:: Duration } ;
32
+ use std:: {
33
+ collections:: { BTreeMap , VecDeque } ,
34
+ marker:: PhantomData ,
35
+ ops:: RangeInclusive ,
36
+ time:: Duration ,
37
+ } ;
33
38
34
39
/// Run message delivery race.
35
40
pub async fn run < P : MessageLane > (
@@ -61,7 +66,7 @@ pub async fn run<P: MessageLane>(
61
66
max_messages_in_single_batch : params. max_messages_in_single_batch ,
62
67
max_messages_weight_in_single_batch : params. max_messages_weight_in_single_batch ,
63
68
max_messages_size_in_single_batch : params. max_messages_size_in_single_batch ,
64
- latest_confirmed_nonce_at_source : None ,
69
+ latest_confirmed_nonces_at_source : VecDeque :: new ( ) ,
65
70
target_nonces : None ,
66
71
strategy : BasicStrategy :: new ( ) ,
67
72
} ,
@@ -164,14 +169,17 @@ where
164
169
async fn nonces (
165
170
& self ,
166
171
at_block : TargetHeaderIdOf < P > ,
172
+ update_metrics : bool ,
167
173
) -> Result < ( TargetHeaderIdOf < P > , TargetClientNonces < DeliveryRaceTargetNoncesData > ) , Self :: Error > {
168
174
let ( at_block, latest_received_nonce) = self . client . latest_received_nonce ( at_block) . await ?;
169
175
let ( at_block, latest_confirmed_nonce) = self . client . latest_confirmed_received_nonce ( at_block) . await ?;
170
176
let ( at_block, unrewarded_relayers) = self . client . unrewarded_relayers_state ( at_block) . await ?;
171
177
172
- if let Some ( metrics_msg) = self . metrics_msg . as_ref ( ) {
173
- metrics_msg. update_target_latest_received_nonce :: < P > ( latest_received_nonce) ;
174
- metrics_msg. update_target_latest_confirmed_nonce :: < P > ( latest_confirmed_nonce) ;
178
+ if update_metrics {
179
+ if let Some ( metrics_msg) = self . metrics_msg . as_ref ( ) {
180
+ metrics_msg. update_target_latest_received_nonce :: < P > ( latest_received_nonce) ;
181
+ metrics_msg. update_target_latest_confirmed_nonce :: < P > ( latest_confirmed_nonce) ;
182
+ }
175
183
}
176
184
177
185
Ok ( (
@@ -221,8 +229,8 @@ struct MessageDeliveryStrategy<P: MessageLane> {
221
229
max_messages_weight_in_single_batch : Weight ,
222
230
/// Maximal messages size in the single delivery transaction.
223
231
max_messages_size_in_single_batch : usize ,
224
- /// Latest confirmed nonce at the source client.
225
- latest_confirmed_nonce_at_source : Option < MessageNonce > ,
232
+ /// Latest confirmed nonces at the source client + the header id where we have first met this nonce .
233
+ latest_confirmed_nonces_at_source : VecDeque < ( SourceHeaderIdOf < P > , MessageNonce ) > ,
226
234
/// Target nonces from the source client.
227
235
target_nonces : Option < TargetClientNonces < DeliveryRaceTargetNoncesData > > ,
228
236
/// Basic delivery strategy.
@@ -259,8 +267,8 @@ impl<P: MessageLane> std::fmt::Debug for MessageDeliveryStrategy<P> {
259
267
& self . max_messages_size_in_single_batch ,
260
268
)
261
269
. field (
262
- "latest_confirmed_noncs_at_source " ,
263
- & self . latest_confirmed_nonce_at_source ,
270
+ "latest_confirmed_nonces_at_source " ,
271
+ & self . latest_confirmed_nonces_at_source ,
264
272
)
265
273
. field ( "target_nonces" , & self . target_nonces )
266
274
. field ( "strategy" , & self . strategy )
@@ -292,17 +300,64 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M
292
300
at_block : SourceHeaderIdOf < P > ,
293
301
nonces : SourceClientNonces < Self :: SourceNoncesRange > ,
294
302
) {
295
- self . latest_confirmed_nonce_at_source = nonces. confirmed_nonce ;
303
+ if let Some ( confirmed_nonce) = nonces. confirmed_nonce {
304
+ let is_confirmed_nonce_updated = self
305
+ . latest_confirmed_nonces_at_source
306
+ . back ( )
307
+ . map ( |( _, prev_nonce) | * prev_nonce != confirmed_nonce)
308
+ . unwrap_or ( true ) ;
309
+ if is_confirmed_nonce_updated {
310
+ self . latest_confirmed_nonces_at_source
311
+ . push_back ( ( at_block. clone ( ) , confirmed_nonce) ) ;
312
+ }
313
+ }
296
314
self . strategy . source_nonces_updated ( at_block, nonces)
297
315
}
298
316
299
- fn target_nonces_updated (
317
+ fn best_target_nonces_updated (
300
318
& mut self ,
301
319
nonces : TargetClientNonces < DeliveryRaceTargetNoncesData > ,
302
320
race_state : & mut RaceState < SourceHeaderIdOf < P > , TargetHeaderIdOf < P > , P :: MessagesProof > ,
303
321
) {
304
- self . target_nonces = Some ( nonces. clone ( ) ) ;
305
- self . strategy . target_nonces_updated (
322
+ // best target nonces must always be ge than finalized target nonces
323
+ let mut target_nonces = self . target_nonces . take ( ) . unwrap_or_else ( || nonces. clone ( ) ) ;
324
+ target_nonces. nonces_data = nonces. nonces_data . clone ( ) ;
325
+ target_nonces. latest_nonce = std:: cmp:: max ( target_nonces. latest_nonce , nonces. latest_nonce ) ;
326
+ self . target_nonces = Some ( target_nonces) ;
327
+
328
+ self . strategy . best_target_nonces_updated (
329
+ TargetClientNonces {
330
+ latest_nonce : nonces. latest_nonce ,
331
+ nonces_data : ( ) ,
332
+ } ,
333
+ race_state,
334
+ )
335
+ }
336
+
337
+ fn finalized_target_nonces_updated (
338
+ & mut self ,
339
+ nonces : TargetClientNonces < DeliveryRaceTargetNoncesData > ,
340
+ race_state : & mut RaceState < SourceHeaderIdOf < P > , TargetHeaderIdOf < P > , P :: MessagesProof > ,
341
+ ) {
342
+ if let Some ( ref best_finalized_source_header_id_at_best_target) =
343
+ race_state. best_finalized_source_header_id_at_best_target
344
+ {
345
+ let oldest_header_number_to_keep = best_finalized_source_header_id_at_best_target. 0 ;
346
+ while self
347
+ . latest_confirmed_nonces_at_source
348
+ . front ( )
349
+ . map ( |( id, _) | id. 0 < oldest_header_number_to_keep)
350
+ . unwrap_or ( false )
351
+ {
352
+ self . latest_confirmed_nonces_at_source . pop_front ( ) ;
353
+ }
354
+ }
355
+
356
+ if let Some ( ref mut target_nonces) = self . target_nonces {
357
+ target_nonces. latest_nonce = std:: cmp:: max ( target_nonces. latest_nonce , nonces. latest_nonce ) ;
358
+ }
359
+
360
+ self . strategy . finalized_target_nonces_updated (
306
361
TargetClientNonces {
307
362
latest_nonce : nonces. latest_nonce ,
308
363
nonces_data : ( ) ,
@@ -315,7 +370,14 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M
315
370
& mut self ,
316
371
race_state : & RaceState < SourceHeaderIdOf < P > , TargetHeaderIdOf < P > , P :: MessagesProof > ,
317
372
) -> Option < ( RangeInclusive < MessageNonce > , Self :: ProofParameters ) > {
318
- let latest_confirmed_nonce_at_source = self . latest_confirmed_nonce_at_source ?;
373
+ let best_finalized_source_header_id_at_best_target =
374
+ race_state. best_finalized_source_header_id_at_best_target . clone ( ) ?;
375
+ let latest_confirmed_nonce_at_source = self
376
+ . latest_confirmed_nonces_at_source
377
+ . iter ( )
378
+ . take_while ( |( id, _) | id. 0 <= best_finalized_source_header_id_at_best_target. 0 )
379
+ . last ( )
380
+ . map ( |( _, nonce) | * nonce) ?;
319
381
let target_nonces = self . target_nonces . as_ref ( ) ?;
320
382
321
383
// There's additional condition in the message delivery race: target would reject messages
@@ -509,6 +571,7 @@ mod tests {
509
571
best_finalized_source_header_id_at_source : Some ( header_id ( 1 ) ) ,
510
572
best_finalized_source_header_id_at_best_target : Some ( header_id ( 1 ) ) ,
511
573
best_target_header_id : Some ( header_id ( 1 ) ) ,
574
+ best_finalized_target_header_id : Some ( header_id ( 1 ) ) ,
512
575
nonces_to_submit : None ,
513
576
nonces_submitted : None ,
514
577
} ;
@@ -519,7 +582,7 @@ mod tests {
519
582
max_messages_in_single_batch : 4 ,
520
583
max_messages_weight_in_single_batch : 4 ,
521
584
max_messages_size_in_single_batch : 4 ,
522
- latest_confirmed_nonce_at_source : Some ( 19 ) ,
585
+ latest_confirmed_nonces_at_source : vec ! [ ( header_id ( 1 ) , 19 ) ] . into_iter ( ) . collect ( ) ,
523
586
target_nonces : Some ( TargetClientNonces {
524
587
latest_nonce : 19 ,
525
588
nonces_data : DeliveryRaceTargetNoncesData {
@@ -548,13 +611,17 @@ mod tests {
548
611
confirmed_nonce : Some ( 19 ) ,
549
612
} ,
550
613
) ;
551
- race_strategy. strategy . target_nonces_updated (
552
- TargetClientNonces {
553
- latest_nonce : 19 ,
554
- nonces_data : ( ) ,
555
- } ,
556
- & mut race_state,
557
- ) ;
614
+
615
+ let target_nonces = TargetClientNonces {
616
+ latest_nonce : 19 ,
617
+ nonces_data : ( ) ,
618
+ } ;
619
+ race_strategy
620
+ . strategy
621
+ . best_target_nonces_updated ( target_nonces. clone ( ) , & mut race_state) ;
622
+ race_strategy
623
+ . strategy
624
+ . finalized_target_nonces_updated ( target_nonces, & mut race_state) ;
558
625
559
626
( race_state, race_strategy)
560
627
}
@@ -611,8 +678,12 @@ mod tests {
611
678
612
679
// if there are already `max_unconfirmed_nonces_at_target` messages on target,
613
680
// we need to wait until confirmations will be delivered by receiving race
614
- strategy. latest_confirmed_nonce_at_source =
615
- Some ( strategy. target_nonces . as_ref ( ) . unwrap ( ) . latest_nonce - strategy. max_unconfirmed_nonces_at_target ) ;
681
+ strategy. latest_confirmed_nonces_at_source = vec ! [ (
682
+ header_id( 1 ) ,
683
+ strategy. target_nonces. as_ref( ) . unwrap( ) . latest_nonce - strategy. max_unconfirmed_nonces_at_target,
684
+ ) ]
685
+ . into_iter ( )
686
+ . collect ( ) ;
616
687
assert_eq ! ( strategy. select_nonces_to_deliver( & state) , None ) ;
617
688
}
618
689
@@ -622,7 +693,7 @@ mod tests {
622
693
623
694
// if there are new confirmed nonces on source, we want to relay this information
624
695
// to target to prune rewards queue
625
- let prev_confirmed_nonce_at_source = strategy. latest_confirmed_nonce_at_source . unwrap ( ) ;
696
+ let prev_confirmed_nonce_at_source = strategy. latest_confirmed_nonces_at_source . back ( ) . unwrap ( ) . 1 ;
626
697
strategy. target_nonces . as_mut ( ) . unwrap ( ) . nonces_data . confirmed_nonce = prev_confirmed_nonce_at_source - 1 ;
627
698
assert_eq ! (
628
699
strategy. select_nonces_to_deliver( & state) ,
@@ -650,7 +721,7 @@ mod tests {
650
721
651
722
// if there are already `max_unrewarded_relayer_entries_at_target` entries at target,
652
723
// we need to prove at least `messages_in_oldest_entry` rewards
653
- let prev_confirmed_nonce_at_source = strategy. latest_confirmed_nonce_at_source . unwrap ( ) ;
724
+ let prev_confirmed_nonce_at_source = strategy. latest_confirmed_nonces_at_source . back ( ) . unwrap ( ) . 1 ;
654
725
{
655
726
let mut nonces_data = & mut strategy. target_nonces . as_mut ( ) . unwrap ( ) . nonces_data ;
656
727
nonces_data. confirmed_nonce = prev_confirmed_nonce_at_source - 1 ;
@@ -667,7 +738,7 @@ mod tests {
667
738
668
739
// if there are already `max_unrewarded_relayer_entries_at_target` entries at target,
669
740
// we need to prove at least `messages_in_oldest_entry` rewards
670
- let prev_confirmed_nonce_at_source = strategy. latest_confirmed_nonce_at_source . unwrap ( ) ;
741
+ let prev_confirmed_nonce_at_source = strategy. latest_confirmed_nonces_at_source . back ( ) . unwrap ( ) . 1 ;
671
742
{
672
743
let mut nonces_data = & mut strategy. target_nonces . as_mut ( ) . unwrap ( ) . nonces_data ;
673
744
nonces_data. confirmed_nonce = prev_confirmed_nonce_at_source - 3 ;
@@ -747,12 +818,54 @@ mod tests {
747
818
748
819
// 1 delivery confirmation from target to source is still missing, so we may only
749
820
// relay 3 new messages
750
- let prev_confirmed_nonce_at_source = strategy. latest_confirmed_nonce_at_source . unwrap ( ) ;
751
- strategy. latest_confirmed_nonce_at_source = Some ( prev_confirmed_nonce_at_source - 1 ) ;
821
+ let prev_confirmed_nonce_at_source = strategy. latest_confirmed_nonces_at_source . back ( ) . unwrap ( ) . 1 ;
822
+ strategy. latest_confirmed_nonces_at_source = vec ! [ ( header_id( 1 ) , prev_confirmed_nonce_at_source - 1 ) ]
823
+ . into_iter ( )
824
+ . collect ( ) ;
825
+ strategy. target_nonces . as_mut ( ) . unwrap ( ) . nonces_data . confirmed_nonce = prev_confirmed_nonce_at_source - 1 ;
826
+ assert_eq ! (
827
+ strategy. select_nonces_to_deliver( & state) ,
828
+ Some ( ( ( 20 ..=22 ) , proof_parameters( false , 3 ) ) )
829
+ ) ;
830
+ }
831
+
832
+ #[ test]
833
+ fn message_delivery_strategy_waits_for_confirmed_nonce_header_to_appear_on_target ( ) {
834
+ // 1 delivery confirmation from target to source is still missing, so we may deliver
835
+ // reward confirmation with our message delivery transaction. But the problem is that
836
+ // the reward has been paid at header 2 && this header is still unknown to target node.
837
+ //
838
+ // => so we can't deliver more than 3 messages
839
+ let ( mut state, mut strategy) = prepare_strategy ( ) ;
840
+ let prev_confirmed_nonce_at_source = strategy. latest_confirmed_nonces_at_source . back ( ) . unwrap ( ) . 1 ;
841
+ strategy. latest_confirmed_nonces_at_source = vec ! [
842
+ ( header_id( 1 ) , prev_confirmed_nonce_at_source - 1 ) ,
843
+ ( header_id( 2 ) , prev_confirmed_nonce_at_source) ,
844
+ ]
845
+ . into_iter ( )
846
+ . collect ( ) ;
752
847
strategy. target_nonces . as_mut ( ) . unwrap ( ) . nonces_data . confirmed_nonce = prev_confirmed_nonce_at_source - 1 ;
848
+ state. best_finalized_source_header_id_at_best_target = Some ( header_id ( 1 ) ) ;
753
849
assert_eq ! (
754
850
strategy. select_nonces_to_deliver( & state) ,
755
851
Some ( ( ( 20 ..=22 ) , proof_parameters( false , 3 ) ) )
756
852
) ;
853
+
854
+ // the same situation, but the header 2 is known to the target node, so we may deliver reward confirmation
855
+ let ( mut state, mut strategy) = prepare_strategy ( ) ;
856
+ let prev_confirmed_nonce_at_source = strategy. latest_confirmed_nonces_at_source . back ( ) . unwrap ( ) . 1 ;
857
+ strategy. latest_confirmed_nonces_at_source = vec ! [
858
+ ( header_id( 1 ) , prev_confirmed_nonce_at_source - 1 ) ,
859
+ ( header_id( 2 ) , prev_confirmed_nonce_at_source) ,
860
+ ]
861
+ . into_iter ( )
862
+ . collect ( ) ;
863
+ strategy. target_nonces . as_mut ( ) . unwrap ( ) . nonces_data . confirmed_nonce = prev_confirmed_nonce_at_source - 1 ;
864
+ state. best_finalized_source_header_id_at_source = Some ( header_id ( 2 ) ) ;
865
+ state. best_finalized_source_header_id_at_best_target = Some ( header_id ( 2 ) ) ;
866
+ assert_eq ! (
867
+ strategy. select_nonces_to_deliver( & state) ,
868
+ Some ( ( ( 20 ..=23 ) , proof_parameters( true , 4 ) ) )
869
+ ) ;
757
870
}
758
871
}
0 commit comments