Commit 2cc1e16

Committed Nov 25, 2013
auto merge of #10603 : alexcrichton/rust/no-linked-failure, r=brson
The reasons for doing this are:

* The model on which linked failure is based is inherently complex.
* The implementation is also very complex, and few of the remaining contributors fully understand it.
* There are existing race conditions in the core context-switching function of the scheduler, and possibly others.
* It's unclear whether this model of linked failure maps well to a 1:1 threading model.

Linked failure is often a desired aspect of tasks, but we would like to take a much more conservative approach if we re-implement it at all.

Closes #8674
Closes #8318
Closes #8863
2 parents ca32743 + acca9e3 commit 2cc1e16

39 files changed: +403 / -2531 lines
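In practice the migration in this patch is mechanical: the linked-failure spawn variants collapse into plain `spawn`, and code that relied on a supervisor noticing a child's death blocks on `task::try` instead. A rough sketch of both idioms in the pre-1.0 syntax used throughout this diff (the bodies are placeholders, not code from the patch):

~~~
use std::task;

fn main() {
    // Previously this needed task::spawn_unlinked to keep the child's
    // failure from killing its parent; plain spawn now isolates failure
    // between tasks by default.
    do spawn {
        fail!("child fails alone; the parent keeps running");
    }

    // To observe a child's failure, block on it with task::try and inspect
    // the Result rather than relying on linked failure to propagate it.
    let result = do task::try {
        fail!("contained");
    };
    assert!(result.is_err());
}
~~~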
 

‎doc/rust.md‎

Lines changed: 2 additions & 2 deletions
@@ -2349,9 +2349,9 @@ Indices are zero-based, and may be of any integral type. Vector access
 is bounds-checked at run-time. When the check fails, it will put the
 task in a _failing state_.

-~~~~
+~~~~ {.xfail-test}
 # use std::task;
-# do task::spawn_unlinked {
+# do task::spawn {

 ([1, 2, 3, 4])[0];
 (["a", "b"])[10]; // fails

‎doc/tutorial-tasks.md‎

Lines changed: 1 addition & 111 deletions
@@ -402,22 +402,6 @@ freeing memory along the way---and then exits. Unlike exceptions in C++,
 exceptions in Rust are unrecoverable within a single task: once a task fails,
 there is no way to "catch" the exception.

-All tasks are, by default, _linked_ to each other. That means that the fates
-of all tasks are intertwined: if one fails, so do all the others.
-
-~~~{.xfail-test .linked-failure}
-# use std::task::spawn;
-# use std::task;
-# fn do_some_work() { loop { task::yield() } }
-# do task::try {
-// Create a child task that fails
-do spawn { fail!() }
-
-// This will also fail because the task we spawned failed
-do_some_work();
-# };
-~~~
-
 While it isn't possible for a task to recover from failure, tasks may notify
 each other of failure. The simplest way of handling task failure is with the
 `try` function, which is similar to `spawn`, but immediately blocks waiting
@@ -464,101 +448,7 @@ it trips, indicates an unrecoverable logic error); in other cases you
 might want to contain the failure at a certain boundary (perhaps a
 small piece of input from the outside world, which you happen to be
 processing in parallel, is malformed and its processing task can't
-proceed). Hence, you will need different _linked failure modes_.
-
-## Failure modes
-
-By default, task failure is _bidirectionally linked_, which means that if
-either task fails, it kills the other one.
-
-~~~{.xfail-test .linked-failure}
-# use std::task;
-# use std::comm::oneshot;
-# fn sleep_forever() { loop { let (p, c) = oneshot::<()>(); p.recv(); } }
-# do task::try {
-do spawn {
-do spawn {
-fail!(); // All three tasks will fail.
-}
-sleep_forever(); // Will get woken up by force, then fail
-}
-sleep_forever(); // Will get woken up by force, then fail
-# };
-~~~
-
-If you want parent tasks to be able to kill their children, but do not want a
-parent to fail automatically if one of its child task fails, you can call
-`task::spawn_supervised` for _unidirectionally linked_ failure. The
-function `task::try`, which we saw previously, uses `spawn_supervised`
-internally, with additional logic to wait for the child task to finish
-before returning. Hence:
-
-~~~{.xfail-test .linked-failure}
-# use std::comm::{stream, Chan, Port};
-# use std::comm::oneshot;
-# use std::task::{spawn, try};
-# use std::task;
-# fn sleep_forever() { loop { let (p, c) = oneshot::<()>(); p.recv(); } }
-# do task::try {
-let (receiver, sender): (Port<int>, Chan<int>) = stream();
-do spawn { // Bidirectionally linked
-// Wait for the supervised child task to exist.
-let message = receiver.recv();
-// Kill both it and the parent task.
-assert!(message != 42);
-}
-do try { // Unidirectionally linked
-sender.send(42);
-sleep_forever(); // Will get woken up by force
-}
-// Flow never reaches here -- parent task was killed too.
-# };
-~~~
-
-Supervised failure is useful in any situation where one task manages
-multiple fallible child tasks, and the parent task can recover
-if any child fails. On the other hand, if the _parent_ (supervisor) fails,
-then there is nothing the children can do to recover, so they should
-also fail.
-
-Supervised task failure propagates across multiple generations even if
-an intermediate generation has already exited:
-
-~~~{.xfail-test .linked-failure}
-# use std::task;
-# use std::comm::oneshot;
-# fn sleep_forever() { loop { let (p, c) = oneshot::<()>(); p.recv(); } }
-# fn wait_for_a_while() { for _ in range(0, 1000u) { task::yield() } }
-# do task::try::<int> {
-do task::spawn_supervised {
-do task::spawn_supervised {
-sleep_forever(); // Will get woken up by force, then fail
-}
-// Intermediate task immediately exits
-}
-wait_for_a_while();
-fail!(); // Will kill grandchild even if child has already exited
-# };
-~~~
-
-Finally, tasks can be configured to not propagate failure to each
-other at all, using `task::spawn_unlinked` for _isolated failure_.
-
-~~~{.xfail-test .linked-failure}
-# use std::task;
-# fn random() -> uint { 100 }
-# fn sleep_for(i: uint) { for _ in range(0, i) { task::yield() } }
-# do task::try::<()> {
-let (time1, time2) = (random(), random());
-do task::spawn_unlinked {
-sleep_for(time2); // Won't get forced awake
-fail!();
-}
-sleep_for(time1); // Won't get forced awake
-fail!();
-// It will take MAX(time1,time2) for the program to finish.
-# };
-~~~
+proceed).

 ## Creating a task with a bi-directional communication path


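The surviving tutorial text points readers at `task::try` as the one remaining failure boundary: it runs a closure in a new task, waits for it, and reports the outcome as a `Result`. A minimal sketch of supervising fallible children that way, using the era's syntax (the `fallible_work` function is hypothetical):

~~~
use std::task;

fn fallible_work(i: uint) -> uint {
    if i == 3 { fail!("bad input"); }
    i * 2
}

fn main() {
    let mut failures = 0;
    for i in range(0u, 5) {
        // Each failure is confined to its own task; the parent just sees Err.
        let result = do task::try { fallible_work(i) };
        if result.is_err() { failures += 1; }
    }
    assert_eq!(failures, 1);
}
~~~
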
‎src/libextra/arc.rs‎

Lines changed: 1 addition & 1 deletion
@@ -655,7 +655,7 @@ mod tests {
 let arc2 = ~arc.clone();
 let (p, c) = comm::stream();

-do task::spawn_unlinked || {
+do spawn {
 let _ = p.recv();
 do arc2.access_cond |one, cond| {
 cond.signal();

‎src/libextra/comm.rs‎

Lines changed: 3 additions & 4 deletions
@@ -137,7 +137,6 @@ pub fn rendezvous<T: Send>() -> (SyncPort<T>, SyncChan<T>) {
 mod test {
 use comm::{DuplexStream, rendezvous};
 use std::rt::test::run_in_uv_task;
-use std::task::spawn_unlinked;


 #[test]
@@ -177,7 +176,7 @@ mod test {
 #[test]
 fn send_and_fail_and_try_recv() {
 let (port, chan) = rendezvous();
-do spawn_unlinked {
+do spawn {
 chan.duplex_stream.send(()); // Can't access this field outside this module
 fail!()
 }
@@ -187,7 +186,7 @@ mod test {
 #[test]
 fn try_send_and_recv_then_fail_before_ack() {
 let (port, chan) = rendezvous();
-do spawn_unlinked {
+do spawn {
 port.duplex_stream.recv();
 fail!()
 }
@@ -198,7 +197,7 @@ mod test {
 #[should_fail]
 fn send_and_recv_then_fail_before_ack() {
 let (port, chan) = rendezvous();
-do spawn_unlinked {
+do spawn {
 port.duplex_stream.recv();
 fail!()
 }

‎src/libextra/future.rs‎

Lines changed: 1 addition & 25 deletions
@@ -27,7 +27,6 @@

 use std::cell::Cell;
 use std::comm::{PortOne, oneshot};
-use std::task;
 use std::util::replace;

 /// A type encapsulating the result of a computation which may not be complete
@@ -130,29 +129,12 @@ impl<A:Send> Future<A> {

 let (port, chan) = oneshot();

-do task::spawn_with(chan) |chan| {
+do spawn {
 chan.send(blk());
 }

 Future::from_port(port)
 }
-
-pub fn spawn_with<B: Send>(v: B, blk: proc(B) -> A) -> Future<A> {
-/*!
-* Create a future from a unique closure taking one argument.
-*
-* The closure and its argument will be moved into a new task. The
-* closure will be run and its result used as the value of the future.
-*/
-
-let (port, chan) = oneshot();
-
-do task::spawn_with((v, chan)) |(v, chan)| {
-chan.send(blk(v));
-}
-
-Future::from_port(port)
-}
 }

 #[cfg(test)]
@@ -207,12 +189,6 @@ mod test {
 assert_eq!(f.get(), ~"bale");
 }

-#[test]
-fn test_spawn_with() {
-let mut f = Future::spawn_with(~"gale", |s| { s });
-assert_eq!(f.get(), ~"gale");
-}
-
 #[test]
 #[should_fail]
 fn test_futurefail() {

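With `Future::spawn_with` removed above, the remaining constructor is `Future::spawn`, and an argument is simply captured by the closure instead of being threaded through a separate entry point. A small, hedged usage sketch (the values are illustrative, not taken from the tests):

~~~
extern mod extra;

use extra::future::Future;

fn main() {
    let n = 21;
    // The closure captures its input directly; no spawn_with needed.
    let mut f = Future::spawn(|| n * 2);
    assert_eq!(f.get(), 42);
}
~~~
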
‎src/libextra/sync.rs‎

Lines changed: 109 additions & 135 deletions
@@ -22,7 +22,6 @@ use std::borrow;
 use std::comm;
 use std::comm::SendDeferred;
 use std::comm::{GenericPort, Peekable};
-use std::task;
 use std::unstable::sync::{Exclusive, UnsafeArc};
 use std::unstable::atomics;
 use std::unstable::finally::Finally;
@@ -134,13 +133,11 @@ impl<Q:Send> Sem<Q> {
 }

 pub fn access<U>(&self, blk: || -> U) -> U {
-do task::unkillable {
-do (|| {
-self.acquire();
-do task::rekillable { blk() }
-}).finally {
-self.release();
-}
+do (|| {
+self.acquire();
+blk()
+}).finally {
+self.release();
 }
 }
 }
@@ -206,48 +203,41 @@ impl<'self> Condvar<'self> {
 pub fn wait_on(&self, condvar_id: uint) {
 let mut WaitEnd = None;
 let mut out_of_bounds = None;
-do task::unkillable {
-// Release lock, 'atomically' enqueuing ourselves in so doing.
-unsafe {
-do (**self.sem).with |state| {
-if condvar_id < state.blocked.len() {
-// Drop the lock.
-state.count += 1;
-if state.count <= 0 {
-state.waiters.signal();
-}
-// Create waiter nobe, and enqueue ourself to
-// be woken up by a signaller.
-WaitEnd = Some(state.blocked[condvar_id].wait_end());
-} else {
-out_of_bounds = Some(state.blocked.len());
+// Release lock, 'atomically' enqueuing ourselves in so doing.
+unsafe {
+do (**self.sem).with |state| {
+if condvar_id < state.blocked.len() {
+// Drop the lock.
+state.count += 1;
+if state.count <= 0 {
+state.waiters.signal();
 }
+// Create waiter nobe, and enqueue ourself to
+// be woken up by a signaller.
+WaitEnd = Some(state.blocked[condvar_id].wait_end());
+} else {
+out_of_bounds = Some(state.blocked.len());
 }
 }
+}

-// If deschedule checks start getting inserted anywhere, we can be
-// killed before or after enqueueing. Deciding whether to
-// unkillably reacquire the lock needs to happen atomically
-// wrt enqueuing.
-do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
-// Unconditionally "block". (Might not actually block if a
-// signaller already sent -- I mean 'unconditionally' in contrast
-// with acquire().)
-do (|| {
-do task::rekillable {
-let _ = WaitEnd.take_unwrap().recv();
-}
-}).finally {
-// Reacquire the condvar. Note this is back in the unkillable
-// section; it needs to succeed, instead of itself dying.
-match self.order {
-Just(lock) => do lock.access {
-self.sem.acquire();
-},
-Nothing => {
-self.sem.acquire();
-},
-}
+// If deschedule checks start getting inserted anywhere, we can be
+// killed before or after enqueueing.
+do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
+// Unconditionally "block". (Might not actually block if a
+// signaller already sent -- I mean 'unconditionally' in contrast
+// with acquire().)
+do (|| {
+let _ = WaitEnd.take_unwrap().recv();
+}).finally {
+// Reacquire the condvar.
+match self.order {
+Just(lock) => do lock.access {
+self.sem.acquire();
+},
+Nothing => {
+self.sem.acquire();
+},
 }
 }
 }
@@ -484,30 +474,28 @@ impl RWLock {
 */
 pub fn read<U>(&self, blk: || -> U) -> U {
 unsafe {
-do task::unkillable {
-do (&self.order_lock).access {
-let state = &mut *self.state.get();
-let old_count = state.read_count.fetch_add(1, atomics::Acquire);
-if old_count == 0 {
-(&self.access_lock).acquire();
-state.read_mode = true;
-}
+do (&self.order_lock).access {
+let state = &mut *self.state.get();
+let old_count = state.read_count.fetch_add(1, atomics::Acquire);
+if old_count == 0 {
+(&self.access_lock).acquire();
+state.read_mode = true;
 }
-do (|| {
-do task::rekillable { blk() }
-}).finally {
-let state = &mut *self.state.get();
-assert!(state.read_mode);
-let old_count = state.read_count.fetch_sub(1, atomics::Release);
-assert!(old_count > 0);
-if old_count == 1 {
-state.read_mode = false;
-// Note: this release used to be outside of a locked access
-// to exclusive-protected state. If this code is ever
-// converted back to such (instead of using atomic ops),
-// this access MUST NOT go inside the exclusive access.
-(&self.access_lock).release();
-}
+}
+do (|| {
+blk()
+}).finally {
+let state = &mut *self.state.get();
+assert!(state.read_mode);
+let old_count = state.read_count.fetch_sub(1, atomics::Release);
+assert!(old_count > 0);
+if old_count == 1 {
+state.read_mode = false;
+// Note: this release used to be outside of a locked access
+// to exclusive-protected state. If this code is ever
+// converted back to such (instead of using atomic ops),
+// this access MUST NOT go inside the exclusive access.
+(&self.access_lock).release();
 }
 }
 }
@@ -518,14 +506,10 @@ impl RWLock {
 * 'write' from other tasks will run concurrently with this one.
 */
 pub fn write<U>(&self, blk: || -> U) -> U {
-do task::unkillable {
-(&self.order_lock).acquire();
-do (&self.access_lock).access {
-(&self.order_lock).release();
-do task::rekillable {
-blk()
-}
-}
+(&self.order_lock).acquire();
+do (&self.access_lock).access {
+(&self.order_lock).release();
+blk()
 }
 }

@@ -562,16 +546,12 @@ impl RWLock {
 // which can't happen until T2 finishes the downgrade-read entirely.
 // The astute reader will also note that making waking writers use the
 // order_lock is better for not starving readers.
-do task::unkillable {
-(&self.order_lock).acquire();
-do (&self.access_lock).access_cond |cond| {
-(&self.order_lock).release();
-do task::rekillable {
-let opt_lock = Just(&self.order_lock);
-blk(&Condvar { sem: cond.sem, order: opt_lock,
-token: NonCopyable })
-}
-}
+(&self.order_lock).acquire();
+do (&self.access_lock).access_cond |cond| {
+(&self.order_lock).release();
+let opt_lock = Just(&self.order_lock);
+blk(&Condvar { sem: cond.sem, order: opt_lock,
+token: NonCopyable })
 }
 }

@@ -599,39 +579,35 @@ impl RWLock {
 pub fn write_downgrade<U>(&self, blk: |v: RWLockWriteMode| -> U) -> U {
 // Implementation slightly different from the slicker 'write's above.
 // The exit path is conditional on whether the caller downgrades.
-do task::unkillable {
-(&self.order_lock).acquire();
-(&self.access_lock).acquire();
-(&self.order_lock).release();
-do (|| {
-do task::rekillable {
-blk(RWLockWriteMode { lock: self, token: NonCopyable })
-}
-}).finally {
-let writer_or_last_reader;
-// Check if we're releasing from read mode or from write mode.
-let state = unsafe { &mut *self.state.get() };
-if state.read_mode {
-// Releasing from read mode.
-let old_count = state.read_count.fetch_sub(1, atomics::Release);
-assert!(old_count > 0);
-// Check if other readers remain.
-if old_count == 1 {
-// Case 1: Writer downgraded & was the last reader
-writer_or_last_reader = true;
-state.read_mode = false;
-} else {
-// Case 2: Writer downgraded & was not the last reader
-writer_or_last_reader = false;
-}
-} else {
-// Case 3: Writer did not downgrade
+(&self.order_lock).acquire();
+(&self.access_lock).acquire();
+(&self.order_lock).release();
+do (|| {
+blk(RWLockWriteMode { lock: self, token: NonCopyable })
+}).finally {
+let writer_or_last_reader;
+// Check if we're releasing from read mode or from write mode.
+let state = unsafe { &mut *self.state.get() };
+if state.read_mode {
+// Releasing from read mode.
+let old_count = state.read_count.fetch_sub(1, atomics::Release);
+assert!(old_count > 0);
+// Check if other readers remain.
+if old_count == 1 {
+// Case 1: Writer downgraded & was the last reader
 writer_or_last_reader = true;
+state.read_mode = false;
+} else {
+// Case 2: Writer downgraded & was not the last reader
+writer_or_last_reader = false;
 }
-if writer_or_last_reader {
-// Nobody left inside; release the "reader cloud" lock.
-(&self.access_lock).release();
-}
+} else {
+// Case 3: Writer did not downgrade
+writer_or_last_reader = true;
+}
+if writer_or_last_reader {
+// Nobody left inside; release the "reader cloud" lock.
+(&self.access_lock).release();
 }
 }
 }
@@ -643,23 +619,21 @@ impl RWLock {
 fail!("Can't downgrade() with a different rwlock's write_mode!");
 }
 unsafe {
-do task::unkillable {
-let state = &mut *self.state.get();
-assert!(!state.read_mode);
-state.read_mode = true;
-// If a reader attempts to enter at this point, both the
-// downgrader and reader will set the mode flag. This is fine.
-let old_count = state.read_count.fetch_add(1, atomics::Release);
-// If another reader was already blocking, we need to hand-off
-// the "reader cloud" access lock to them.
-if old_count != 0 {
-// Guaranteed not to let another writer in, because
-// another reader was holding the order_lock. Hence they
-// must be the one to get the access_lock (because all
-// access_locks are acquired with order_lock held). See
-// the comment in write_cond for more justification.
-(&self.access_lock).release();
-}
+let state = &mut *self.state.get();
+assert!(!state.read_mode);
+state.read_mode = true;
+// If a reader attempts to enter at this point, both the
+// downgrader and reader will set the mode flag. This is fine.
+let old_count = state.read_count.fetch_add(1, atomics::Release);
+// If another reader was already blocking, we need to hand-off
+// the "reader cloud" access lock to them.
+if old_count != 0 {
+// Guaranteed not to let another writer in, because
+// another reader was holding the order_lock. Hence they
+// must be the one to get the access_lock (because all
+// access_locks are acquired with order_lock held). See
+// the comment in write_cond for more justification.
+(&self.access_lock).release();
 }
 }
 RWLockReadMode { lock: token.lock, token: NonCopyable }

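With the unkillable/rekillable wrappers stripped out above, the semaphore and rwlock paths lean entirely on `Finally` to pair an acquire with its release across task failure. A reduced sketch of that shape outside the library, with hypothetical `acquire`/`release` stand-ins rather than the real `Sem` internals:

~~~
use std::unstable::finally::Finally;

fn acquire() { /* e.g. decrement a semaphore count */ }
fn release() { /* undo the acquire */ }

fn access<U>(blk: || -> U) -> U {
    do (|| {
        acquire();
        blk()
    }).finally {
        // Runs on normal exit and on task failure alike, so the release is
        // never skipped -- the guarantee Sem::access now depends on.
        release();
    }
}

fn main() {
    let v = access(|| 40 + 2);
    assert_eq!(v, 42);
}
~~~
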
‎src/libextra/test.rs‎

Lines changed: 0 additions & 1 deletion
@@ -875,7 +875,6 @@ pub fn run_test(force_ignore: bool,
 let testfn_cell = ::std::cell::Cell::new(testfn);
 do task::spawn {
 let mut task = task::task();
-task.unlinked();
 task.name(match desc.name {
 DynTestName(ref name) => SendStrOwned(name.clone()),
 StaticTestName(name) => SendStrStatic(name),

‎src/libextra/time.rs‎

Lines changed: 1 addition & 1 deletion
@@ -954,10 +954,10 @@ mod tests {

 use std::f64;
 use std::result::{Err, Ok};
-use std::libc;

 #[cfg(windows)]
 fn set_time_zone() {
+use std::libc;
 // Windows crt doesn't see any environment variable set by
 // `SetEnvironmentVariable`, which `os::setenv` internally uses.
 // It is why we use `putenv` here.

‎src/librustc/lib.rs‎

Lines changed: 0 additions & 1 deletion
@@ -338,7 +338,6 @@ pub fn monitor(f: proc(@diagnostic::Emitter)) {
 let ch_capture = ch.clone();
 let mut task_builder = task::task();
 task_builder.name("rustc");
-task_builder.supervised();

 // XXX: Hacks on hacks. If the env is trying to override the stack size
 // then *don't* set it explicitly.

‎src/librustdoc/html/render.rs‎

Lines changed: 4 additions & 3 deletions
@@ -709,10 +709,11 @@ impl Context {
 let prog_chan = prog_chan.clone();

 let mut task = task::task();
-task.unlinked(); // we kill things manually
 task.name(format!("worker{}", i));
-task.spawn_with(cache.clone(),
-|cache| worker(cache, &port, &chan, &prog_chan));
+let cache = cache.clone();
+do task.spawn {
+worker(cache, &port, &chan, &prog_chan);
+}

 fn worker(cache: RWArc<Cache>,
 port: &SharedPort<Work>,

‎src/librustuv/file.rs‎

Lines changed: 16 additions & 19 deletions
@@ -20,7 +20,6 @@ use std::io;
 use std::rt::local::Local;
 use std::rt::rtio;
 use std::rt::sched::{Scheduler, SchedHandle};
-use std::task;
 use std::vec;

 use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after};
@@ -298,26 +297,24 @@ impl Drop for FsRequest {
 fn execute(f: &fn(*uvll::uv_fs_t, uvll::uv_fs_cb) -> c_int)
 -> Result<FsRequest, UvError>
 {
-return do task::unkillable {
-let mut req = FsRequest {
-fired: false,
-req: unsafe { uvll::malloc_req(uvll::UV_FS) }
-};
-match f(req.req, fs_cb) {
-0 => {
-req.fired = true;
-let mut slot = None;
-do wait_until_woken_after(&mut slot) {
-unsafe { uvll::set_data_for_req(req.req, &slot) }
-}
-match req.get_result() {
-n if n < 0 => Err(UvError(n)),
-_ => Ok(req),
-}
+let mut req = FsRequest {
+fired: false,
+req: unsafe { uvll::malloc_req(uvll::UV_FS) }
+};
+return match f(req.req, fs_cb) {
+0 => {
+req.fired = true;
+let mut slot = None;
+do wait_until_woken_after(&mut slot) {
+unsafe { uvll::set_data_for_req(req.req, &slot) }
+}
+match req.get_result() {
+n if n < 0 => Err(UvError(n)),
+_ => Ok(req),
 }
-n => Err(UvError(n))
-
 }
+n => Err(UvError(n))
+
 };

 extern fn fs_cb(req: *uvll::uv_fs_t) {

‎src/librustuv/net.rs‎

Lines changed: 63 additions & 72 deletions
@@ -20,7 +20,6 @@ use std::rt::rtio;
 use std::rt::sched::{Scheduler, SchedHandle};
 use std::rt::tube::Tube;
 use std::str;
-use std::task;
 use std::vec;

 use stream::StreamWatcher;
@@ -176,36 +175,34 @@ impl TcpWatcher {
 {
 struct Ctx { status: c_int, task: Option<BlockedTask> }

-return do task::unkillable {
-let tcp = TcpWatcher::new(loop_);
-let ret = do socket_addr_as_sockaddr(address) |addr| {
-let mut req = Request::new(uvll::UV_CONNECT);
-let result = unsafe {
-uvll::uv_tcp_connect(req.handle, tcp.handle, addr,
-connect_cb)
-};
-match result {
-0 => {
-req.defuse(); // uv callback now owns this request
-let mut cx = Ctx { status: 0, task: None };
-do wait_until_woken_after(&mut cx.task) {
-req.set_data(&cx);
-}
-match cx.status {
-0 => Ok(()),
-n => Err(UvError(n)),
-}
+let tcp = TcpWatcher::new(loop_);
+let ret = do socket_addr_as_sockaddr(address) |addr| {
+let mut req = Request::new(uvll::UV_CONNECT);
+let result = unsafe {
+uvll::uv_tcp_connect(req.handle, tcp.handle, addr,
+connect_cb)
+};
+match result {
+0 => {
+req.defuse(); // uv callback now owns this request
+let mut cx = Ctx { status: 0, task: None };
+do wait_until_woken_after(&mut cx.task) {
+req.set_data(&cx);
+}
+match cx.status {
+0 => Ok(()),
+n => Err(UvError(n)),
 }
-n => Err(UvError(n))
 }
-};
-
-match ret {
-Ok(()) => Ok(tcp),
-Err(e) => Err(e),
+n => Err(UvError(n))
 }
 };

+return match ret {
+Ok(()) => Ok(tcp),
+Err(e) => Err(e),
+};
+
 extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
 let req = Request::wrap(req);
 assert!(status != uvll::ECANCELED);
@@ -291,25 +288,23 @@ impl TcpListener {
 pub fn bind(loop_: &mut Loop, address: SocketAddr)
 -> Result<~TcpListener, UvError>
 {
-do task::unkillable {
-let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
-assert_eq!(unsafe {
-uvll::uv_tcp_init(loop_.handle, handle)
-}, 0);
-let l = ~TcpListener {
-home: get_handle_to_current_scheduler!(),
-handle: handle,
-closing_task: None,
-outgoing: Tube::new(),
-};
-let res = socket_addr_as_sockaddr(address, |addr| unsafe {
-uvll::uv_tcp_bind(l.handle, addr)
-});
-match res {
-0 => Ok(l.install()),
-n => Err(UvError(n))
-}
-}
+let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
+assert_eq!(unsafe {
+uvll::uv_tcp_init(loop_.handle, handle)
+}, 0);
+let l = ~TcpListener {
+home: get_handle_to_current_scheduler!(),
+handle: handle,
+closing_task: None,
+outgoing: Tube::new(),
+};
+let res = socket_addr_as_sockaddr(address, |addr| unsafe {
+uvll::uv_tcp_bind(l.handle, addr)
+});
+return match res {
+0 => Ok(l.install()),
+n => Err(UvError(n))
+};
 }
 }

@@ -426,22 +421,20 @@ impl UdpWatcher {
 pub fn bind(loop_: &Loop, address: SocketAddr)
 -> Result<UdpWatcher, UvError>
 {
-do task::unkillable {
-let udp = UdpWatcher {
-handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
-home: get_handle_to_current_scheduler!(),
-};
-assert_eq!(unsafe {
-uvll::uv_udp_init(loop_.handle, udp.handle)
-}, 0);
-let result = socket_addr_as_sockaddr(address, |addr| unsafe {
-uvll::uv_udp_bind(udp.handle, addr, 0u32)
-});
-match result {
-0 => Ok(udp),
-n => Err(UvError(n)),
-}
-}
+let udp = UdpWatcher {
+handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
+home: get_handle_to_current_scheduler!(),
+};
+assert_eq!(unsafe {
+uvll::uv_udp_init(loop_.handle, udp.handle)
+}, 0);
+let result = socket_addr_as_sockaddr(address, |addr| unsafe {
+uvll::uv_udp_bind(udp.handle, addr, 0u32)
+});
+return match result {
+0 => Ok(udp),
+n => Err(UvError(n)),
+};
 }
 }

@@ -1123,16 +1116,14 @@ mod test {
 assert!(maybe_socket.is_ok());

 // block self on sched1
-do task::unkillable { // FIXME(#8674)
-let scheduler: ~Scheduler = Local::take();
-do scheduler.deschedule_running_task_and_then |_, task| {
-// unblock task
-do task.wake().map |task| {
-// send self to sched2
-tasksFriendHandle.take().send(TaskFromFriend(task));
-};
-// sched1 should now sleep since it has nothing else to do
-}
+let scheduler: ~Scheduler = Local::take();
+do scheduler.deschedule_running_task_and_then |_, task| {
+// unblock task
+do task.wake().map |task| {
+// send self to sched2
+tasksFriendHandle.take().send(TaskFromFriend(task));
+};
+// sched1 should now sleep since it has nothing else to do
 }
 // sched2 will wake up and get the task as we do nothing else,
 // the function ends and the socket goes out of scope sched2
@@ -1180,7 +1171,7 @@ mod test {
 let chan = Cell::new(chan);
 let addr = next_test_ip4();

-do task::spawn_unlinked { // please no linked failure
+do spawn {
 let w = TcpListener::bind(local_loop(), addr).unwrap();
 let mut w = w.listen().unwrap();
 chan.take().send(());
‎src/librustuv/pipe.rs‎

Lines changed: 32 additions & 39 deletions
@@ -16,7 +16,6 @@ use std::rt::local::Local;
 use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
 use std::rt::sched::{Scheduler, SchedHandle};
 use std::rt::tube::Tube;
-use std::task;

 use stream::StreamWatcher;
 use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error,
@@ -74,26 +73,23 @@ impl PipeWatcher {
 pub fn connect(loop_: &Loop, name: &CString) -> Result<PipeWatcher, UvError>
 {
 struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
-return do task::unkillable {
-let mut cx = Ctx { task: None, result: 0 };
-let mut req = Request::new(uvll::UV_CONNECT);
-let pipe = PipeWatcher::new(loop_, false);
-
-do wait_until_woken_after(&mut cx.task) {
-unsafe {
-uvll::uv_pipe_connect(req.handle,
-pipe.handle(),
-name.with_ref(|p| p),
-connect_cb)
-}
-req.set_data(&cx);
-req.defuse(); // uv callback now owns this request
-}
-match cx.result {
-0 => Ok(pipe),
-n => Err(UvError(n))
-}
+let mut cx = Ctx { task: None, result: 0 };
+let mut req = Request::new(uvll::UV_CONNECT);
+let pipe = PipeWatcher::new(loop_, false);

+do wait_until_woken_after(&mut cx.task) {
+unsafe {
+uvll::uv_pipe_connect(req.handle,
+pipe.handle(),
+name.with_ref(|p| p),
+connect_cb)
+}
+req.set_data(&cx);
+req.defuse(); // uv callback now owns this request
+}
+return match cx.result {
+0 => Ok(pipe),
+n => Err(UvError(n))
 };

 extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {;
@@ -153,24 +149,22 @@ extern fn pipe_close_cb(handle: *uvll::uv_handle_t) {

 impl PipeListener {
 pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> {
-do task::unkillable {
-let pipe = PipeWatcher::new(loop_, false);
-match unsafe {
-uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
-} {
-0 => {
-// If successful, unwrap the PipeWatcher because we control how
-// we close the pipe differently. We can't rely on
-// StreamWatcher's default close method.
-let p = ~PipeListener {
-home: get_handle_to_current_scheduler!(),
-pipe: pipe.unwrap(),
-outgoing: Tube::new(),
-};
-Ok(p.install())
-}
-n => Err(UvError(n))
+let pipe = PipeWatcher::new(loop_, false);
+match unsafe {
+uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
+} {
+0 => {
+// If successful, unwrap the PipeWatcher because we control how
+// we close the pipe differently. We can't rely on
+// StreamWatcher's default close method.
+let p = ~PipeListener {
+home: get_handle_to_current_scheduler!(),
+pipe: pipe.unwrap(),
+outgoing: Tube::new(),
+};
+Ok(p.install())
 }
+n => Err(UvError(n))
 }
 }
 }
@@ -245,7 +239,6 @@ mod tests {
 use std::comm::oneshot;
 use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
 use std::rt::test::next_test_unix;
-use std::task;

 use super::*;
 use super::super::local_loop;
@@ -314,7 +307,7 @@ mod tests {
 let (port, chan) = oneshot();
 let chan = Cell::new(chan);

-do task::spawn_unlinked { // plz no linked failure
+do spawn {
 let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
 let mut p = p.listen().unwrap();
 chan.take().send(());

‎src/librustuv/uvio.rs‎

Lines changed: 0 additions & 10 deletions
@@ -44,11 +44,6 @@ pub trait HomingIO {
 fn go_to_IO_home(&mut self) -> uint {
 use std::rt::sched::RunOnce;

-unsafe {
-let task: *mut Task = Local::unsafe_borrow();
-(*task).death.inhibit_kill((*task).unwinder.unwinding);
-}
-
 let _f = ForbidUnwind::new("going home");

 let current_sched_id = do Local::borrow |sched: &mut Scheduler| {
@@ -127,11 +122,6 @@ impl Drop for HomingMissile {
 }

 util::ignore(f);
-
-unsafe {
-let task: *mut Task = Local::unsafe_borrow();
-(*task).death.allow_kill((*task).unwinder.unwinding);
-}
 }
 }


‎src/libstd/path/posix.rs‎

Lines changed: 0 additions & 1 deletion
@@ -565,7 +565,6 @@ mod tests {
 ($name:expr => $code:block) => (
 {
 let mut t = task::task();
-t.supervised();
 t.name($name);
 let res = do t.try $code;
 assert!(res.is_err());

‎src/libstd/path/windows.rs‎

Lines changed: 0 additions & 1 deletion
@@ -1290,7 +1290,6 @@ mod tests {
 ($name:expr => $code:block) => (
 {
 let mut t = task::task();
-t.supervised();
 t.name($name);
 let res = do t.try $code;
 assert!(res.is_err());

‎src/libstd/rand/os.rs‎

Lines changed: 1 addition & 1 deletion
@@ -162,7 +162,7 @@ mod test {
 for _ in range(0, 20) {
 let (p, c) = comm::stream();
 chans.push(c);
-do task::spawn_with(p) |p| {
+do task::spawn {
 // wait until all the tasks are ready to go.
 p.recv();


‎src/libstd/rt/kill.rs‎

Lines changed: 30 additions & 713 deletions
Large diffs are not rendered by default.

‎src/libstd/rt/mod.rs‎

Lines changed: 1 addition & 1 deletion
@@ -88,7 +88,7 @@ pub use self::util::set_exit_status;
 pub use self::util::default_sched_threads;

 // Re-export of the functionality in the kill module
-pub use self::kill::{KillHandle, BlockedTask};
+pub use self::kill::BlockedTask;

 // XXX: these probably shouldn't be public...
 #[doc(hidden)]

‎src/libstd/rt/sched.rs‎

Lines changed: 2 additions & 11 deletions
@@ -8,7 +8,6 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.

-use either::{Left, Right};
 use option::{Option, Some, None};
 use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
 use clone::Clone;
@@ -621,9 +620,6 @@ impl Scheduler {
 unsafe {
 let task: *mut Task = Local::unsafe_borrow();
 (*task).sched.get_mut_ref().run_cleanup_job();
-
-// Must happen after running the cleanup job (of course).
-(*task).death.check_killed((*task).unwinder.unwinding);
 }
 }

@@ -689,14 +685,9 @@ impl Scheduler {
 pub fn switch_running_tasks_and_then(~self, next_task: ~Task,
 f: |&mut Scheduler, BlockedTask|) {
 // This is where we convert the BlockedTask-taking closure into one
-// that takes just a Task, and is aware of the block-or-killed protocol.
+// that takes just a Task
 do self.change_task_context(next_task) |sched, task| {
-// Task might need to receive a kill signal instead of blocking.
-// We can call the "and_then" only if it blocks successfully.
-match BlockedTask::try_block(task) {
-Left(killed_task) => sched.enqueue_task(killed_task),
-Right(blocked_task) => f(sched, blocked_task),
-}
+f(sched, BlockedTask::block(task))
 }
 }


‎src/libstd/rt/task.rs‎

Lines changed: 3 additions & 27 deletions
@@ -36,8 +36,6 @@ use rt::logging::StdErrLogger;
 use rt::sched::{Scheduler, SchedHandle};
 use rt::stack::{StackSegment, StackPool};
 use send_str::SendStr;
-use task::LinkedFailure;
-use task::spawn::Taskgroup;
 use unstable::finally::Finally;

 // The Task struct represents all state associated with a rust
@@ -52,7 +50,6 @@ pub struct Task {
 storage: LocalStorage,
 logger: Option<StdErrLogger>,
 unwinder: Unwinder,
-taskgroup: Option<Taskgroup>,
 death: Death,
 destroyed: bool,
 name: Option<SendStr>,
@@ -188,7 +185,6 @@ impl Task {
 storage: LocalStorage(None),
 logger: None,
 unwinder: Unwinder { unwinding: false, cause: None },
-taskgroup: None,
 death: Death::new(),
 destroyed: false,
 coroutine: Some(Coroutine::empty()),
@@ -223,7 +219,6 @@ impl Task {
 storage: LocalStorage(None),
 logger: None,
 unwinder: Unwinder { unwinding: false, cause: None },
-taskgroup: None,
 death: Death::new(),
 destroyed: false,
 name: None,
@@ -246,9 +241,7 @@ impl Task {
 storage: LocalStorage(None),
 logger: None,
 unwinder: Unwinder { unwinding: false, cause: None },
-taskgroup: None,
-// FIXME(#7544) make watching optional
-death: self.death.new_child(),
+death: Death::new(),
 destroyed: false,
 name: None,
 coroutine: Some(Coroutine::new(stack_pool, stack_size, start)),
@@ -333,11 +326,7 @@ impl Task {
 // Cleanup the dynamic borrowck debugging info
 borrowck::clear_task_borrow_list();

-// NB. We pass the taskgroup into death so that it can be dropped while
-// the unkillable counter is set. This is necessary for when the
-// taskgroup destruction code drops references on KillHandles, which
-// might require using unkillable (to synchronize with an unwrapper).
-self.death.collect_failure(self.unwinder.to_unwind_result(), self.taskgroup.take());
+self.death.collect_failure(self.unwinder.to_unwind_result());
 self.destroyed = true;
 }

@@ -660,10 +649,7 @@ pub fn begin_unwind<M: Any + Send>(msg: M, file: &'static str, line: uint) -> !
 Some(s) => *s,
 None => match msg.as_ref::<~str>() {
 Some(s) => s.as_slice(),
-None => match msg.as_ref::<LinkedFailure>() {
-Some(*) => "linked failure",
-None => "~Any",
-}
+None => "~Any",
 }
 };

@@ -785,16 +771,6 @@ mod test {
 }
 }

-#[test]
-fn linked_failure() {
-do run_in_newsched_task() {
-let res = do spawntask_try {
-spawntask_random(|| fail!());
-};
-assert!(res.is_err());
-}
-}
-
 #[test]
 fn heap_cycles() {
 use option::{Option, Some, None};

‎src/libstd/run.rs‎

Lines changed: 2 additions & 9 deletions
@@ -21,7 +21,6 @@ use io;
 use libc::{pid_t, c_int};
 use libc;
 use prelude::*;
-use task;

 /**
 * A value representing a child process.
@@ -221,21 +220,15 @@ impl Process {
 let ch = SharedChan::new(ch);
 let ch_clone = ch.clone();

-// FIXME(#910, #8674): right now I/O is incredibly brittle when it comes
-// to linked failure, so these tasks must be spawn so they're not
-// affected by linked failure. If these are removed, then the
-// runtime may never exit because linked failure will cause some
-// SchedHandle structures to not get destroyed, meaning that
-// there's always an async watcher available.
-do task::spawn_unlinked {
+do spawn {
 do io::ignore_io_error {
 match error.take() {
 Some(ref mut e) => ch.send((2, e.read_to_end())),
 None => ch.send((2, ~[]))
 }
 }
 }
-do task::spawn_unlinked {
+do spawn {
 do io::ignore_io_error {
 match output.take() {
 Some(ref mut e) => ch_clone.send((1, e.read_to_end())),

‎src/libstd/select.rs‎

Lines changed: 29 additions & 79 deletions
@@ -21,7 +21,6 @@ use rt::local::Local;
 use rt::rtio::EventLoop;
 use rt::sched::Scheduler;
 use rt::shouldnt_be_public::{SelectInner, SelectPortInner};
-use task;
 use unstable::finally::Finally;
 use vec::{OwnedVector, MutableVector};

@@ -79,11 +78,10 @@ pub fn select<A: Select>(ports: &mut [A]) -> uint {
 do sched.event_loop.callback { c.take().send_deferred(()) }
 }
 }).finally {
-let p = Cell::new(p.take());
 // Unkillable is necessary not because getting killed is dangerous here,
 // but to force the recv not to use the same kill-flag that we used for
 // selecting. Otherwise a user-sender could spuriously wakeup us here.
-do task::unkillable { p.take().recv(); }
+p.take().recv();
 }

 // Task resumes. Now unblock ourselves from all the ports we blocked on.
@@ -230,63 +228,50 @@ mod test {
 }

 #[test]
-fn select_unkillable() {
+fn select_simple() {
 do run_in_uv_task {
-do task::unkillable { select_helper(2, [1]) }
+select_helper(2, [1])
 }
 }

 /* blocking select tests */

 #[test]
 fn select_blocking() {
-select_blocking_helper(true);
-select_blocking_helper(false);
-
-fn select_blocking_helper(killable: bool) {
-do run_in_uv_task {
-let (p1,_c) = oneshot();
-let (p2,c2) = oneshot();
-let mut ports = [p1,p2];
-
-let (p3,c3) = oneshot();
-let (p4,c4) = oneshot();
-
-let x = Cell::new((c2, p3, c4));
-do task::spawn {
-let (c2, p3, c4) = x.take();
-p3.recv(); // handshake parent
-c4.send(()); // normal receive
-task::deschedule();
-c2.send(()); // select receive
-}
-
-// Try to block before child sends on c2.
-c3.send(());
-p4.recv();
-if killable {
-assert!(select(ports) == 1);
-} else {
-do task::unkillable { assert!(select(ports) == 1); }
-}
+do run_in_uv_task {
+let (p1,_c) = oneshot();
+let (p2,c2) = oneshot();
+let mut ports = [p1,p2];
+
+let (p3,c3) = oneshot();
+let (p4,c4) = oneshot();
+
+let x = Cell::new((c2, p3, c4));
+do task::spawn {
+let (c2, p3, c4) = x.take();
+p3.recv(); // handshake parent
+c4.send(()); // normal receive
+task::deschedule();
+c2.send(()); // select receive
 }
+
+// Try to block before child sends on c2.
+c3.send(());
+p4.recv();
+assert!(select(ports) == 1);
 }
 }

 #[test]
 fn select_racing_senders() {
 static NUM_CHANS: uint = 10;

-select_racing_senders_helper(true, ~[0,1,2,3,4,5,6,7,8,9]);
-select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]);
-select_racing_senders_helper(true, ~[0,1,2]);
-select_racing_senders_helper(false, ~[0,1,2]);
-select_racing_senders_helper(true, ~[3,4,5,6]);
-select_racing_senders_helper(false, ~[3,4,5,6]);
-select_racing_senders_helper(true, ~[7,8,9]);
-select_racing_senders_helper(false, ~[7,8,9]);
+select_racing_senders_helper(~[0,1,2,3,4,5,6,7,8,9]);
+select_racing_senders_helper(~[0,1,2]);
+select_racing_senders_helper(~[3,4,5,6]);
+select_racing_senders_helper(~[7,8,9]);

-fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
+fn select_racing_senders_helper(send_on_chans: ~[uint]) {
 use rt::test::spawntask_random;

 do run_in_uv_task {
@@ -307,45 +292,10 @@ mod test {
 }
 }
 // nondeterministic result, but should succeed
-if killable {
-select(ports);
-} else {
-do task::unkillable { select(ports); }
-}
+select(ports);
 }
 }
 }
 }
 }
-
-#[test]
-fn select_killed() {
-do run_in_uv_task {
-let (success_p, success_c) = oneshot::<bool>();
-let success_c = Cell::new(success_c);
-do task::try {
-let success_c = Cell::new(success_c.take());
-do task::unkillable {
-let (p,c) = oneshot();
-let c = Cell::new(c);
-do task::spawn {
-let (dead_ps, dead_cs) = unzip(range(0u, 5).map(|_| oneshot::<()>()));
-let mut ports = dead_ps;
-select(ports); // should get killed; nothing should leak
-c.take().send(()); // must not happen
-// Make sure dead_cs doesn't get closed until after select.
-let _ = dead_cs;
-}
-do task::spawn {
-fail!(); // should kill sibling awake
-}
-
-// wait for killed selector to close (NOT send on) its c.
-// hope to send 'true'.
-success_c.take().send(p.try_recv().is_none());
-}
-};
-assert!(success_p.recv());
-}
-}
 }

‎src/libstd/task/mod.rs‎

Lines changed: 7 additions & 642 deletions
Large diffs are not rendered by default.

‎src/libstd/task/spawn.rs‎

Lines changed: 11 additions & 520 deletions
Large diffs are not rendered by default.

‎src/libstd/unstable/finally.rs‎

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ do || {

 use ops::Drop;

-#[cfg(test)] use task::{failing, spawn};
+#[cfg(test)] use task::failing;

 pub trait Finally<T> {
 fn finally(&self, dtor: ||) -> T;

‎src/libstd/unstable/sync.rs‎

Lines changed: 58 additions & 82 deletions
@@ -135,65 +135,63 @@ impl<T: Send> UnsafeArc<T> {
 /// block; otherwise, an unwrapping task can be killed by linked failure.
 pub fn unwrap(self) -> T {
 let this = Cell::new(self); // argh
-do task::unkillable {
-unsafe {
-let mut this = this.take();
-// The ~ dtor needs to run if this code succeeds.
-let mut data: ~ArcData<T> = cast::transmute(this.data);
-// Set up the unwrap protocol.
-let (p1,c1) = comm::oneshot(); // ()
-let (p2,c2) = comm::oneshot(); // bool
-// Try to put our server end in the unwrapper slot.
-// This needs no barrier -- it's protected by the release barrier on
-// the xadd, and the acquire+release barrier in the destructor's xadd.
-if data.unwrapper.fill(~(c1,p2), Relaxed).is_none() {
-// Got in. Tell this handle's destructor not to run (we are now it).
-this.data = ptr::mut_null();
-// Drop our own reference.
-let old_count = data.count.fetch_sub(1, Release);
-assert!(old_count >= 1);
-if old_count == 1 {
-// We were the last owner. Can unwrap immediately.
-// AtomicOption's destructor will free the server endpoint.
+unsafe {
+let mut this = this.take();
+// The ~ dtor needs to run if this code succeeds.
+let mut data: ~ArcData<T> = cast::transmute(this.data);
+// Set up the unwrap protocol.
+let (p1,c1) = comm::oneshot(); // ()
+let (p2,c2) = comm::oneshot(); // bool
+// Try to put our server end in the unwrapper slot.
+// This needs no barrier -- it's protected by the release barrier on
+// the xadd, and the acquire+release barrier in the destructor's xadd.
+if data.unwrapper.fill(~(c1,p2), Relaxed).is_none() {
+// Got in. Tell this handle's destructor not to run (we are now it).
+this.data = ptr::mut_null();
+// Drop our own reference.
+let old_count = data.count.fetch_sub(1, Release);
+assert!(old_count >= 1);
+if old_count == 1 {
+// We were the last owner. Can unwrap immediately.
+// AtomicOption's destructor will free the server endpoint.
+// FIXME(#3224): it should be like this
+// let ~ArcData { data: user_data, _ } = data;
+// user_data
+data.data.take_unwrap()
+} else {
+// The *next* person who sees the refcount hit 0 will wake us.
+let p1 = Cell::new(p1); // argh
+// Unlike the above one, this cell is necessary. It will get
+// taken either in the do block or in the finally block.
+let c2_and_data = Cell::new((c2,data));
+do (|| {
+p1.take().recv();
+// Got here. Back in the 'unkillable' without getting killed.
+let (c2, data) = c2_and_data.take();
+c2.send(true);
 // FIXME(#3224): it should be like this
 // let ~ArcData { data: user_data, _ } = data;
 // user_data
+let mut data = data;
 data.data.take_unwrap()
-} else {
-// The *next* person who sees the refcount hit 0 will wake us.
-let p1 = Cell::new(p1); // argh
-// Unlike the above one, this cell is necessary. It will get
-// taken either in the do block or in the finally block.
-let c2_and_data = Cell::new((c2,data));
-do (|| {
-do task::rekillable { p1.take().recv(); }
-// Got here. Back in the 'unkillable' without getting killed.
+}).finally {
+if task::failing() {
+// Killed during wait. Because this might happen while
+// someone else still holds a reference, we can't free
+// the data now; the "other" last refcount will free it.
 let (c2, data) = c2_and_data.take();
-c2.send(true);
-// FIXME(#3224): it should be like this
-// let ~ArcData { data: user_data, _ } = data;
-// user_data
-let mut data = data;
-data.data.take_unwrap()
-}).finally {
-if task::failing() {
-// Killed during wait. Because this might happen while
-// someone else still holds a reference, we can't free
-// the data now; the "other" last refcount will free it.
-let (c2, data) = c2_and_data.take();
-c2.send(false);
-cast::forget(data);
-} else {
-assert!(c2_and_data.is_empty());
-}
+c2.send(false);
+cast::forget(data);
+} else {
+assert!(c2_and_data.is_empty());
 }
 }
-} else {
-// If 'put' returns the server end back to us, we were rejected;
-// someone else was trying to unwrap. Avoid guaranteed deadlock.
-cast::forget(data);
-fail!("Another task is already unwrapping this Arc!");
 }
+} else {
+// If 'put' returns the server end back to us, we were rejected;
+// someone else was trying to unwrap. Avoid guaranteed deadlock.
+cast::forget(data);
+fail!("Another task is already unwrapping this Arc!");
 }
 }
 }
@@ -259,17 +257,15 @@ impl<T> Drop for UnsafeArc<T>{
 match data.unwrapper.take(Acquire) {
 Some(~(message,response)) => {
 let cell = Cell::new((message, response, data));
-do task::unkillable {
-let (message, response, data) = cell.take();
-// Send 'ready' and wait for a response.
-message.send(());
-// Unkillable wait. Message guaranteed to come.
-if response.recv() {
-// Other task got the data.
-cast::forget(data);
-} else {
-// Other task was killed. drop glue takes over.
-}
+let (message, response, data) = cell.take();
+// Send 'ready' and wait for a response.
+message.send(());
+// Unkillable wait. Message guaranteed to come.
+if response.recv() {
+// Other task got the data.
+cast::forget(data);
+} else {
+// Other task was killed. drop glue takes over.
 }
 }
 None => {
@@ -678,24 +674,4 @@ mod tests {
 assert!(x.unwrap() == ~~"hello");
 assert!(res.recv().is_ok());
 }
-
-#[test]
-fn exclusive_new_unwrap_deadlock() {
-// This is not guaranteed to get to the deadlock before being killed,
-// but it will show up sometimes, and if the deadlock were not there,
-// the test would nondeterministically fail.
-let result = do task::try {
-// a task that has two references to the same Exclusive::new will
-// deadlock when it unwraps. nothing to be done about that.
-let x = Exclusive::new(~~"hello");
-let x2 = x.clone();
-do task::spawn {
-do 10.times { task::deschedule(); } // try to let the unwrapper go
-fail!(); // punt it awake from its deadlock
-}
-let _z = x.unwrap();
-unsafe { do x2.with |_hello| { } }
-};
-assert!(result.is_err());
-}
 }

‎src/test/bench/shootout-k-nucleotide-pipes.rs‎

Lines changed: 2 additions & 2 deletions
@@ -179,9 +179,9 @@ fn main() {

 let (from_parent, to_child) = comm::stream();

-do task::spawn_with(from_parent) |from_parent| {
+do spawn {
 make_sequence_processor(sz, &from_parent, &to_parent_);
-};
+}

 to_child
 }.collect::<~[Chan<~[u8]>]>();

‎src/test/bench/task-perf-jargon-metal-smoke.rs‎

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ fn child_generation(gens_left: uint, c: comm::Chan<()>) {
 // With this code, only as many generations are alive at a time as tasks
 // alive at a time,
 let c = Cell::new(c);
-do task::spawn_supervised {
+do spawn {
 let c = c.take();
 if gens_left & 1 == 1 {
 task::deschedule(); // shake things up a bit

‎src/test/bench/task-perf-linked-failure.rs‎

Lines changed: 1 addition & 2 deletions
@@ -1,4 +1,5 @@
 // xfail-pretty
+// xfail-test linked failure

 // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
 // file at the top-level directory of this distribution and at
@@ -35,8 +36,6 @@ fn grandchild_group(num_tasks: uint) {
 for _ in range(0, num_tasks) {
 let ch = ch.clone();
 let mut t = task::task();
-t.linked();
-t.unwatched();
 do t.spawn { // linked
 ch.send(());
 let (p, _c) = stream::<()>();

‎src/test/run-fail/fail-task-name-none.rs‎

Lines changed: 5 additions & 2 deletions
@@ -10,8 +10,11 @@

 // error-pattern:task '<unnamed>' failed at 'test'

+use std::task;
+
 fn main() {
-do spawn {
+do task::try {
 fail!("test");
-}
+1
+}.unwrap()
 }

‎src/test/run-fail/fail-task-name-owned.rs‎

Lines changed: 6 additions & 3 deletions
@@ -10,10 +10,13 @@

 // error-pattern:task 'owned name' failed at 'test'

+use std::task;
+
 fn main() {
-let mut t = ::std::task::task();
+let mut t = task::task();
 t.name(~"owned name");
-do t.spawn {
+do t.try {
 fail!("test");
-}
+1
+}.unwrap()
 }

‎src/test/run-fail/fail-task-name-send-str.rs‎

Lines changed: 3 additions & 2 deletions
@@ -13,7 +13,8 @@
 fn main() {
 let mut t = ::std::task::task();
 t.name("send name".to_send_str());
-do t.spawn {
+do t.try {
 fail!("test");
-}
+3
+}.unwrap()
 }

‎src/test/run-fail/fail-task-name-static.rs‎

Lines changed: 2 additions & 2 deletions
@@ -13,7 +13,7 @@
 fn main() {
 let mut t = ::std::task::task();
 t.name("static name");
-do t.spawn {
+do t.try {
 fail!("test");
-}
+}.unwrap()
 }

‎src/test/run-fail/task-spawn-barefn.rs‎

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@ use std::task;
 fn main() {
 // the purpose of this test is to make sure that task::spawn()
 // works when provided with a bare function:
-task::spawn(startfn);
+task::try(startfn).unwrap();
 }

 fn startfn() {

‎src/test/run-pass/unwind-box.rs‎

Lines changed: 1 addition & 1 deletion
@@ -18,5 +18,5 @@ fn f() {
 }

 pub fn main() {
-task::spawn_unlinked(f);
+task::spawn(f);
 }

‎src/test/run-pass/unwind-resource.rs‎

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ fn f(c: SharedChan<bool>) {
 pub fn main() {
 let (p, c) = stream();
 let c = SharedChan::new(c);
-task::spawn_unlinked(|| f(c.clone()) );
+task::spawn(|| f(c.clone()) );
 error!("hiiiiiiiii");
 assert!(p.recv());
 }

‎src/test/run-pass/unwind-resource2.rs‎

Lines changed: 1 addition & 1 deletion
@@ -35,5 +35,5 @@ fn f() {
 }

 pub fn main() {
-task::spawn_unlinked(f);
+task::spawn(f);
 }

‎src/test/run-pass/unwind-unique.rs‎

Lines changed: 1 addition & 1 deletion
@@ -18,5 +18,5 @@ fn f() {
 }

 pub fn main() {
-task::spawn_unlinked(f);
+task::spawn(f);
 }
