Skip to content

Commit 3c32c84

Browse files
j-hajAndyGauge
andauthored
Adds crossbeam channel example of a parallel data pipeline. (#554)
Co-authored-by: Andrew Gauger <[email protected]>
1 parent a00ff78 commit 3c32c84

File tree

3 files changed

+89
-0
lines changed

3 files changed

+89
-0
lines changed

src/concurrency.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
| Recipe | Crates | Categories |
44
|--------|--------|------------|
55
| [Spawn a short-lived thread][ex-crossbeam-spawn] | [![crossbeam-badge]][crossbeam] | [![cat-concurrency-badge]][cat-concurrency] |
6+
| [Create a parallel data pipeline][ex-crossbeam-pipeline] | [![crossbeam-badge]][crossbeam] | [![cat-concurrency-badge]][cat-concurrency] |
67
| [Pass data between two threads][ex-crossbeam-spsc] | [![crossbeam-badge]][crossbeam] | [![cat-concurrency-badge]][cat-concurrency] |
78
| [Maintain global mutable state][ex-global-mut-state] | [![lazy_static-badge]][lazy_static] | [![cat-rust-patterns-badge]][cat-rust-patterns] |
89
| [Calculate SHA1 sum of *.iso files concurrently][ex-threadpool-walk] | [![threadpool-badge]][threadpool] [![walkdir-badge]][walkdir] [![num_cpus-badge]][num_cpus] [![ring-badge]][ring] | [![cat-concurrency-badge]][cat-concurrency][![cat-filesystem-badge]][cat-filesystem] |
@@ -16,6 +17,7 @@
1617

1718

1819
[ex-crossbeam-spawn]: concurrency/threads.html#spawn-a-short-lived-thread
20+
[ex-crossbeam-pipeline]: concurrency/threads.html#create-a-parallel-pipeline
1921
[ex-crossbeam-spsc]: concurrency/threads.html#pass-data-between-two-threads
2022
[ex-global-mut-state]: concurrency/threads.html#maintain-global-mutable-state
2123
[ex-threadpool-walk]: concurrency/threads.html#calculate-sha256-sum-of-iso-files-concurrently
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
## Create a parallel pipeline
2+
3+
[![crossbeam-badge]][crossbeam] [![cat-concurrency-badge]][cat-concurrency]
4+
5+
This example uses the [crossbeam] and [crossbeam-channel] crates to create
6+
a parallel pipline, similar to that described in the ZeroMQ [guide]
7+
There is a data source and a data sink, with data being processed by two worker
8+
threads in parallel on its way from the source to the sink.
9+
10+
We use bounded channels with a capacity of one using
11+
[`crossbeam_channel::bounded`]. The producer must be on its own thread because
12+
it produces messages faster than the workers can process them (since they sleep
13+
for half a second) - this means the producer blocks on the call to
14+
`[crossbeam_channel::Sender::send`] for half a second until one of the workers
15+
processes the data in the channel. Also note that the data in the channel is
16+
consumed by whichever worker calls receive first, so each message is delivered
17+
to a single worker rather than both workers.
18+
19+
Reading from the channels via the iterator
20+
[`crossbeam_channel::Receiver::iter`] method will block, either waiting
21+
for new messages or until the channel is closed. Because the channels were
22+
created within the [`crossbeam::scope`], we must manually close them via `drop`
23+
to prevent the entire program from blocking on the worker for-loops. You can
24+
think of the calls to `drop` as signaling that no more messages will be sent.
25+
26+
27+
```rust
28+
extern crate crossbeam;
29+
extern crate crossbeam_channel;
30+
31+
use std::thread;
32+
use std::time::Duration;
33+
use crossbeam_channel::bounded;
34+
35+
fn main() {
36+
let (snd1, rcv1) = bounded(1);
37+
let (snd2, rcv2) = bounded(1);
38+
let n_msgs = 4;
39+
let n_workers = 2;
40+
41+
crossbeam::scope(|s| {
42+
// Producer thread
43+
s.spawn(|_| {
44+
for i in 0..n_msgs {
45+
snd1.send(i).unwrap();
46+
println!("Source sent {}", i);
47+
}
48+
// Close the channel - this is necessary to exit
49+
// the for-loop in the worker
50+
drop(snd1);
51+
});
52+
53+
// Parallel processing by 2 threads
54+
for _ in 0..n_workers {
55+
// Send to sink, receive from source
56+
let (sendr, recvr) = (snd2.clone(), rcv1.clone());
57+
// Spawn workers in separate threads
58+
s.spawn(move |_| {
59+
thread::sleep(Duration::from_millis(500));
60+
// Receive until channel closes
61+
for msg in recvr.iter() {
62+
println!("Worker {:?} received {}.",
63+
thread::current().id(), msg);
64+
sendr.send(msg * 2).unwrap();
65+
}
66+
});
67+
}
68+
// Close the channel, otherwise sink will never
69+
// exit the for-loop
70+
drop(snd2);
71+
72+
// Sink
73+
for msg in rcv2.iter() {
74+
println!("Sink received {}", msg);
75+
}
76+
}).unwrap();
77+
}
78+
```
79+
80+
[`crossbeam::scope`]: https://docs.rs/crossbeam/*/crossbeam/fn.scope.html
81+
[crossbeam-channel]: https://docs.rs/crossbeam-channel/*/crossbeam_channel/index.html
82+
[`crossbeam_channel::bounded`]: https://docs.rs/crossbeam-channel/*/crossbeam_channel/fn.bounded.html
83+
[`crossbeam_channel::Receiver::iter`]: https://docs.rs/crossbeam-channel/*/crossbeam_channel/struct.Receiver.html#method.iter
84+
[`crossbeam_channel::Sender::send`]: https://docs.rs/crossbeam-channel/*/crossbeam_channel/struct.Sender.html#method.send
85+
[guide]: http://zguide.zeromq.org/page:all#Divide-and-Conquer

src/concurrency/threads.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
{{#include thread/crossbeam-spawn.md}}
44

5+
{{#include thread/crossbeam-complex.md}}
6+
57
{{#include thread/crossbeam-spsc.md}}
68

79
{{#include thread/global-mut-state.md}}

0 commit comments

Comments
 (0)