11use std:: cell:: UnsafeCell ;
22use std:: fmt;
3+ use std:: isize;
34use std:: ops:: { Deref , DerefMut } ;
45use std:: pin:: Pin ;
6+ use std:: process;
57use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
68
7- use slab:: Slab ;
8-
99use crate :: future:: Future ;
10- use crate :: task:: { Context , Poll , Waker } ;
10+ use crate :: sync:: WakerSet ;
11+ use crate :: task:: { Context , Poll } ;
1112
1213/// Set if a write lock is held.
1314#[ allow( clippy:: identity_op) ]
1415const WRITE_LOCK : usize = 1 << 0 ;
1516
16- /// Set if there are read operations blocked on the lock.
17- const BLOCKED_READS : usize = 1 << 1 ;
18-
19- /// Set if there are write operations blocked on the lock.
20- const BLOCKED_WRITES : usize = 1 << 2 ;
21-
2217/// The value of a single blocked read contributing to the read count.
23- const ONE_READ : usize = 1 << 3 ;
18+ const ONE_READ : usize = 1 << 1 ;
2419
2520/// The bits in which the read count is stored.
2621const READ_COUNT_MASK : usize = !( ONE_READ - 1 ) ;
@@ -56,8 +51,8 @@ const READ_COUNT_MASK: usize = !(ONE_READ - 1);
5651/// ```
5752pub struct RwLock < T > {
5853 state : AtomicUsize ,
59- reads : std :: sync :: Mutex < Slab < Option < Waker > > > ,
60- writes : std :: sync :: Mutex < Slab < Option < Waker > > > ,
54+ read_wakers : WakerSet ,
55+ write_wakers : WakerSet ,
6156 value : UnsafeCell < T > ,
6257}
6358
@@ -77,8 +72,8 @@ impl<T> RwLock<T> {
7772 pub fn new ( t : T ) -> RwLock < T > {
7873 RwLock {
7974 state : AtomicUsize :: new ( 0 ) ,
80- reads : std :: sync :: Mutex :: new ( Slab :: new ( ) ) ,
81- writes : std :: sync :: Mutex :: new ( Slab :: new ( ) ) ,
75+ read_wakers : WakerSet :: new ( ) ,
76+ write_wakers : WakerSet :: new ( ) ,
8277 value : UnsafeCell :: new ( t) ,
8378 }
8479 }
@@ -104,100 +99,61 @@ impl<T> RwLock<T> {
10499 /// # })
105100 /// ```
106101 pub async fn read ( & self ) -> RwLockReadGuard < ' _ , T > {
107- pub struct LockFuture < ' a , T > {
102+ pub struct ReadFuture < ' a , T > {
108103 lock : & ' a RwLock < T > ,
109104 opt_key : Option < usize > ,
110- acquired : bool ,
111105 }
112106
113- impl < ' a , T > Future for LockFuture < ' a , T > {
107+ impl < ' a , T > Future for ReadFuture < ' a , T > {
114108 type Output = RwLockReadGuard < ' a , T > ;
115109
116110 fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
117- match self . lock . try_read ( ) {
118- Some ( guard) => {
119- self . acquired = true ;
120- Poll :: Ready ( guard)
121- }
111+ let poll = match self . lock . try_read ( ) {
112+ Some ( guard) => Poll :: Ready ( guard) ,
122113 None => {
123- let mut reads = self . lock . reads . lock ( ) . unwrap ( ) ;
124-
125- // Register the current task.
114+ // Insert this lock operation.
126115 match self . opt_key {
127- None => {
128- // Insert a new entry into the list of blocked reads.
129- let w = cx. waker ( ) . clone ( ) ;
130- let key = reads. insert ( Some ( w) ) ;
131- self . opt_key = Some ( key) ;
132-
133- if reads. len ( ) == 1 {
134- self . lock . state . fetch_or ( BLOCKED_READS , Ordering :: Relaxed ) ;
135- }
136- }
137- Some ( key) => {
138- // There is already an entry in the list of blocked reads. Just
139- // reset the waker if it was removed.
140- if reads[ key] . is_none ( ) {
141- let w = cx. waker ( ) . clone ( ) ;
142- reads[ key] = Some ( w) ;
143- }
144- }
116+ None => self . opt_key = Some ( self . lock . read_wakers . insert ( cx) ) ,
117+ Some ( key) => self . lock . read_wakers . update ( key, cx) ,
145118 }
146119
147120 // Try locking again because it's possible the lock got unlocked just
148- // before the current task was registered as a blocked task .
121+ // before the current task was inserted into the waker set .
149122 match self . lock . try_read ( ) {
150- Some ( guard) => {
151- self . acquired = true ;
152- Poll :: Ready ( guard)
153- }
123+ Some ( guard) => Poll :: Ready ( guard) ,
154124 None => Poll :: Pending ,
155125 }
156126 }
127+ } ;
128+
129+ if poll. is_ready ( ) {
130+ // If the current task is in the set, remove it.
131+ if let Some ( key) = self . opt_key . take ( ) {
132+ self . lock . read_wakers . complete ( key) ;
133+ }
157134 }
135+
136+ poll
158137 }
159138 }
160139
161- impl < T > Drop for LockFuture < ' _ , T > {
140+ impl < T > Drop for ReadFuture < ' _ , T > {
162141 fn drop ( & mut self ) {
142+ // If the current task is still in the set, that means it is being cancelled now.
163143 if let Some ( key) = self . opt_key {
164- let mut reads = self . lock . reads . lock ( ) . unwrap ( ) ;
165- let opt_waker = reads. remove ( key) ;
166-
167- if reads. is_empty ( ) {
168- self . lock . state . fetch_and ( !BLOCKED_READS , Ordering :: Relaxed ) ;
169- }
144+ self . lock . read_wakers . cancel ( key) ;
170145
171- if opt_waker. is_none ( ) {
172- // We were awoken. Wake up another blocked read.
173- if let Some ( ( _, opt_waker) ) = reads. iter_mut ( ) . next ( ) {
174- if let Some ( w) = opt_waker. take ( ) {
175- w. wake ( ) ;
176- return ;
177- }
178- }
179- drop ( reads) ;
180-
181- if !self . acquired {
182- // We didn't acquire the lock and didn't wake another blocked read.
183- // Wake a blocked write instead.
184- let mut writes = self . lock . writes . lock ( ) . unwrap ( ) ;
185- if let Some ( ( _, opt_waker) ) = writes. iter_mut ( ) . next ( ) {
186- if let Some ( w) = opt_waker. take ( ) {
187- w. wake ( ) ;
188- return ;
189- }
190- }
191- }
146+ // If there are no active readers, wake one of the writers.
147+ if self . lock . state . load ( Ordering :: SeqCst ) & READ_COUNT_MASK == 0 {
148+ self . lock . write_wakers . notify_one ( ) ;
192149 }
193150 }
194151 }
195152 }
196153
197- LockFuture {
154+ ReadFuture {
198155 lock : self ,
199156 opt_key : None ,
200- acquired : false ,
201157 }
202158 . await
203159 }
@@ -226,20 +182,25 @@ impl<T> RwLock<T> {
226182 /// # })
227183 /// ```
228184 pub fn try_read ( & self ) -> Option < RwLockReadGuard < ' _ , T > > {
229- let mut state = self . state . load ( Ordering :: Acquire ) ;
185+ let mut state = self . state . load ( Ordering :: SeqCst ) ;
230186
231187 loop {
232188 // If a write lock is currently held, then a read lock cannot be acquired.
233189 if state & WRITE_LOCK != 0 {
234190 return None ;
235191 }
236192
193+ // Make sure the number of readers doesn't overflow.
194+ if state > isize:: MAX as usize {
195+ process:: abort ( ) ;
196+ }
197+
237198 // Increment the number of active reads.
238199 match self . state . compare_exchange_weak (
239200 state,
240201 state + ONE_READ ,
241- Ordering :: AcqRel ,
242- Ordering :: Acquire ,
202+ Ordering :: SeqCst ,
203+ Ordering :: SeqCst ,
243204 ) {
244205 Ok ( _) => return Some ( RwLockReadGuard ( self ) ) ,
245206 Err ( s) => state = s,
@@ -268,99 +229,59 @@ impl<T> RwLock<T> {
268229 /// # })
269230 /// ```
270231 pub async fn write ( & self ) -> RwLockWriteGuard < ' _ , T > {
271- pub struct LockFuture < ' a , T > {
232+ pub struct WriteFuture < ' a , T > {
272233 lock : & ' a RwLock < T > ,
273234 opt_key : Option < usize > ,
274- acquired : bool ,
275235 }
276236
277- impl < ' a , T > Future for LockFuture < ' a , T > {
237+ impl < ' a , T > Future for WriteFuture < ' a , T > {
278238 type Output = RwLockWriteGuard < ' a , T > ;
279239
280240 fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
281- match self . lock . try_write ( ) {
282- Some ( guard) => {
283- self . acquired = true ;
284- Poll :: Ready ( guard)
285- }
241+ let poll = match self . lock . try_write ( ) {
242+ Some ( guard) => Poll :: Ready ( guard) ,
286243 None => {
287- let mut writes = self . lock . writes . lock ( ) . unwrap ( ) ;
288-
289- // Register the current task.
244+ // Insert this lock operation.
290245 match self . opt_key {
291- None => {
292- // Insert a new entry into the list of blocked writes.
293- let w = cx. waker ( ) . clone ( ) ;
294- let key = writes. insert ( Some ( w) ) ;
295- self . opt_key = Some ( key) ;
296-
297- if writes. len ( ) == 1 {
298- self . lock . state . fetch_or ( BLOCKED_WRITES , Ordering :: Relaxed ) ;
299- }
300- }
301- Some ( key) => {
302- // There is already an entry in the list of blocked writes. Just
303- // reset the waker if it was removed.
304- if writes[ key] . is_none ( ) {
305- let w = cx. waker ( ) . clone ( ) ;
306- writes[ key] = Some ( w) ;
307- }
308- }
246+ None => self . opt_key = Some ( self . lock . write_wakers . insert ( cx) ) ,
247+ Some ( key) => self . lock . write_wakers . update ( key, cx) ,
309248 }
310249
311250 // Try locking again because it's possible the lock got unlocked just
312- // before the current task was registered as a blocked task .
251+ // before the current task was inserted into the waker set .
313252 match self . lock . try_write ( ) {
314- Some ( guard) => {
315- self . acquired = true ;
316- Poll :: Ready ( guard)
317- }
253+ Some ( guard) => Poll :: Ready ( guard) ,
318254 None => Poll :: Pending ,
319255 }
320256 }
257+ } ;
258+
259+ if poll. is_ready ( ) {
260+ // If the current task is in the set, remove it.
261+ if let Some ( key) = self . opt_key . take ( ) {
262+ self . lock . write_wakers . complete ( key) ;
263+ }
321264 }
265+
266+ poll
322267 }
323268 }
324269
325- impl < T > Drop for LockFuture < ' _ , T > {
270+ impl < T > Drop for WriteFuture < ' _ , T > {
326271 fn drop ( & mut self ) {
272+ // If the current task is still in the set, that means it is being cancelled now.
327273 if let Some ( key) = self . opt_key {
328- let mut writes = self . lock . writes . lock ( ) . unwrap ( ) ;
329- let opt_waker = writes. remove ( key) ;
330-
331- if writes. is_empty ( ) {
332- self . lock
333- . state
334- . fetch_and ( !BLOCKED_WRITES , Ordering :: Relaxed ) ;
335- }
336-
337- if opt_waker. is_none ( ) && !self . acquired {
338- // We were awoken but didn't acquire the lock. Wake up another write.
339- if let Some ( ( _, opt_waker) ) = writes. iter_mut ( ) . next ( ) {
340- if let Some ( w) = opt_waker. take ( ) {
341- w. wake ( ) ;
342- return ;
343- }
344- }
345- drop ( writes) ;
346-
347- // There are no blocked writes. Wake a blocked read instead.
348- let mut reads = self . lock . reads . lock ( ) . unwrap ( ) ;
349- if let Some ( ( _, opt_waker) ) = reads. iter_mut ( ) . next ( ) {
350- if let Some ( w) = opt_waker. take ( ) {
351- w. wake ( ) ;
352- return ;
353- }
354- }
274+ if !self . lock . write_wakers . cancel ( key) {
275+ // If no other blocked reader was notified, notify all readers.
276+ self . lock . read_wakers . notify_all ( ) ;
355277 }
356278 }
357279 }
358280 }
359281
360- LockFuture {
282+ WriteFuture {
361283 lock : self ,
362284 opt_key : None ,
363- acquired : false ,
364285 }
365286 . await
366287 }
@@ -389,24 +310,10 @@ impl<T> RwLock<T> {
389310 /// # })
390311 /// ```
391312 pub fn try_write ( & self ) -> Option < RwLockWriteGuard < ' _ , T > > {
392- let mut state = self . state . load ( Ordering :: Acquire ) ;
393-
394- loop {
395- // If any kind of lock is currently held, then a write lock cannot be acquired.
396- if state & ( WRITE_LOCK | READ_COUNT_MASK ) != 0 {
397- return None ;
398- }
399-
400- // Set the write lock.
401- match self . state . compare_exchange_weak (
402- state,
403- state | WRITE_LOCK ,
404- Ordering :: AcqRel ,
405- Ordering :: Acquire ,
406- ) {
407- Ok ( _) => return Some ( RwLockWriteGuard ( self ) ) ,
408- Err ( s) => state = s,
409- }
313+ if self . state . compare_and_swap ( 0 , WRITE_LOCK , Ordering :: SeqCst ) == 0 {
314+ Some ( RwLockWriteGuard ( self ) )
315+ } else {
316+ None
410317 }
411318 }
412319
@@ -449,18 +356,15 @@ impl<T> RwLock<T> {
449356
450357impl < T : fmt:: Debug > fmt:: Debug for RwLock < T > {
451358 fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
452- match self . try_read ( ) {
453- None => {
454- struct LockedPlaceholder ;
455- impl fmt:: Debug for LockedPlaceholder {
456- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
457- f. write_str ( "<locked>" )
458- }
459- }
460- f. debug_struct ( "RwLock" )
461- . field ( "data" , & LockedPlaceholder )
462- . finish ( )
359+ struct Locked ;
360+ impl fmt:: Debug for Locked {
361+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
362+ f. write_str ( "<locked>" )
463363 }
364+ }
365+
366+ match self . try_read ( ) {
367+ None => f. debug_struct ( "RwLock" ) . field ( "data" , & Locked ) . finish ( ) ,
464368 Some ( guard) => f. debug_struct ( "RwLock" ) . field ( "data" , & & * guard) . finish ( ) ,
465369 }
466370 }
@@ -486,18 +390,11 @@ unsafe impl<T: Sync> Sync for RwLockReadGuard<'_, T> {}
486390
487391impl < T > Drop for RwLockReadGuard < ' _ , T > {
488392 fn drop ( & mut self ) {
489- let state = self . 0 . state . fetch_sub ( ONE_READ , Ordering :: AcqRel ) ;
490-
491- // If this was the last read and there are blocked writes, wake one of them up.
492- if ( state & READ_COUNT_MASK ) == ONE_READ && state & BLOCKED_WRITES != 0 {
493- let mut writes = self . 0 . writes . lock ( ) . unwrap ( ) ;
393+ let state = self . 0 . state . fetch_sub ( ONE_READ , Ordering :: SeqCst ) ;
494394
495- if let Some ( ( _, opt_waker) ) = writes. iter_mut ( ) . next ( ) {
496- // If there is no waker in this entry, that means it was already woken.
497- if let Some ( w) = opt_waker. take ( ) {
498- w. wake ( ) ;
499- }
500- }
395+ // If this was the last read, wake one of the writers.
396+ if state & READ_COUNT_MASK == ONE_READ {
397+ self . 0 . write_wakers . notify_one ( ) ;
501398 }
502399 }
503400}
@@ -530,25 +427,12 @@ unsafe impl<T: Sync> Sync for RwLockWriteGuard<'_, T> {}
530427
531428impl < T > Drop for RwLockWriteGuard < ' _ , T > {
532429 fn drop ( & mut self ) {
533- let state = self . 0 . state . fetch_and ( !WRITE_LOCK , Ordering :: AcqRel ) ;
534-
535- let mut guard = None ;
536-
537- // Check if there are any blocked reads or writes.
538- if state & BLOCKED_READS != 0 {
539- guard = Some ( self . 0 . reads . lock ( ) . unwrap ( ) ) ;
540- } else if state & BLOCKED_WRITES != 0 {
541- guard = Some ( self . 0 . writes . lock ( ) . unwrap ( ) ) ;
542- }
430+ self . 0 . state . store ( 0 , Ordering :: SeqCst ) ;
543431
544- // Wake up a single blocked task.
545- if let Some ( mut guard) = guard {
546- if let Some ( ( _, opt_waker) ) = guard. iter_mut ( ) . next ( ) {
547- // If there is no waker in this entry, that means it was already woken.
548- if let Some ( w) = opt_waker. take ( ) {
549- w. wake ( ) ;
550- }
551- }
432+ // Notify all blocked readers.
433+ if !self . 0 . read_wakers . notify_all ( ) {
434+ // If there were no blocked readers, notify a blocked writer.
435+ self . 0 . write_wakers . notify_one ( ) ;
552436 }
553437 }
554438}
0 commit comments