Commit fc219f4

Hasher::update_with_join
This is a new interface that allows the caller to provide a multi-threading implementation. It's defined in terms of a new `Join` trait, for which we provide two implementations, `SerialJoin` and `RayonJoin`. This lets the caller control when multi-threading is used, rather than the previous all-or-nothing design of the "rayon" feature. Existing callers should keep compiling, but this is still a compatibility break: callers who were relying on automatic multi-threading before will now be single-threaded unless they switch to `update_with_join`. Thus the next release of this crate will need to be version 0.2. See #25 and #54.
1 parent: 24071db
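
Not part of the commit itself, but for illustration: a minimal sketch of the new interface from the caller's side, assuming the crate is built with the `rayon` feature. `update_with_join::<SerialJoin>` behaves like the existing `update`, while `RayonJoin` opts in to Rayon-based multi-threading; both produce the same hash.

use blake3::join::{RayonJoin, SerialJoin};

fn main() {
    // Arbitrary example input; multi-threading only pays off on large inputs.
    let input = vec![0u8; 1 << 20];

    // Single-threaded, equivalent to Hasher::update.
    let serial = blake3::Hasher::new()
        .update_with_join::<SerialJoin>(&input)
        .finalize();

    // Multi-threaded on the Rayon thread pool (requires the "rayon" feature).
    let threaded = blake3::Hasher::new()
        .update_with_join::<RayonJoin>(&input)
        .finalize();

    // The join strategy never changes the output.
    assert_eq!(serial, threaded);
}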

File tree

6 files changed: +377, -50 lines


Cargo.toml

Lines changed: 4 additions & 0 deletions
@@ -21,6 +21,10 @@ c_avx512 = []
 c_neon = []
 std = ["digest/std"]
 
+[package.metadata.docs.rs]
+# Document blake3::join::RayonJoin on docs.rs.
+features = ["rayon"]
+
 [dependencies]
 arrayref = "0.3.5"
 arrayvec = { version = "0.5.1", default-features = false, features = ["array-sizes-33-128"] }

b3sum/src/main.rs

Lines changed: 38 additions & 14 deletions
@@ -3,6 +3,7 @@ use clap::{App, Arg};
 use std::cmp;
 use std::convert::TryInto;
 use std::fs::File;
+use std::io;
 use std::io::prelude::*;
 
 const FILE_ARG: &str = "file";
@@ -57,21 +58,37 @@ fn clap_parse_argv() -> clap::ArgMatches<'static> {
         .get_matches()
 }
 
+// A 16 KiB buffer is enough to take advantage of all the SIMD instruction sets
+// that we support, but `std::io::copy` currently uses 8 KiB. Most platforms
+// can support at least 64 KiB, and there's some performance benefit to using
+// bigger reads, so that's what we use here.
+fn copy_wide(mut reader: impl Read, hasher: &mut blake3::Hasher) -> io::Result<u64> {
+    let mut buffer = [0; 65536];
+    let mut total = 0;
+    loop {
+        match reader.read(&mut buffer) {
+            Ok(0) => return Ok(total),
+            Ok(n) => {
+                hasher.update(&buffer[..n]);
+                total += n as u64;
+            }
+            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
+            Err(e) => return Err(e),
+        }
+    }
+}
+
 // The slow path, for inputs that we can't memmap.
-fn hash_reader(
-    base_hasher: &blake3::Hasher,
-    mut reader: impl Read,
-) -> Result<blake3::OutputReader> {
+fn hash_reader(base_hasher: &blake3::Hasher, reader: impl Read) -> Result<blake3::OutputReader> {
     let mut hasher = base_hasher.clone();
-    // TODO: This is a narrow copy, so it might not take advantage of SIMD or
-    // threads. With a larger buffer size, most of that performance can be
-    // recovered. However, this requires some platform-specific tuning, based
-    // on both the SIMD degree and the number of cores. A double-buffering
-    // strategy is also helpful, where a dedicated background thread reads
-    // input into one buffer while another thread is calling update() on a
-    // second buffer. Since this is the slow path anyway, do the simple thing
-    // for now.
-    std::io::copy(&mut reader, &mut hasher)?;
+    // This is currently all single-threaded. Doing multi-threaded hashing
+    // without memory mapping is tricky, since all your worker threads have to
+    // stop every time you refill the buffer, and that ends up being a lot of
+    // overhead. To solve that, we need a more complicated double-buffering
+    // strategy where a background thread fills one buffer while the worker
+    // threads are hashing the other one. We might implement that in the
+    // future, but since this is the slow path anyway, it's not high priority.
+    copy_wide(reader, &mut hasher)?;
     Ok(hasher.finalize_xof())
 }
 
@@ -114,7 +131,14 @@ fn maybe_hash_memmap(
     #[cfg(feature = "memmap")]
     {
         if let Some(map) = maybe_memmap_file(_file)? {
-            return Ok(Some(_base_hasher.clone().update(&map).finalize_xof()));
+            // Memory mapping worked. Use Rayon-based multi-threading to split
+            // up the whole file across many worker threads.
+            return Ok(Some(
+                _base_hasher
+                    .clone()
+                    .update_with_join::<blake3::join::RayonJoin>(&map)
+                    .finalize_xof(),
+            ));
         }
     }
     Ok(None)

benches/bench.rs

Lines changed: 82 additions & 0 deletions
@@ -417,3 +417,85 @@ fn bench_reference_0512_kib(b: &mut Bencher) {
 fn bench_reference_1024_kib(b: &mut Bencher) {
     bench_reference(b, 1024 * KIB);
 }
+
+#[cfg(feature = "rayon")]
+fn bench_rayon(b: &mut Bencher, len: usize) {
+    let mut input = RandomInput::new(b, len);
+    b.iter(|| {
+        blake3::Hasher::new()
+            .update_with_join::<blake3::join::RayonJoin>(input.get())
+            .finalize()
+    });
+}
+
+#[bench]
+#[cfg(feature = "rayon")]
+fn bench_rayon_0001_block(b: &mut Bencher) {
+    bench_rayon(b, BLOCK_LEN);
+}
+
+#[bench]
+#[cfg(feature = "rayon")]
+fn bench_rayon_0001_kib(b: &mut Bencher) {
+    bench_rayon(b, 1 * KIB);
+}
+
+#[bench]
+#[cfg(feature = "rayon")]
+fn bench_rayon_0002_kib(b: &mut Bencher) {
+    bench_rayon(b, 2 * KIB);
+}
+
+#[bench]
+#[cfg(feature = "rayon")]
+fn bench_rayon_0004_kib(b: &mut Bencher) {
+    bench_rayon(b, 4 * KIB);
+}
+
+#[bench]
+#[cfg(feature = "rayon")]
+fn bench_rayon_0008_kib(b: &mut Bencher) {
+    bench_rayon(b, 8 * KIB);
+}
+
+#[bench]
+#[cfg(feature = "rayon")]
+fn bench_rayon_0016_kib(b: &mut Bencher) {
+    bench_rayon(b, 16 * KIB);
+}
+
+#[bench]
+#[cfg(feature = "rayon")]
+fn bench_rayon_0032_kib(b: &mut Bencher) {
+    bench_rayon(b, 32 * KIB);
+}
+
+#[bench]
+#[cfg(feature = "rayon")]
+fn bench_rayon_0064_kib(b: &mut Bencher) {
+    bench_rayon(b, 64 * KIB);
+}
+
+#[bench]
+#[cfg(feature = "rayon")]
+fn bench_rayon_0128_kib(b: &mut Bencher) {
+    bench_rayon(b, 128 * KIB);
+}
+
+#[bench]
+#[cfg(feature = "rayon")]
+fn bench_rayon_0256_kib(b: &mut Bencher) {
+    bench_rayon(b, 256 * KIB);
+}
+
+#[bench]
+#[cfg(feature = "rayon")]
+fn bench_rayon_0512_kib(b: &mut Bencher) {
+    bench_rayon(b, 512 * KIB);
+}
+
+#[bench]
+#[cfg(feature = "rayon")]
+fn bench_rayon_1024_kib(b: &mut Bencher) {
+    bench_rayon(b, 1024 * KIB);
+}

src/join.rs

Lines changed: 114 additions & 0 deletions
@@ -0,0 +1,114 @@
+//! The multi-threading abstractions used by [`Hasher::update_with_join`].
+//!
+//! Different implementations of the `Join` trait determine whether
+//! [`Hasher::update_with_join`] performs multi-threading on sufficiently large
+//! inputs. The `SerialJoin` implementation is single-threaded, and the
+//! `RayonJoin` implementation (gated by the `rayon` feature) is
+//! multi-threaded. Interfaces other than [`Hasher::update_with_join`], like
+//! [`hash`] and [`Hasher::update`], always use `SerialJoin` internally.
+//!
+//! The `Join` trait is an almost exact copy of the [`rayon::join`] API, and
+//! `RayonJoin` is the only non-trivial implementation provided. The only
+//! difference between the function signature in the `Join` trait and the
+//! underlying one in Rayon, is that the trait method includes two length
+//! parameters. This gives an implementation the option of e.g. setting a
+//! subtree size threshold below which it keeps splits on the same thread.
+//! However, neither of the two provided implementations currently makes use of
+//! those parameters. Note that in Rayon, the very first `join` call is more
+//! expensive than subsequent calls, because it moves work from the calling
+//! thread into the thread pool. That makes a coarse-grained input length
+//! threshold in the caller more effective than a fine-grained subtree size
+//! threshold after the implementation has already started recursing.
+//!
+//! # Example
+//!
+//! ```
+//! // Hash a large input using multi-threading. Note that multi-threading
+//! // comes with some overhead, and it can actually hurt performance for small
+//! // inputs. The meaning of "small" varies, however, depending on the
+//! // platform and the number of threads. (On x86_64, the cutoff tends to be
+//! // around 128 KiB.) You should benchmark your own use case to see whether
+//! // multi-threading helps.
+//! # #[cfg(feature = "rayon")]
+//! # {
+//! # fn some_large_input() -> &'static [u8] { b"foo" }
+//! let input: &[u8] = some_large_input();
+//! let mut hasher = blake3::Hasher::new();
+//! hasher.update_with_join::<blake3::join::RayonJoin>(input);
+//! let hash = hasher.finalize();
+//! # }
+//! ```
+//!
+//! [`Hasher::update_with_join`]: ../struct.Hasher.html#method.update_with_join
+//! [`Hasher::update`]: ../struct.Hasher.html#method.update
+//! [`hash`]: ../fn.hash.html
+//! [`rayon::join`]: https://docs.rs/rayon/1.3.0/rayon/fn.join.html
+
+/// The trait that abstracts over single-threaded and multi-threaded recursion.
+pub trait Join {
+    fn join<A, B, RA, RB>(oper_a: A, oper_b: B, len_a: usize, len_b: usize) -> (RA, RB)
+    where
+        A: FnOnce() -> RA + Send,
+        B: FnOnce() -> RB + Send,
+        RA: Send,
+        RB: Send;
+}
+
+/// The trivial, serial implementation of `Join`. The left and right sides are
+/// executed one after the other, on the calling thread. The standalone hashing
+/// functions and the `Hasher::update` method use this implementation
+/// internally.
+pub enum SerialJoin {}
+
+impl Join for SerialJoin {
+    #[inline]
+    fn join<A, B, RA, RB>(oper_a: A, oper_b: B, _len_a: usize, _len_b: usize) -> (RA, RB)
+    where
+        A: FnOnce() -> RA + Send,
+        B: FnOnce() -> RB + Send,
+        RA: Send,
+        RB: Send,
+    {
+        (oper_a(), oper_b())
+    }
+}
+
+/// The Rayon-based implementation of `Join`. The left and right sides are
+/// executed on the Rayon thread pool, potentially in parallel. This
+/// implementation is gated by the `rayon` feature, which is off by default.
+#[cfg(feature = "rayon")]
+pub enum RayonJoin {}
+
+#[cfg(feature = "rayon")]
+impl Join for RayonJoin {
+    #[inline]
+    fn join<A, B, RA, RB>(oper_a: A, oper_b: B, _len_a: usize, _len_b: usize) -> (RA, RB)
+    where
+        A: FnOnce() -> RA + Send,
+        B: FnOnce() -> RB + Send,
+        RA: Send,
+        RB: Send,
+    {
+        rayon::join(oper_a, oper_b)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_serial_join() {
+        let oper_a = || 1 + 1;
+        let oper_b = || 2 + 2;
+        assert_eq!((2, 4), SerialJoin::join(oper_a, oper_b, 3, 4));
+    }
+
+    #[test]
+    #[cfg(feature = "rayon")]
+    fn test_rayon_join() {
+        let oper_a = || 1 + 1;
+        let oper_b = || 2 + 2;
+        assert_eq!((2, 4), RayonJoin::join(oper_a, oper_b, 3, 4));
+    }
+}
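
The `len_a`/`len_b` parameters above exist so that a `Join` implementation can apply its own splitting policy. No such implementation ships in this commit; the following is only a sketch of what those parameters allow, with a hypothetical `ThresholdRayonJoin` type, an arbitrary 128 KiB cutoff, and a direct `rayon` dependency assumed in the caller's crate:

use blake3::join::Join;

/// Hypothetical example: only hand work to Rayon when either subtree is large.
pub enum ThresholdRayonJoin {}

impl Join for ThresholdRayonJoin {
    #[inline]
    fn join<A, B, RA, RB>(oper_a: A, oper_b: B, len_a: usize, len_b: usize) -> (RA, RB)
    where
        A: FnOnce() -> RA + Send,
        B: FnOnce() -> RB + Send,
        RA: Send,
        RB: Send,
    {
        // Arbitrary example threshold; a real value would need benchmarking.
        const THRESHOLD: usize = 128 * 1024;
        if len_a < THRESHOLD && len_b < THRESHOLD {
            // Small subtrees: stay on the calling thread, like SerialJoin.
            (oper_a(), oper_b())
        } else {
            rayon::join(oper_a, oper_b)
        }
    }
}

As the module docs above note, a coarse input-length check in the caller (choosing `RayonJoin` or `SerialJoin` up front) is usually more effective than this kind of fine-grained threshold, so this sketch is illustrative only.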
