// This module provides a relatively simple thread-safe pool of reusable
// objects. For the most part, it's implemented by a stack represented by a
// Mutex<Vec<T>>. It has one small trick: because unlocking a mutex is
// somewhat costly, in the case where a pool is accessed by the first thread
// that tried to get a value, we bypass the mutex. Here are some benchmarks
// showing the difference.
//
// 1) misc::anchored_literal_long_non_match  21 (18571 MB/s)
// 2) misc::anchored_literal_long_non_match 107 (3644 MB/s)
// 3) misc::anchored_literal_long_non_match  45 (8666 MB/s)
// 4) misc::anchored_literal_long_non_match  19 (20526 MB/s)
//
// (1) represents our baseline: the master branch at the time of writing when
// using the 'thread_local' crate to implement the pool below.
//
// (2) represents a naive pool implemented completely via Mutex<Vec<T>>. There
// is no special trick for bypassing the mutex.
//
// (3) is the same as (2), except it uses Mutex<Vec<Box<T>>>. It is twice as
// fast because a Box<T> is much smaller than the T we use with a Pool in this
// crate. So pushing and popping a Box<T> from a Vec is quite a bit faster
// than for T.
//
// (4) is the same as (3), but with the trick for bypassing the mutex in the
// case of the first-to-get thread.
//
// Why move off of thread_local? Even though (4) is a hair faster than (1)
// above, this was not the main goal. The main goal was to move off of
// thread_local and find a way to *simply* re-capture some of its speed for
// regex's specific case. So again, why move off of it? The *primary* reason
// is memory leaks. See https://github.com/rust-lang/regex/issues/362 for an
// example. (Why do I want it to be simple? Well, I suppose what I mean is,
// "use as much safe code as possible to minimize risk and be as sure as I
// can be that it is correct.")
//
// My guess is that the thread_local design is probably not appropriate for
// regex, since its memory usage scales with the number of active threads
// that have used a regex, whereas the pool below scales with the number of
// threads that simultaneously use a regex. While neither case permits
// contraction, since we own the pool data structure below, we can add
// contraction if a clear use case pops up in the wild. More pressingly
// though, it seems that there are at least some use case patterns where one
// might have many threads sitting around that might have used a regex at one
// point. While thread_local does try to reuse space previously used by a
// thread that has since stopped, its maximal memory usage still scales with
// the total number of active threads. In contrast, the pool below scales
// with the total number of threads *simultaneously* using the pool. The hope
// is that this uses less memory overall. And if it doesn't, we can hopefully
// tune it somehow.
//
// It seems that this sort of condition happens frequently
// in FFI inside of other more "managed" languages. This was
// mentioned in the issue linked above, and also mentioned here:
// https://github.com/BurntSushi/rure-go/issues/3. And in particular, users
// confirm that disabling the use of thread_local resolves the leak.
//
// There were other, weaker reasons for moving off of thread_local as well.
// Namely, at the time, I was looking to reduce dependencies. And for
// something like regex, maintenance can be simpler when we own the full
// dependency tree.

use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// An atomic counter used to allocate thread IDs.
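///
/// The counter starts at 1 because 0 is reserved as the sentinel value for
/// an unowned pool (see the 'owner' field on 'Pool' below).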
static COUNTER: AtomicUsize = AtomicUsize::new(1);

thread_local!(
    /// A thread local used to assign an ID to a thread.
    static THREAD_ID: usize = {
        let next = COUNTER.fetch_add(1, Ordering::Relaxed);
        // SAFETY: We cannot permit the reuse of thread IDs since reusing a
        // thread ID might result in more than one thread "owning" a pool,
        // and thus, permit accessing a mutable value from multiple threads
        // simultaneously without synchronization. The intent of this panic
        // is to be a sanity check. It is not expected that the thread ID
        // space will actually be exhausted in practice.
        //
        // This checks that the counter never wraps around, since atomic
        // addition wraps around on overflow.
        if next == 0 {
            panic!("regex: thread ID allocation space exhausted");
        }
        next
    };
);

/// The type of the function used to create values in a pool when the pool is
/// empty and the caller requests one.
type CreateFn<T> =
    Box<dyn Fn() -> T + Send + Sync + UnwindSafe + RefUnwindSafe + 'static>;

/// A simple thread safe pool for reusing values.
///
/// Getting a value out comes with a guard. When that guard is dropped, the
/// value is automatically put back in the pool.
///
/// A Pool<T> impls Sync when T is Send (even if it's not Sync). This means
/// that T can use interior mutability. This is possible because a pool is
/// guaranteed to provide a value to exactly one thread at any time.
///
/// Currently, a pool never contracts in size. Its size is proportional to
/// the maximum number of simultaneous uses.
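///
/// # Example
///
/// A minimal sketch of intended usage (a hypothetical snippet, using RefCell
/// to show that a T with interior mutability works):
///
/// ```ignore
/// use std::cell::RefCell;
///
/// let pool = Pool::new(Box::new(|| RefCell::new(Vec::<u8>::new())));
/// let guard = pool.get();
/// guard.value().borrow_mut().push(42);
/// // Dropping the guard puts the value back into the pool.
/// drop(guard);
/// ```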
pub struct Pool<T> {
    /// A stack of T values to hand out. These are used when a Pool is
    /// accessed by a thread that doesn't own it.
    stack: Mutex<Vec<Box<T>>>,
    /// A function to create more T values when stack is empty and a caller
    /// has requested a T.
    create: CreateFn<T>,
    /// The ID of the thread that owns this pool. The owner is the thread
    /// that makes the first call to 'get'. When the owner calls 'get', it
    /// gets 'owner_val' directly instead of returning a T from 'stack'.
    /// See comments elsewhere for details, but this is intended to be an
    /// optimization for the common case that makes getting a T faster.
    ///
    /// It is initialized to a value of zero (an impossible thread ID) as a
    /// sentinel to indicate that it is unowned.
    owner: AtomicUsize,
    /// A value to return when the caller is the thread that owns this Pool.
    owner_val: T,
}

// SAFETY: Since we want to use a Pool from multiple threads simultaneously
// behind an Arc, we need for it to be Sync. In cases where T is Sync, Pool<T>
// would be Sync. However, since we use a Pool to store mutable scratch space,
// we wind up using a T that has interior mutability and is thus itself not
// Sync. So what we *really* want is for our Pool<T> to be Sync even when T is
// not Sync (but is at least Send).
//
// The only non-Sync aspect of a Pool is its 'owner_val' field, which is used
// to implement faster access to a pool value in the common case of a pool
// being accessed by the thread that owns it. The 'stack' field is also
// shared, but a Mutex<T> where T: Send is already Sync. So we only need to
// worry about 'owner_val'.
//
// The key is to guarantee that 'owner_val' can only ever be accessed from
// one thread. In our implementation below, we guarantee this by only
// returning 'owner_val' when the ID of the current thread matches the ID of
// the pool's owner, i.e., the first thread to call 'get'. Since this can
// only ever be one thread, it follows that only one thread can access
// 'owner_val' at any point in time. Thus, it is safe to declare that Pool<T>
// is Sync when T is Send.
//
// NOTE: It would also be possible to make the owning thread be the thread
// that *creates* the Pool, rather than the first thread that tries to get a
// value out of it. However, the current implementation is a little simpler,
// and it's not clear that using the creating thread would be meaningfully
// better.
//
// If there is a way to achieve our performance goals using safe code, then I
// would very much welcome a patch. As it stands, the implementation below
// tries to balance safety with performance. The case where a Regex is used
// from multiple threads simultaneously will suffer a bit since getting a
// cache will require unlocking a mutex.
unsafe impl<T: Send> Sync for Pool<T> {}
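
// One quick way to illustrate the property above (compare the 'oibits' test
// below, which checks the real type used in this crate): RefCell<Vec<u8>> is
// Send but not Sync, and yet Pool<RefCell<Vec<u8>>> is Sync, so it can be
// shared across threads behind an Arc:
//
//     fn assert_sync<T: Sync>() {}
//     assert_sync::<Pool<std::cell::RefCell<Vec<u8>>>>();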

impl<T: ::std::fmt::Debug> ::std::fmt::Debug for Pool<T> {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        f.debug_struct("Pool")
            .field("stack", &self.stack)
            .field("owner", &self.owner)
            .field("owner_val", &self.owner_val)
            .finish()
    }
}

/// A guard that is returned when a caller requests a value from the pool.
///
/// The purpose of the guard is to use RAII to automatically put the value
/// back in the pool once it's dropped.
#[derive(Debug)]
pub struct PoolGuard<'a, T: 'a + Send> {
    /// The pool that this guard is attached to.
    pool: &'a Pool<T>,
    /// This is None when the guard represents the special "owned" value,
    /// in which case the value is retrieved from 'pool.owner_val'.
    value: Option<Box<T>>,
}

impl<T: Send> Pool<T> {
    /// Create a new pool. The given closure is used to create values in the
    /// pool when necessary.
    pub fn new(create: CreateFn<T>) -> Pool<T> {
        let owner = AtomicUsize::new(0);
        let owner_val = create();
        Pool { stack: Mutex::new(vec![]), create, owner, owner_val }
    }

    /// Get a value from the pool. The caller is guaranteed to have exclusive
    /// access to the given value.
    ///
    /// Note that there is no guarantee provided about which value in the
    /// pool is returned. That is, calling get, dropping the guard (causing
    /// the value to go back into the pool) and then calling get again is NOT
    /// guaranteed to return the same value received in the first get call.
    #[cfg_attr(feature = "perf-inline", inline(always))]
    pub fn get(&self) -> PoolGuard<T> {
        // Our fast path checks if the caller is the thread that "owns" this
        // pool. Or stated differently, whether it is the first thread that
        // tried to extract a value from the pool. If it is, then we can
        // return a T to the caller without going through a mutex.
        //
        // SAFETY: We must guarantee that only one thread gets access to this
        // value. Since a thread is uniquely identified by the THREAD_ID
        // thread local, it follows that if the caller's thread ID is equal
        // to the owner, then only one thread may receive this value.
        let caller = THREAD_ID.with(|id| *id);
        let owner = self.owner.load(Ordering::Relaxed);
        if caller == owner {
            return self.guard_owned();
        }
        self.get_slow(caller, owner)
    }

    /// This is the "slow" version that goes through a mutex to pop an
    /// allocated value off a stack to return to the caller. (Or, if the
    /// stack is empty, a new value is created.)
    ///
    /// If the pool has no owner, then this will set the owner.
    #[cold]
    fn get_slow(&self, caller: usize, owner: usize) -> PoolGuard<T> {
        use std::sync::atomic::Ordering::Relaxed;

        if owner == 0 {
            // The sentinel 0 value means this pool is not yet owned. We
            // try to atomically set the owner. If we do, then this thread
            // becomes the owner and we can return a guard that represents
            // the special T for the owner.
            let res = self.owner.compare_exchange(0, caller, Relaxed, Relaxed);
            if res.is_ok() {
                return self.guard_owned();
            }
        }
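        // If we get here, then either the pool is already owned by another
        // thread, or we lost the compare_exchange race above. Either way,
        // fall back to the mutex-guarded stack.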
        let mut stack = self.stack.lock().unwrap();
        let value = match stack.pop() {
            None => Box::new((self.create)()),
            Some(value) => value,
        };
        self.guard_stack(value)
    }

    /// Puts a value back into the pool. Callers don't need to call this.
    /// Once the guard that's returned by 'get' is dropped, it is put back
    /// into the pool automatically.
    fn put(&self, value: Box<T>) {
        let mut stack = self.stack.lock().unwrap();
        stack.push(value);
    }

    /// Create a guard that represents the special owned T.
    fn guard_owned(&self) -> PoolGuard<'_, T> {
        PoolGuard { pool: self, value: None }
    }

    /// Create a guard that contains a value from the pool's stack.
    fn guard_stack(&self, value: Box<T>) -> PoolGuard<'_, T> {
        PoolGuard { pool: self, value: Some(value) }
    }
}

impl<'a, T: Send> PoolGuard<'a, T> {
    /// Return the underlying value.
    pub fn value(&self) -> &T {
        match self.value {
            None => &self.pool.owner_val,
            Some(ref v) => &**v,
        }
    }
}

impl<'a, T: Send> Drop for PoolGuard<'a, T> {
    #[cfg_attr(feature = "perf-inline", inline(always))]
    fn drop(&mut self) {
        if let Some(value) = self.value.take() {
            self.pool.put(value);
        }
    }
}

#[cfg(test)]
mod tests {
    use std::panic::{RefUnwindSafe, UnwindSafe};

    use super::*;

    #[test]
    fn oibits() {
        use exec::ProgramCache;

        fn has_oibits<T: Send + Sync + UnwindSafe + RefUnwindSafe>() {}
        has_oibits::<Pool<ProgramCache>>();
    }
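
    // An extra sanity check: once ownership of the pool is claimed, values
    // handed to other threads round-trip through the mutex-guarded stack
    // when their guards are dropped. This peeks at the private 'stack'
    // field, so it checks an implementation detail rather than an API
    // contract.
    #[test]
    fn guard_returns_value_to_stack() {
        use std::sync::Arc;

        let pool: Arc<Pool<Vec<char>>> =
            Arc::new(Pool::new(Box::new(|| vec!['a'])));
        // Claim ownership on this thread, so that the spawned thread below
        // must go through the slow (mutex) path.
        drop(pool.get());

        let pool1 = pool.clone();
        std::thread::spawn(move || {
            // The stack is empty, so this creates a fresh value. Dropping
            // the guard then pushes it back onto the pool's stack.
            let guard = pool1.get();
            assert_eq!(vec!['a'], *guard.value());
        })
        .join()
        .unwrap();

        // The value used by the spawned thread is now back in the pool.
        assert_eq!(1, pool.stack.lock().unwrap().len());
    }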

    // Tests that Pool implements the "single owner" optimization. That is,
    // the thread that first accesses the pool gets its own copy, while all
    // other threads get distinct copies.
    #[test]
    fn thread_owner_optimization() {
        use std::cell::RefCell;
        use std::sync::Arc;

        let pool: Arc<Pool<RefCell<Vec<char>>>> =
            Arc::new(Pool::new(Box::new(|| RefCell::new(vec!['a']))));
        pool.get().value().borrow_mut().push('x');

        let pool1 = pool.clone();
        let t1 = std::thread::spawn(move || {
            let guard = pool1.get();
            let v = guard.value();
            v.borrow_mut().push('y');
        });

        let pool2 = pool.clone();
        let t2 = std::thread::spawn(move || {
            let guard = pool2.get();
            let v = guard.value();
            v.borrow_mut().push('z');
        });

        t1.join().unwrap();
        t2.join().unwrap();

        // If we didn't implement the single owner optimization, then one of
        // the threads above would likely have mutated the ['a', 'x'] vec
        // that we stuffed in the pool before spawning the threads. But since
        // neither thread was first to access the pool, and because of the
        // optimization, we should be guaranteed that neither thread mutates
        // the special owned pool value.
        //
        // (Technically this is an implementation detail and not a contract
        // of Pool's API.)
        assert_eq!(vec!['a', 'x'], *pool.get().value().borrow());
    }
}