Skip to content

Commit 67ab221

Browse files
altonenark0f
authored andcommitted
Introduce ChainSyncInterface (paritytech#12489)
* Introduce `ChainSyncInterface` `ChainSyncInterface` provides an asynchronous interface for other subsystems to submit calls to `ChainSync`. This allows `NetworkService` to delegate calls to `ChainSync` while still providing the same API for other subsystems (for now). This makes it possible to move the syncing code in piecemeal fashion out of `protocol.rs` as the calls are just forwarded to `ChainSync`. * Apply review comments * Fix tests
1 parent 2bde525 commit 67ab221

File tree

17 files changed

+301
-45
lines changed

17 files changed

+301
-45
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/network/common/src/sync.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,4 +395,14 @@ pub trait ChainSync<Block: BlockT>: Send {
395395

396396
/// Decode implementation-specific state response.
397397
fn decode_state_response(&self, response: &[u8]) -> Result<OpaqueStateResponse, String>;
398+
399+
/// Advance the state of `ChainSync`
400+
///
401+
/// Internally calls [`ChainSync::poll_block_announce_validation()`] and
402+
/// this function should be polled until it returns [`Poll::Pending`] to
403+
/// consume all pending events.
404+
fn poll(
405+
&mut self,
406+
cx: &mut std::task::Context,
407+
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;
398408
}

client/network/src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub use sc_network_common::{
3333

3434
pub use libp2p::{build_multiaddr, core::PublicKey, identity};
3535

36+
use crate::ChainSyncInterface;
3637
use core::{fmt, iter};
3738
use libp2p::{
3839
identity::{ed25519, Keypair},
@@ -91,6 +92,9 @@ where
9192
/// Instance of chain sync implementation.
9293
pub chain_sync: Box<dyn ChainSync<B>>,
9394

95+
/// Interface that can be used to delegate syncing-related function calls to `ChainSync`
96+
pub chain_sync_service: Box<dyn ChainSyncInterface<B>>,
97+
9498
/// Registry for recording prometheus metrics to.
9599
pub metrics_registry: Option<Registry>,
96100

client/network/src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ pub use service::{
279279
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender,
280280
NotificationSenderReady, OutboundFailure, PublicKey,
281281
};
282+
use sp_runtime::traits::{Block as BlockT, NumberFor};
282283

283284
pub use sc_peerset::ReputationChange;
284285

@@ -293,3 +294,14 @@ const MAX_CONNECTIONS_PER_PEER: usize = 2;
293294

294295
/// The maximum number of concurrent established connections that were incoming.
295296
const MAX_CONNECTIONS_ESTABLISHED_INCOMING: u32 = 10_000;
297+
298+
/// Abstraction over syncing-related services
299+
pub trait ChainSyncInterface<B: BlockT>:
300+
NetworkSyncForkRequest<B::Hash, NumberFor<B>> + Send + Sync
301+
{
302+
}
303+
304+
impl<T, B: BlockT> ChainSyncInterface<B> for T where
305+
T: NetworkSyncForkRequest<B::Hash, NumberFor<B>> + Send + Sync
306+
{
307+
}

client/network/src/protocol.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -947,18 +947,6 @@ where
947947
self.chain_sync.clear_justification_requests();
948948
}
949949

950-
/// Request syncing for the given block from given set of peers.
951-
/// Uses `protocol` to queue a new block download request and tries to dispatch all pending
952-
/// requests.
953-
pub fn set_sync_fork_request(
954-
&mut self,
955-
peers: Vec<PeerId>,
956-
hash: &B::Hash,
957-
number: NumberFor<B>,
958-
) {
959-
self.chain_sync.set_sync_fork_request(peers, hash, number)
960-
}
961-
962950
/// A batch of blocks have been processed, with or without errors.
963951
/// Call this when a batch of blocks have been processed by the importqueue, with or without
964952
/// errors.
@@ -1461,8 +1449,11 @@ where
14611449
self.pending_messages.push_back(event);
14621450
}
14631451

1464-
// Check if there is any block announcement validation finished.
1465-
while let Poll::Ready(result) = self.chain_sync.poll_block_announce_validation(cx) {
1452+
// Advance the state of `ChainSync`
1453+
//
1454+
// Process any received requests received from `NetworkService` and
1455+
// check if there is any block announcement validation finished.
1456+
while let Poll::Ready(result) = self.chain_sync.poll(cx) {
14661457
match self.process_block_announce_validation_result(result) {
14671458
CustomMessageOutcome::None => {},
14681459
outcome => self.pending_messages.push_back(outcome),

client/network/src/service.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::{
3535
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
3636
},
3737
protocol::{self, NotificationsSink, NotifsHandlerError, PeerInfo, Protocol, Ready},
38-
transport, ReputationChange,
38+
transport, ChainSyncInterface, ReputationChange,
3939
};
4040

4141
use futures::{channel::oneshot, prelude::*};
@@ -121,6 +121,8 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
121121
peerset: PeersetHandle,
122122
/// Channel that sends messages to the actual worker.
123123
to_worker: TracingUnboundedSender<ServiceToWorkerMsg<B>>,
124+
/// Interface that can be used to delegate calls to `ChainSync`
125+
chain_sync_service: Box<dyn ChainSyncInterface<B>>,
124126
/// For each peer and protocol combination, an object that allows sending notifications to
125127
/// that peer. Updated by the [`NetworkWorker`].
126128
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ProtocolName), NotificationsSink>>>,
@@ -433,6 +435,7 @@ where
433435
local_peer_id,
434436
local_identity,
435437
to_worker,
438+
chain_sync_service: params.chain_sync_service,
436439
peers_notifications_sinks: peers_notifications_sinks.clone(),
437440
notifications_sizes_metric: metrics
438441
.as_ref()
@@ -814,7 +817,7 @@ where
814817
/// a stale fork missing.
815818
/// Passing empty `peers` set effectively removes the sync request.
816819
fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
817-
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SyncFork(peers, hash, number));
820+
self.chain_sync_service.set_sync_fork_request(peers, hash, number);
818821
}
819822
}
820823

