Skip to content

Commit d86c3ce

Browse files
authored
Reconnect on-demand clients from MessagesSource::reconnect and MessagesTarget::reconnect (#1927)
* reconnect on-demand clients from MessagesSource::reconnect and MessagesTarget::reconnect * add issue reference * fmt
1 parent 4161b51 commit d86c3ce

File tree

5 files changed

+52
-2
lines changed

5 files changed

+52
-2
lines changed

relays/lib-substrate-relay/src/messages_source.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,22 @@ impl<P: SubstrateMessageLane> RelayClient for SubstrateMessagesSource<P> {
129129
type Error = SubstrateError;
130130

131131
async fn reconnect(&mut self) -> Result<(), SubstrateError> {
132+
// since the client calls RPC methods on both sides, we need to reconnect both
132133
self.source_client.reconnect().await?;
133-
self.target_client.reconnect().await
134+
self.target_client.reconnect().await?;
135+
136+
// call reconnect on on-demand headers relay, because we may use different chains there
137+
// and the error that has lead to reconnect may have came from those other chains
138+
// (see `require_target_header_on_source`)
139+
//
140+
// this may lead to multiple reconnects to the same node during the same call and it
141+
// needs to be addressed in the future
142+
// TODO: https://github.com/paritytech/parity-bridges-common/issues/1928
143+
if let Some(ref mut target_to_source_headers_relay) = self.target_to_source_headers_relay {
144+
target_to_source_headers_relay.reconnect().await?;
145+
}
146+
147+
Ok(())
134148
}
135149
}
136150

relays/lib-substrate-relay/src/messages_target.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,22 @@ impl<P: SubstrateMessageLane> RelayClient for SubstrateMessagesTarget<P> {
123123
type Error = SubstrateError;
124124

125125
async fn reconnect(&mut self) -> Result<(), SubstrateError> {
126+
// since the client calls RPC methods on both sides, we need to reconnect both
126127
self.target_client.reconnect().await?;
127-
self.source_client.reconnect().await
128+
self.source_client.reconnect().await?;
129+
130+
// call reconnect on on-demand headers relay, because we may use different chains there
131+
// and the error that has lead to reconnect may have came from those other chains
132+
// (see `require_source_header_on_target`)
133+
//
134+
// this may lead to multiple reconnects to the same node during the same call and it
135+
// needs to be addressed in the future
136+
// TODO: https://github.com/paritytech/parity-bridges-common/issues/1928
137+
if let Some(ref mut source_to_target_headers_relay) = self.source_to_target_headers_relay {
138+
source_to_target_headers_relay.reconnect().await?;
139+
}
140+
141+
Ok(())
128142
}
129143
}
130144

relays/lib-substrate-relay/src/on_demand/headers.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ pub struct OnDemandHeadersRelay<P: SubstrateFinalitySyncPipeline> {
6060
required_header_number: RequiredHeaderNumberRef<P::SourceChain>,
6161
/// Client of the source chain.
6262
source_client: Client<P::SourceChain>,
63+
/// Client of the target chain.
64+
target_client: Client<P::TargetChain>,
6365
}
6466

6567
impl<P: SubstrateFinalitySyncPipeline> OnDemandHeadersRelay<P> {
@@ -83,6 +85,7 @@ impl<P: SubstrateFinalitySyncPipeline> OnDemandHeadersRelay<P> {
8385
relay_task_name: on_demand_headers_relay_name::<P::SourceChain, P::TargetChain>(),
8486
required_header_number: required_header_number.clone(),
8587
source_client: source_client.clone(),
88+
target_client: target_client.clone(),
8689
};
8790
async_std::task::spawn(async move {
8891
background_task::<P>(
@@ -104,6 +107,13 @@ impl<P: SubstrateFinalitySyncPipeline> OnDemandHeadersRelay<P> {
104107
impl<P: SubstrateFinalitySyncPipeline> OnDemandRelay<P::SourceChain, P::TargetChain>
105108
for OnDemandHeadersRelay<P>
106109
{
110+
async fn reconnect(&self) -> Result<(), SubstrateError> {
111+
// using clone is fine here (to avoid mut requirement), because clone on Client clones
112+
// internal references
113+
self.source_client.clone().reconnect().await?;
114+
self.target_client.clone().reconnect().await
115+
}
116+
107117
async fn require_more_headers(&self, required_header: BlockNumberOf<P::SourceChain>) {
108118
let mut required_header_number = self.required_header_number.lock().await;
109119
if required_header > *required_header_number {

relays/lib-substrate-relay/src/on_demand/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ pub mod parachains;
2626
/// On-demand headers relay that is relaying finalizing headers only when requested.
2727
#[async_trait]
2828
pub trait OnDemandRelay<SourceChain: Chain, TargetChain: Chain>: Send + Sync {
29+
/// Reconnect to source and target nodes.
30+
async fn reconnect(&self) -> Result<(), SubstrateError>;
31+
2932
/// Ask relay to relay source header with given number to the target chain.
3033
///
3134
/// Depending on implementation, on-demand relay may also relay `required_header` ancestors

relays/lib-substrate-relay/src/on_demand/parachains.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ impl<P: SubstrateParachainsPipeline> OnDemandRelay<P::SourceParachain, P::Target
119119
where
120120
P::SourceParachain: Chain<Hash = ParaHash>,
121121
{
122+
async fn reconnect(&self) -> Result<(), SubstrateError> {
123+
// using clone is fine here (to avoid mut requirement), because clone on Client clones
124+
// internal references
125+
self.source_relay_client.clone().reconnect().await?;
126+
self.target_client.clone().reconnect().await?;
127+
// we'll probably need to reconnect relay chain relayer clients also
128+
self.on_demand_source_relay_to_target_headers.reconnect().await
129+
}
130+
122131
async fn require_more_headers(&self, required_header: BlockNumberOf<P::SourceParachain>) {
123132
if let Err(e) = self.required_header_number_sender.send(required_header).await {
124133
log::trace!(

0 commit comments

Comments
 (0)