Skip to content

Commit 14819c6

Browse files
committed
Created async request-reply for Tari p2p services
This PR contains the building blocks for async p2p services. It consists of the following modules: - `builder`: contains the `MakeServicePair` trait which should be implemented by a service builder and the `StackBuilder` struct which is responsible for building the service and making service _handles_ available to all the other services. Handles are any object which is able to control a service in some way. Most commonly the handle will be a `transport::Requester<MyServiceRequest>`. - `handles`: struct for collecting named handles for services. The `StackBuilder` uses this to make all handles available to services. - `transport`: This allows messages to be reliably send/received to/from services. A `Requester`/`Responder` pair is created using the `transport::channel` function which takes an impl of `tower_service::Service` as it's first parameter. A `Requester` implements `tower_service::Service` and is used to send requests which return a Future which resolves to a response. The `Requester` uses a `oneshot` channel allow responses to be sent back. A `Responder` receives a `(request, oneshot::Sender)` tuple, calls the given tower service with that request and sends the result on the `oneshot::Sender`. The `Responder` handles many requests simultaneously.
1 parent e6564d9 commit 14819c6

File tree

8 files changed

+755
-185
lines changed

8 files changed

+755
-185
lines changed

base_layer/p2p/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ chrono = { version = "0.4.6", features = ["serde"]}
2222
tokio = "0.1.22"
2323
futures = "0.1.28"
2424
tower-service = "0.2.0"
25+
tower-util = "0.1.0"
2526
ttl_cache = "0.5.1"
2627

2728
[dev-dependencies]
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
// Copyright 2019 The Tari Project
2+
//
3+
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4+
// following conditions are met:
5+
//
6+
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7+
// disclaimer.
8+
//
9+
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10+
// following disclaimer in the documentation and/or other materials provided with the distribution.
11+
//
12+
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13+
// products derived from this software without specific prior written permission.
14+
//
15+
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16+
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17+
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18+
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19+
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20+
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21+
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22+
23+
use crate::executor::handles::ServiceHandles;
24+
use futures::{future, Future, IntoFuture};
25+
use std::{any::Any, hash::Hash, sync::Arc};
26+
27+
/// Builder trait for creating a service/handle pair.
28+
/// The `StackBuilder` builds impls of this trait.
29+
pub trait MakeServicePair<N> {
30+
type Future: Future<Item = (), Error = ()> + Send;
31+
type Handle: Any + Send + Sync;
32+
33+
fn make_pair(self, handles: Arc<ServiceHandles<N>>) -> (Self::Handle, Self::Future);
34+
}
35+
36+
impl<FN, F, H, N> MakeServicePair<N> for FN
37+
where
38+
FN: FnOnce(Arc<ServiceHandles<N>>) -> (H, F),
39+
F: Future<Item = (), Error = ()> + Send,
40+
H: Any,
41+
H: Send + Sync,
42+
N: Eq + Hash,
43+
{
44+
type Future = F;
45+
type Handle = H;
46+
47+
fn make_pair(self, handles: Arc<ServiceHandles<N>>) -> (Self::Handle, Self::Future) {
48+
(self)(handles)
49+
}
50+
}
51+
52+
/// Responsible for building and collecting handles and (usually long-running) service futures.
53+
/// This can be converted into a future which resolves once all contained service futures are complete
54+
/// by using the `IntoFuture` implementation.
55+
pub struct StackBuilder<N> {
56+
handles: Arc<ServiceHandles<N>>,
57+
futures: Vec<Box<dyn Future<Item = (), Error = ()> + Send>>,
58+
}
59+
60+
impl<N> StackBuilder<N>
61+
where
62+
N: Eq + Hash,
63+
N: Send + Sync + 'static,
64+
{
65+
pub fn new() -> Self {
66+
let handles = Arc::new(ServiceHandles::new());
67+
Self {
68+
handles,
69+
futures: Vec::new(),
70+
}
71+
}
72+
73+
pub fn add_service(mut self, name: N, maker: impl MakeServicePair<N> + Send + 'static) -> Self {
74+
let (handle, fut) = maker.make_pair(self.handles.clone());
75+
self.handles.insert(name, handle);
76+
self.futures.push(Box::new(fut));
77+
self
78+
}
79+
}
80+
81+
impl<N> IntoFuture for StackBuilder<N> {
82+
type Error = ();
83+
type Item = ();
84+
85+
existential type Future: Future<Item = (), Error = ()> + Send;
86+
87+
fn into_future(self) -> Self::Future {
88+
future::join_all(self.futures).map(|_| ())
89+
}
90+
}
91+
92+
#[cfg(test)]
93+
mod test {
94+
use super::*;
95+
use futures::{future::poll_fn, Async};
96+
use std::sync::atomic::{AtomicBool, Ordering};
97+
98+
#[test]
99+
fn service_stack_new() {
100+
let state = Arc::new(AtomicBool::new(false));
101+
let state_inner = Arc::clone(&state.clone());
102+
let service_pair_factory = |handles: Arc<ServiceHandles<&'static str>>| {
103+
let fut = poll_fn(move || {
104+
// Test that this futures own handle is available
105+
let fake_handle = handles.get_handle::<&str>(&"test-service").unwrap();
106+
assert_eq!(fake_handle, "Fake Handle");
107+
let not_found = handles.get_handle::<&str>(&"not-found");
108+
assert!(not_found.is_none());
109+
110+
// Any panics above won't fail the test so a marker bool is set
111+
// if there are no panics.
112+
// catch_unwind could be used but then the UnwindSafe trait bound
113+
// needs to be added. TODO: handle panics in service poll functions
114+
state_inner.store(true, Ordering::Release);
115+
Ok(Async::Ready(()))
116+
});
117+
118+
("Fake Handle", fut)
119+
};
120+
121+
tokio::run(
122+
StackBuilder::new()
123+
.add_service("test-service", service_pair_factory)
124+
.into_future(),
125+
);
126+
127+
assert!(state.load(Ordering::Acquire))
128+
}
129+
}

base_layer/p2p/src/executor/handle.rs

Lines changed: 0 additions & 184 deletions
This file was deleted.

0 commit comments

Comments
 (0)