Skip to content

Reconnect on-demand clients from MessagesSource::reconnect and MessagesTarget::reconnect #1927

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion relays/lib-substrate-relay/src/messages_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,22 @@ impl<P: SubstrateMessageLane> RelayClient for SubstrateMessagesSource<P> {
type Error = SubstrateError;

async fn reconnect(&mut self) -> Result<(), SubstrateError> {
// since the client calls RPC methods on both sides, we need to reconnect both
self.source_client.reconnect().await?;
self.target_client.reconnect().await
self.target_client.reconnect().await?;

// call reconnect on on-demand headers relay, because we may use different chains there
// and the error that has lead to reconnect may have came from those other chains
// (see `require_target_header_on_source`)
//
// this may lead to multiple reconnects to the same node during the same call and it
// needs to be addressed in the future
// TODO: https://github.com/paritytech/parity-bridges-common/issues/1928
if let Some(ref mut target_to_source_headers_relay) = self.target_to_source_headers_relay {
target_to_source_headers_relay.reconnect().await?;
}

Ok(())
}
}

Expand Down
16 changes: 15 additions & 1 deletion relays/lib-substrate-relay/src/messages_target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,22 @@ impl<P: SubstrateMessageLane> RelayClient for SubstrateMessagesTarget<P> {
type Error = SubstrateError;

async fn reconnect(&mut self) -> Result<(), SubstrateError> {
// since the client calls RPC methods on both sides, we need to reconnect both
self.target_client.reconnect().await?;
self.source_client.reconnect().await
self.source_client.reconnect().await?;

// call reconnect on on-demand headers relay, because we may use different chains there
// and the error that has lead to reconnect may have came from those other chains
// (see `require_source_header_on_target`)
//
// this may lead to multiple reconnects to the same node during the same call and it
// needs to be addressed in the future
// TODO: https://github.com/paritytech/parity-bridges-common/issues/1928
if let Some(ref mut source_to_target_headers_relay) = self.source_to_target_headers_relay {
source_to_target_headers_relay.reconnect().await?;
}

Ok(())
}
}

Expand Down
10 changes: 10 additions & 0 deletions relays/lib-substrate-relay/src/on_demand/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub struct OnDemandHeadersRelay<P: SubstrateFinalitySyncPipeline> {
required_header_number: RequiredHeaderNumberRef<P::SourceChain>,
/// Client of the source chain.
source_client: Client<P::SourceChain>,
/// Client of the target chain.
target_client: Client<P::TargetChain>,
}

impl<P: SubstrateFinalitySyncPipeline> OnDemandHeadersRelay<P> {
Expand All @@ -83,6 +85,7 @@ impl<P: SubstrateFinalitySyncPipeline> OnDemandHeadersRelay<P> {
relay_task_name: on_demand_headers_relay_name::<P::SourceChain, P::TargetChain>(),
required_header_number: required_header_number.clone(),
source_client: source_client.clone(),
target_client: target_client.clone(),
};
async_std::task::spawn(async move {
background_task::<P>(
Expand All @@ -104,6 +107,13 @@ impl<P: SubstrateFinalitySyncPipeline> OnDemandHeadersRelay<P> {
impl<P: SubstrateFinalitySyncPipeline> OnDemandRelay<P::SourceChain, P::TargetChain>
for OnDemandHeadersRelay<P>
{
async fn reconnect(&self) -> Result<(), SubstrateError> {
// using clone is fine here (to avoid mut requirement), because clone on Client clones
// internal references
self.source_client.clone().reconnect().await?;
self.target_client.clone().reconnect().await
}

async fn require_more_headers(&self, required_header: BlockNumberOf<P::SourceChain>) {
let mut required_header_number = self.required_header_number.lock().await;
if required_header > *required_header_number {
Expand Down
3 changes: 3 additions & 0 deletions relays/lib-substrate-relay/src/on_demand/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub mod parachains;
/// On-demand headers relay that is relaying finalizing headers only when requested.
#[async_trait]
pub trait OnDemandRelay<SourceChain: Chain, TargetChain: Chain>: Send + Sync {
/// Reconnect to source and target nodes.
async fn reconnect(&self) -> Result<(), SubstrateError>;

/// Ask relay to relay source header with given number to the target chain.
///
/// Depending on implementation, on-demand relay may also relay `required_header` ancestors
Expand Down
9 changes: 9 additions & 0 deletions relays/lib-substrate-relay/src/on_demand/parachains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ impl<P: SubstrateParachainsPipeline> OnDemandRelay<P::SourceParachain, P::Target
where
P::SourceParachain: Chain<Hash = ParaHash>,
{
async fn reconnect(&self) -> Result<(), SubstrateError> {
// using clone is fine here (to avoid mut requirement), because clone on Client clones
// internal references
self.source_relay_client.clone().reconnect().await?;
self.target_client.clone().reconnect().await?;
// we'll probably need to reconnect relay chain relayer clients also
self.on_demand_source_relay_to_target_headers.reconnect().await
}

async fn require_more_headers(&self, required_header: BlockNumberOf<P::SourceParachain>) {
if let Err(e) = self.required_header_number_sender.send(required_header).await {
log::trace!(
Expand Down