Skip to content

Commit 607ec56

Browse files
author
Jonathan Bastien-Filiatrault
committed
Commit ugly code that crashed rustc.
1 parent 581cf62 commit 607ec56

File tree

4 files changed

+56
-17
lines changed

4 files changed

+56
-17
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ repository = "https://github.com/zerospam/smtpbis.git"
1212

1313
[dependencies]
1414
rustyknife = {version="0.2", features=["quoted-string-rfc2047"]}
15-
tokio = {version="0.2", features=["signal", "io-util", "sync", "signal", "rt-core", "tcp", "dns", "rt-threaded"]}
15+
tokio = {version="0.2", features=["signal", "io-util", "sync", "signal", "rt-core", "tcp", "dns", "rt-threaded", "macros"]}
1616
tokio-util = {version="0.2", features=["codec"]}
1717
bytes = "0.5"
1818
futures = "0.3"

src/bin/smtpbis-server/main.rs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use futures_util::stream::TryStreamExt;
1515

1616
use tokio::net::{TcpListener, TcpStream};
1717
use tokio::prelude::*;
18-
use tokio::runtime::Runtime;
1918
use tokio::sync::oneshot::Receiver;
2019

2120
use tokio_rustls::rustls::{
@@ -160,18 +159,16 @@ impl DummyHandler {
160159
}
161160
}
162161

163-
fn main() -> Result<(), Box<dyn std::error::Error>> {
164-
let mut rt = Runtime::new()?;
162+
#[tokio::main]
163+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
164+
let (listen_shutdown_tx, listen_shutdown_rx) = tokio::sync::oneshot::channel();
165+
let join = tokio::spawn(listen_loop(listen_shutdown_rx));
165166

166-
rt.block_on(async {
167-
let (listen_shutdown_tx, listen_shutdown_rx) = tokio::sync::oneshot::channel();
168-
tokio::spawn(listen_loop(listen_shutdown_rx));
169-
170-
tokio::signal::ctrl_c().await.unwrap();
171-
listen_shutdown_tx.send(()).unwrap();
172-
println!("Waiting for tasks to finish...");
173-
// FIXME: actually wait on tasks here.
174-
});
167+
tokio::signal::ctrl_c().await.unwrap();
168+
listen_shutdown_tx.send(()).unwrap();
169+
println!("Waiting for tasks to finish...");
170+
join.await?;
171+
println!("Done !");
175172

176173
Ok(())
177174
}
@@ -187,7 +184,7 @@ async fn listen_loop(mut shutdown: Receiver<()>) {
187184
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
188185
let shutdown_rx = shutdown_rx.map_err(|_| ()).shared();
189186

190-
loop {
187+
smtpbis::taskjoin::join_tasks(move |spawn| Box::pin(async move { loop {
191188
let accept = listener.accept();
192189
pin_mut!(accept);
193190

@@ -197,18 +194,18 @@ async fn listen_loop(mut shutdown: Receiver<()>) {
197194
let mut shutdown_rx = shutdown_rx.clone();
198195
let tls_config = tls_config.clone();
199196

200-
tokio::spawn(async move {
197+
spawn.read().unwrap().push(tokio::spawn(async move {
201198
let smtp_res = serve_smtp(socket, addr, tls_config, &mut shutdown_rx).await;
202199
println!("SMTP task done: {:?}", smtp_res);
203-
})
200+
}))
204201
}
205202
Either::Right(..) => {
206203
println!("socket listening loop stopping");
207204
shutdown_tx.send(()).unwrap();
208205
break;
209206
}
210207
};
211-
}
208+
}})).await;
212209
}
213210

214211
async fn serve_smtp(

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ mod codecs;
44
mod reply;
55
mod server;
66
mod syntax;
7+
pub mod taskjoin;
78

89
pub use codecs::{LineCodec, LineError};
910
pub use reply::*;

src/taskjoin.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use std::sync::{Arc, RwLock};
2+
3+
use futures::stream::futures_unordered::FuturesUnordered;
4+
use futures::Future;
5+
use futures::future::{Either};
6+
use futures_util::stream::StreamExt;
7+
8+
pub async fn join_tasks<Fut, Fut2, FutO, Task>(fut: Fut) -> FutO
9+
where Fut: FnOnce(Arc<RwLock<FuturesUnordered<Task>>>) -> Fut2,
10+
Fut2: Future<Output=FutO> + Unpin,
11+
FutO: Sized,
12+
Task: Future,
13+
{
14+
let fo = Arc::new(RwLock::new(FuturesUnordered::new()));
15+
let output;
16+
17+
let mut task = fut(fo.clone());
18+
19+
loop {
20+
match futures::future::select(&mut task, fo.write().unwrap().next()).await {
21+
Either::Left((spawn_output, _fo_next)) => {
22+
output = spawn_output;
23+
eprintln!("spawner ended");
24+
break;
25+
},
26+
Either::Right((_task_output, _task)) => {
27+
eprintln!("task awaited (spawner alive)");
28+
}
29+
}
30+
}
31+
32+
let mut fo = fo.write().unwrap();
33+
34+
eprintln!("waiting for remaining tasks");
35+
while !fo.is_empty() {
36+
fo.next().await;
37+
eprintln!("task awaited (join)");
38+
}
39+
eprintln!("done");
40+
output
41+
}

0 commit comments

Comments
 (0)