Skip to content

Commit db0ef23

Browse files
committed
Created async request-reply for Tari p2p services
This PR contains the building blocks for async p2p services. It consists of the following modules: - `builder`: contains the `MakeServicePair` trait which should be implemented by a service builder and the `StackBuilder` struct which is responsible for building the service and making service _handles_ available to all the other services. Handles are any object which is able to control a service in some way. Most commonly the handle will be a `transport::Requester<MyServiceRequest>`. - `handles`: struct for collecting named handles for services. The `StackBuilder` uses this to make all handles available to services. - `transport`: This allows messages to be reliably send/received to/from services. A `Requester`/`Responder` pair is created using the `transport::channel` function which takes an impl of `tower_service::Service` as it's first parameter. A `Requester` implements `tower_service::Service` and is used to send requests which return a Future which resolves to a response. The `Requester` uses a `oneshot` channel allow responses to be sent back. A `Responder` receives a `(request, oneshot::Sender)` tuple, calls the given tower service with that request and sends the result on the `oneshot::Sender`. The `Responder` handles many requests simultaneously.
1 parent e6564d9 commit db0ef23

File tree

8 files changed

+749
-185
lines changed

8 files changed

+749
-185
lines changed

base_layer/p2p/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ chrono = { version = "0.4.6", features = ["serde"]}
2222
tokio = "0.1.22"
2323
futures = "0.1.28"
2424
tower-service = "0.2.0"
25+
tower-util = "0.1.0"
2526
ttl_cache = "0.5.1"
2627

2728
[dev-dependencies]
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright 2019 The Tari Project
2+
//
3+
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4+
// following conditions are met:
5+
//
6+
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7+
// disclaimer.
8+
//
9+
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10+
// following disclaimer in the documentation and/or other materials provided with the distribution.
11+
//
12+
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13+
// products derived from this software without specific prior written permission.
14+
//
15+
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16+
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17+
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18+
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19+
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20+
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21+
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22+
23+
use crate::executor::handles::ServiceHandles;
24+
use futures::{future, Future, IntoFuture};
25+
use std::{any::Any, hash::Hash, sync::Arc};
26+
27+
pub trait MakeServicePair<N> {
28+
type Future: Future<Item = (), Error = ()> + Send;
29+
type Handle: Any + Send + Sync;
30+
31+
fn make_pair(self, handles: Arc<ServiceHandles<N>>) -> (Self::Handle, Self::Future);
32+
}
33+
34+
impl<FN, F, H, N> MakeServicePair<N> for FN
35+
where
36+
FN: FnOnce(Arc<ServiceHandles<N>>) -> (H, F),
37+
F: Future<Item = (), Error = ()> + Send,
38+
H: Any,
39+
H: Send + Sync,
40+
N: Eq + Hash,
41+
{
42+
type Future = F;
43+
type Handle = H;
44+
45+
fn make_pair(self, handles: Arc<ServiceHandles<N>>) -> (Self::Handle, Self::Future) {
46+
(self)(handles)
47+
}
48+
}
49+
50+
pub struct StackBuilder<N> {
51+
handles: Arc<ServiceHandles<N>>,
52+
futures: Vec<Box<dyn Future<Item = (), Error = ()> + Send>>,
53+
}
54+
55+
impl<N> StackBuilder<N>
56+
where
57+
N: Eq + Hash,
58+
N: Send + Sync + 'static,
59+
{
60+
pub fn new() -> Self {
61+
let handles = Arc::new(ServiceHandles::new());
62+
Self {
63+
handles,
64+
futures: Vec::new(),
65+
}
66+
}
67+
68+
pub fn add_service(mut self, name: N, maker: impl MakeServicePair<N> + Send + 'static) -> Self {
69+
let (handle, fut) = maker.make_pair(self.handles.clone());
70+
self.handles.insert(name, handle);
71+
self.futures.push(Box::new(fut));
72+
self
73+
}
74+
}
75+
76+
impl<N> IntoFuture for StackBuilder<N> {
77+
type Error = ();
78+
type Future = Box<dyn Future<Item = (), Error = ()> + Send>;
79+
type Item = ();
80+
81+
fn into_future(self) -> Self::Future {
82+
Box::new(future::join_all(self.futures).map(|_| ()))
83+
}
84+
}
85+
86+
#[cfg(test)]
87+
mod test {
88+
use super::*;
89+
use futures::{future::poll_fn, Async};
90+
use std::sync::atomic::{AtomicBool, Ordering};
91+
92+
#[test]
93+
fn service_stack_new() {
94+
let state = Arc::new(AtomicBool::new(false));
95+
let state_inner = Arc::clone(&state.clone());
96+
let service_pair_factory = |handles: Arc<ServiceHandles<&'static str>>| {
97+
let fut = poll_fn(move || {
98+
// Test that this futures own handle is available
99+
let fake_handle = handles.get_handle::<&str>(&"test-service").unwrap();
100+
assert_eq!(fake_handle, "Fake Handle");
101+
let not_found = handles.get_handle::<&str>(&"not-found");
102+
assert!(not_found.is_none());
103+
104+
// Any panics above won't fail the test so a marker bool is set
105+
// if there are no panics.
106+
// catch_unwind could be used but then the UnwindSafe trait bound
107+
// needs to be added. TODO: handle panics in service poll functions
108+
state_inner.store(true, Ordering::Release);
109+
Ok(Async::Ready(()))
110+
});
111+
112+
("Fake Handle", fut)
113+
};
114+
115+
tokio::run(
116+
StackBuilder::new()
117+
.add_service("test-service", service_pair_factory)
118+
.into_future(),
119+
);
120+
121+
assert!(state.load(Ordering::Acquire))
122+
}
123+
}

base_layer/p2p/src/executor/handle.rs

Lines changed: 0 additions & 184 deletions
This file was deleted.

0 commit comments

Comments
 (0)