@@ -1219,7 +1222,6 @@ enum ServiceToWorkerMsg<B: BlockT> {
12191222
RemoveSetReserved(ProtocolName, PeerId),
12201223
AddToPeersSet(ProtocolName, PeerId),
12211224
RemoveFromPeersSet(ProtocolName, PeerId),
1222-
SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>),
12231225
EventStream(out_events::Sender),
12241226
Request {
12251227
target: PeerId,
@@ -1380,11 +1382,6 @@ where
13801382
.behaviour_mut()
13811383
.user_protocol_mut()
13821384
.remove_from_peers_set(protocol, peer_id),
1383-
ServiceToWorkerMsg::SyncFork(peer_ids, hash, number) => this
1384-
.network_service
1385-
.behaviour_mut()
1386-
.user_protocol_mut()
1387-
.set_sync_fork_request(peer_ids, &hash, number),
13881385
ServiceToWorkerMsg::EventStream(sender) => this.event_streams.push(sender),
13891386
ServiceToWorkerMsg::Request {
13901387
target,

client/network/src/service/chainsync_tests.rs

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// You should have received a copy of the GNU General Public License
1717
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1818

19-
use crate::{config, NetworkWorker};
19+
use crate::{config, ChainSyncInterface, NetworkWorker};
2020

2121
use futures::prelude::*;
2222
use libp2p::PeerId;
@@ -35,7 +35,7 @@ use sc_network_common::{
3535
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
3636
use sc_network_sync::{
3737
block_request_handler::BlockRequestHandler, mock::MockChainSync,
38-
state_request_handler::StateRequestHandler,
38+
service::mock::MockChainSyncInterface, state_request_handler::StateRequestHandler,
3939
};
4040
use sp_core::H256;
4141
use sp_runtime::{
@@ -56,6 +56,7 @@ const PROTOCOL_NAME: &str = "/foo";
5656

5757
fn make_network(
5858
chain_sync: Box<dyn ChainSyncT<substrate_test_runtime_client::runtime::Block>>,
59+
chain_sync_service: Box<dyn ChainSyncInterface<substrate_test_runtime_client::runtime::Block>>,
5960
client: Arc<substrate_test_runtime_client::TestClient>,
6061
) -> (TestNetworkWorker, Arc<substrate_test_runtime_client::TestClient>) {
6162
let network_config = config::NetworkConfiguration {
@@ -174,6 +175,7 @@ fn make_network(
174175
fork_id,
175176
import_queue,
176177
chain_sync,
178+
chain_sync_service,
177179
metrics_registry: None,
178180
block_request_protocol_config,
179181
state_request_protocol_config,
@@ -193,7 +195,7 @@ fn set_default_expecations_no_peers(
193195
chain_sync.expect_state_request().returning(|| None);
194196
chain_sync.expect_justification_requests().returning(|| Box::new(iter::empty()));
195197
chain_sync.expect_warp_sync_request().returning(|| None);
196-
chain_sync.expect_poll_block_announce_validation().returning(|_| Poll::Pending);
198+
chain_sync.expect_poll().returning(|_| Poll::Pending);
197199
chain_sync.expect_status().returning(|| SyncStatus {
198200
state: SyncState::Idle,
199201
best_seen_block: None,
@@ -207,11 +209,18 @@ fn set_default_expecations_no_peers(
207209
#[async_std::test]
208210
async fn normal_network_poll_no_peers() {
209211
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);
212+
213+
// build `ChainSync` and set default expectations for it
210214
let mut chain_sync =
211215
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());
212216
set_default_expecations_no_peers(&mut chain_sync);
213217

214-
let (mut network, _) = make_network(chain_sync, client);
218+
// build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be
219+
// called)
220+
let chain_sync_service =
221+
Box::new(MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new());
222+
223+
let (mut network, _) = make_network(chain_sync, chain_sync_service, client);
215224

216225
// poll the network once
217226
futures::future::poll_fn(|cx| {
@@ -224,6 +233,13 @@ async fn normal_network_poll_no_peers() {
224233
#[async_std::test]
225234
async fn request_justification() {
226235
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);
236+
237+
// build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be
238+
// called)
239+
let chain_sync_service =
240+
Box::new(MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new());
241+
242+
// build `ChainSync` and verify that call to `request_justification()` is made
227243
let mut chain_sync =
228244
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());
229245

@@ -237,7 +253,7 @@ async fn request_justification() {
237253
.returning(|_, _| ());
238254

239255
set_default_expecations_no_peers(&mut chain_sync);
240-
let (mut network, _) = make_network(chain_sync, client);
256+
let (mut network, _) = make_network(chain_sync, chain_sync_service, client);
241257

242258
// send "request justifiction" message and poll the network
243259
network.service().request_justification(&hash, number);
@@ -252,13 +268,20 @@ async fn request_justification() {
252268
#[async_std::test]
253269
async fn clear_justification_requests(&mut self) {
254270
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);
271+
272+
// build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be
273+
// called)
274+
let chain_sync_service =
275+
Box::new(MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new());
276+
277+
// build `ChainSync` and verify that call to `clear_justification_requests()` is made
255278
let mut chain_sync =
256279
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());
257280

258281
chain_sync.expect_clear_justification_requests().once().returning(|| ());
259282

260283
set_default_expecations_no_peers(&mut chain_sync);
261-
let (mut network, _) = make_network(chain_sync, client);
284+
let (mut network, _) = make_network(chain_sync, chain_sync_service, client);
262285

263286
// send "request justifiction" message and poll the network
264287
network.service().clear_justification_requests();
@@ -273,24 +296,31 @@ async fn clear_justification_requests(&mut self) {
273296
#[async_std::test]
274297
async fn set_sync_fork_request() {
275298
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);
299+
300+
// build `ChainSync` and set default expectations for it
276301
let mut chain_sync =
277302
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());
303+
set_default_expecations_no_peers(&mut chain_sync);
304+
305+
// build `ChainSyncInterface` provider and verify that the `set_sync_fork_request()`
306+
// call is delegated to `ChainSyncInterface` (which eventually forwards it to `ChainSync`)
307+
let mut chain_sync_service =
308+
MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new();
278309

279310
let hash = H256::random();
280311
let number = 1337u64;
281312
let peers = (0..3).map(|_| PeerId::random()).collect::<Vec<_>>();
282313
let copy_peers = peers.clone();
283314

284-
chain_sync
315+
chain_sync_service
285316
.expect_set_sync_fork_request()
286317
.withf(move |in_peers, in_hash, in_number| {
287318
&peers == in_peers && &hash == in_hash && &number == in_number
288319
})
289320
.once()
290321
.returning(|_, _, _| ());
291322

292-
set_default_expecations_no_peers(&mut chain_sync);
293-
let (mut network, _) = make_network(chain_sync, client);
323+
let (mut network, _) = make_network(chain_sync, Box::new(chain_sync_service), client);
294324

295325
// send "set sync fork request" message and poll the network
296326
network.service().set_sync_fork_request(copy_peers, hash, number);
@@ -305,6 +335,12 @@ async fn set_sync_fork_request() {
305335
#[async_std::test]
306336
async fn on_block_finalized() {
307337
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);
338+
// build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be
339+
// called)
340+
let chain_sync_service =
341+
Box::new(MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new());
342+
343+
// build `ChainSync` and verify that call to `on_block_finalized()` is made
308344
let mut chain_sync =
309345
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());
310346

@@ -326,7 +362,7 @@ async fn on_block_finalized() {
326362
.returning(|_, _| ());
327363

328364
set_default_expecations_no_peers(&mut chain_sync);
329-
let (mut network, _) = make_network(chain_sync, client);
365+
let (mut network, _) = make_network(chain_sync, chain_sync_service, client);
330366

331367
// send "set sync fork request" message and poll the network
332368
network.on_block_finalized(hash, header);

client/network/src/service/tests.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ fn build_test_full_node(
124124
protocol_config
125125
};
126126

127-
let chain_sync = ChainSync::new(
127+
let (chain_sync, chain_sync_service) = ChainSync::new(
128128
match network_config.sync_mode {
129129
config::SyncMode::Full => sc_network_common::sync::SyncMode::Full,
130130
config::SyncMode::Fast { skip_proofs, storage_chain_mode } =>
@@ -172,6 +172,7 @@ fn build_test_full_node(
172172
fork_id,
173173
import_queue,
174174
chain_sync: Box::new(chain_sync),
175+
chain_sync_service,
175176
metrics_registry: None,
176177
block_request_protocol_config,
177178
state_request_protocol_config,

client/network/sync/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ sc-client-api = { version = "4.0.0-dev", path = "../../api" }
3434
sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" }
3535
sc-network-common = { version = "0.10.0-dev", path = "../common" }
3636
sc-peerset = { version = "4.0.0-dev", path = "../../peerset" }
37+
sc-utils = { version = "4.0.0-dev", path = "../../utils" }
3738
sp-arithmetic = { version = "5.0.0", path = "../../../primitives/arithmetic" }
3839
sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" }
3940
sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" }
@@ -42,6 +43,7 @@ sp-finality-grandpa = { version = "4.0.0-dev", path = "../../../primitives/final
4243
sp-runtime = { version = "6.0.0", path = "../../../primitives/runtime" }
4344

4445
[dev-dependencies]
46+
async-std = { version = "1.11.0", features = ["attributes"] }
4547
quickcheck = { version = "1.0.3", default-features = false }
4648
sc-block-builder = { version = "0.10.0-dev", path = "../../block-builder" }
4749
sp-test-primitives = { version = "2.0.0", path = "../../../primitives/test-primitives" }

0 commit comments

Comments
 (0)