Skip to content

Commit a8fcf3d

Browse files
authored
pause relays(s) when node is syncing (#605)
1 parent a886988 commit a8fcf3d

File tree

16 files changed

+94
-6
lines changed

16 files changed

+94
-6
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

relays/ethereum-client/src/client.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@
1616

1717
use crate::rpc::Ethereum;
1818
use crate::types::{
19-
Address, Bytes, CallRequest, Header, HeaderWithTransactions, Receipt, SignedRawTx, Transaction, TransactionHash,
20-
H256, U256,
19+
Address, Bytes, CallRequest, Header, HeaderWithTransactions, Receipt, SignedRawTx, SyncState, Transaction,
20+
TransactionHash, H256, U256,
2121
};
2222
use crate::{ConnectionParams, Error, Result};
2323

2424
use jsonrpsee::raw::RawClient;
2525
use jsonrpsee::transport::http::HttpTransportClient;
2626
use jsonrpsee::Client as RpcClient;
2727

28+
/// Number of headers missing from the Ethereum node for us to consider node not synced.
29+
const MAJOR_SYNC_BLOCKS: u64 = 5;
30+
2831
/// The client used to interact with an Ethereum node through RPC.
2932
#[derive(Clone)]
3033
pub struct Client {
@@ -53,6 +56,23 @@ impl Client {
5356
pub fn reconnect(&mut self) {
5457
self.client = Self::build_client(&self.params);
5558
}
59+
}
60+
61+
impl Client {
62+
/// Returns true if client is connected to at least one peer and is in synced state.
63+
pub async fn ensure_synced(&self) -> Result<()> {
64+
match Ethereum::syncing(&self.client).await? {
65+
SyncState::NotSyncing => Ok(()),
66+
SyncState::Syncing(syncing) => {
67+
let missing_headers = syncing.highest_block.saturating_sub(syncing.current_block);
68+
if missing_headers > MAJOR_SYNC_BLOCKS.into() {
69+
return Err(Error::ClientNotSynced(missing_headers));
70+
}
71+
72+
Ok(())
73+
}
74+
}
75+
}
5676

5777
/// Estimate gas usage for the given call.
5878
pub async fn estimate_gas(&self, call_request: CallRequest) -> Result<U256> {

relays/ethereum-client/src/error.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
//! Ethereum node RPC errors.
1818
19+
use crate::types::U256;
20+
1921
use jsonrpsee::client::RequestError;
2022
use relay_utils::MaybeConnectionError;
2123

@@ -40,6 +42,9 @@ pub enum Error {
4042
InvalidSubstrateBlockNumber,
4143
/// An invalid index has been received from an Ethereum node.
4244
InvalidIncompleteIndex,
45+
/// The client we're connected to is not synced, so we can't rely on its state. Contains
46+
/// number of unsynced headers.
47+
ClientNotSynced(U256),
4348
}
4449

4550
impl From<RequestError> for Error {
@@ -50,7 +55,11 @@ impl From<RequestError> for Error {
5055

5156
impl MaybeConnectionError for Error {
5257
fn is_connection_error(&self) -> bool {
53-
matches!(*self, Error::Request(RequestError::TransportError(_)))
58+
matches!(
59+
*self,
60+
Error::Request(RequestError::TransportError(_))
61+
| Error::ClientNotSynced(_),
62+
)
5463
}
5564
}
5665

@@ -66,6 +75,9 @@ impl ToString for Error {
6675
Self::IncompleteTransaction => "Incomplete Ethereum Transaction (missing required field - raw)".to_string(),
6776
Self::InvalidSubstrateBlockNumber => "Received an invalid Substrate block from Ethereum Node".to_string(),
6877
Self::InvalidIncompleteIndex => "Received an invalid incomplete index from Ethereum Node".to_string(),
78+
Self::ClientNotSynced(missing_headers) => {
79+
format!("Ethereum client is not synced: syncing {} headers", missing_headers)
80+
}
6981
}
7082
}
7183
}

relays/ethereum-client/src/rpc.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@
2222
#![allow(unused_variables)]
2323

2424
use crate::types::{
25-
Address, Bytes, CallRequest, Header, HeaderWithTransactions, Receipt, Transaction, TransactionHash, H256, U256, U64,
25+
Address, Bytes, CallRequest, Header, HeaderWithTransactions, Receipt, SyncState, Transaction, TransactionHash,
26+
H256, U256, U64,
2627
};
2728

2829
jsonrpsee::rpc_api! {
2930
pub(crate) Ethereum {
31+
#[rpc(method = "eth_syncing", positional_params)]
32+
fn syncing() -> SyncState;
3033
#[rpc(method = "eth_estimateGas", positional_params)]
3134
fn estimate_gas(call_request: CallRequest) -> U256;
3235
#[rpc(method = "eth_blockNumber", positional_params)]

relays/ethereum-client/src/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
1919
use headers_relay::sync_types::SourceHeader;
2020

21-
pub use web3::types::{Address, Bytes, CallRequest, H256, U128, U256, U64};
21+
pub use web3::types::{Address, Bytes, CallRequest, SyncState, H256, U128, U256, U64};
2222

2323
/// When header is just received from the Ethereum node, we check that it has
2424
/// both number and hash fields filled.

relays/ethereum/src/ethereum_exchange.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,10 @@ impl TargetClient<EthereumToSubstrateExchange> for SubstrateTransactionsTarget {
248248
}
249249

250250
async fn best_finalized_header_id(&self) -> Result<EthereumHeaderId, RpcError> {
251+
// we can't continue to relay exchange proofs if Substrate node is out of sync, because
252+
// it may have already received (some of) proofs that we're going to relay
253+
self.client.ensure_synced().await?;
254+
251255
self.client.best_ethereum_finalized_block().await
252256
}
253257

relays/ethereum/src/ethereum_sync_loop.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ impl RelayClient for EthereumHeadersSource {
130130
#[async_trait]
131131
impl SourceClient<EthereumHeadersSyncPipeline> for EthereumHeadersSource {
132132
async fn best_block_number(&self) -> Result<u64, RpcError> {
133+
// we **CAN** continue to relay headers if Ethereum node is out of sync, because
134+
// Substrate node may be missing headers that are already available at the Ethereum
135+
133136
self.client.best_block_number().await.map_err(Into::into)
134137
}
135138

@@ -204,6 +207,10 @@ impl RelayClient for SubstrateHeadersTarget {
204207
#[async_trait]
205208
impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
206209
async fn best_header_id(&self) -> Result<EthereumHeaderId, RpcError> {
210+
// we can't continue to relay headers if Substrate node is out of sync, because
211+
// it may have already received (some of) headers that we're going to relay
212+
self.client.ensure_synced().await?;
213+
207214
self.client.best_ethereum_block().await
208215
}
209216

relays/ethereum/src/substrate_sync_loop.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ impl RelayClient for EthereumHeadersTarget {
131131
#[async_trait]
132132
impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
133133
async fn best_header_id(&self) -> Result<RialtoHeaderId, RpcError> {
134+
// we can't continue to relay headers if Ethereum node is out of sync, because
135+
// it may have already received (some of) headers that we're going to relay
136+
self.client.ensure_synced().await?;
137+
134138
self.client.best_substrate_block(self.contract).await
135139
}
136140

relays/substrate-client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ relay-utils = { path = "../utils" }
2626
frame-support = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
2727
frame-system = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
2828
pallet-balances = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
29+
sc-rpc-api = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
2930
sp-core = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
3031
sp-runtime = { git = "https://github.com/paritytech/substrate.git", branch = "master" }
3132
sp-std = { git = "https://github.com/paritytech/substrate.git", branch = "master" }

relays/substrate-client/src/client.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,17 @@ impl<C: Chain> Client<C> {
104104
}
105105

106106
impl<C: Chain> Client<C> {
107+
/// Returns true if client is connected to at least one peer and is in synced state.
108+
pub async fn ensure_synced(&self) -> Result<()> {
109+
let health = Substrate::<C, _, _>::system_health(&self.client).await?;
110+
let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0);
111+
if is_synced {
112+
Ok(())
113+
} else {
114+
Err(Error::ClientNotSynced(health))
115+
}
116+
}
117+
107118
/// Return hash of the genesis block.
108119
pub fn genesis_hash(&self) -> &C::Hash {
109120
&self.genesis_hash

relays/substrate-client/src/error.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use jsonrpsee::client::RequestError;
2020
use jsonrpsee::transport::ws::WsNewDnsError;
2121
use relay_utils::MaybeConnectionError;
22+
use sc_rpc_api::system::Health;
2223

2324
/// Result type used by Substrate client.
2425
pub type Result<T> = std::result::Result<T, Error>;
@@ -38,6 +39,8 @@ pub enum Error {
3839
UninitializedBridgePallet,
3940
/// Account does not exist on the chain.
4041
AccountDoesNotExist,
42+
/// The client we're connected to is not synced, so we can't rely on its state.
43+
ClientNotSynced(Health),
4144
/// Custom logic error.
4245
Custom(String),
4346
}
@@ -56,7 +59,11 @@ impl From<RequestError> for Error {
5659

5760
impl MaybeConnectionError for Error {
5861
fn is_connection_error(&self) -> bool {
59-
matches!(*self, Error::Request(RequestError::TransportError(_)))
62+
matches!(
63+
*self,
64+
Error::Request(RequestError::TransportError(_))
65+
| Error::ClientNotSynced(_)
66+
)
6067
}
6168
}
6269

@@ -74,6 +81,7 @@ impl ToString for Error {
7481
Self::ResponseParseFailed(e) => e.what().to_string(),
7582
Self::UninitializedBridgePallet => "The Substrate bridge pallet has not been initialized yet.".into(),
7683
Self::AccountDoesNotExist => "Account does not exist on the chain".into(),
84+
Self::ClientNotSynced(health) => format!("Substrate client is not synced: {}", health),
7785
Self::Custom(e) => e.clone(),
7886
}
7987
}

relays/substrate-client/src/headers_source.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ where
7373
P::Header: SourceHeader<C::Hash, C::BlockNumber>,
7474
{
7575
async fn best_block_number(&self) -> Result<P::Number, Error> {
76+
// we **CAN** continue to relay headers if source node is out of sync, because
77+
// target node may be missing headers that are already available at the source
7678
Ok(*self.client.best_header().await?.number())
7779
}
7880

relays/substrate-client/src/rpc.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::chain::Chain;
2525

2626
use bp_message_lane::{LaneId, MessageNonce};
2727
use bp_runtime::InstanceId;
28+
use sc_rpc_api::system::Health;
2829
use sp_core::{
2930
storage::{StorageData, StorageKey},
3031
Bytes,
@@ -33,6 +34,8 @@ use sp_version::RuntimeVersion;
3334

3435
jsonrpsee::rpc_api! {
3536
pub(crate) Substrate<C: Chain> {
37+
#[rpc(method = "system_health", positional_params)]
38+
fn system_health() -> Health;
3639
#[rpc(method = "chain_getHeader", positional_params)]
3740
fn chain_get_header(block_hash: Option<C::Hash>) -> C::Header;
3841
#[rpc(method = "chain_getFinalizedHead", positional_params)]

relays/substrate/src/headers_target.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ where
7373
P: SubstrateHeadersSyncPipeline<Completion = Justification, Extra = ()>,
7474
{
7575
async fn best_header_id(&self) -> Result<HeaderIdOf<P>, SubstrateError> {
76+
// we can't continue to relay headers if target node is out of sync, because
77+
// it may have already received (some of) headers that we're going to relay
78+
self.client.ensure_synced().await?;
79+
7680
let call = P::BEST_BLOCK_METHOD.into();
7781
let data = Bytes(Vec::new());
7882

relays/substrate/src/messages_source.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ where
9999
P::TargetHeaderHash: Decode,
100100
{
101101
async fn state(&self) -> Result<SourceClientState<P>, SubstrateError> {
102+
// we can't continue to deliver confirmations if source node is out of sync, because
103+
// it may have already received confirmations that we're going to deliver
104+
self.client.ensure_synced().await?;
105+
102106
read_client_state::<_, P::TargetHeaderHash, P::TargetHeaderNumber>(
103107
&self.client,
104108
P::BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE,

relays/substrate/src/messages_target.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ where
9595
P::SourceHeaderHash: Decode,
9696
{
9797
async fn state(&self) -> Result<TargetClientState<P>, SubstrateError> {
98+
// we can't continue to deliver messages if target node is out of sync, because
99+
// it may have already received (some of) messages that we're going to deliver
100+
self.client.ensure_synced().await?;
101+
98102
read_client_state::<_, P::SourceHeaderHash, P::SourceHeaderNumber>(
99103
&self.client,
100104
P::BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET,

0 commit comments

Comments
 (0)