diff --git a/src/main/java/io/reactivex/internal/schedulers/IOScheduler.java b/src/main/java/io/reactivex/internal/schedulers/IOScheduler.java index 72541f1c03..f369234597 100644 --- a/src/main/java/io/reactivex/internal/schedulers/IOScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/IOScheduler.java @@ -61,7 +61,6 @@ private static final class CachedWorkerPool { Future> task = null; if (unit != null) { evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); - ((ScheduledThreadPoolExecutor)evictor).setRemoveOnCancelPolicy(true); try { task = evictor.scheduleWithFixedDelay( new Runnable() { diff --git a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java index 8f032748b6..0f519ae971 100644 --- a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java @@ -32,11 +32,7 @@ public class NewThreadWorker extends Scheduler.Worker implements Disposable { /* package */ public NewThreadWorker(ThreadFactory threadFactory) { - ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); - // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak - if (exec instanceof ScheduledThreadPoolExecutor) { - ((ScheduledThreadPoolExecutor)exec).setRemoveOnCancelPolicy(true); - } + ScheduledExecutorService exec = SchedulerPoolHelper.create(threadFactory); executor = exec; } @@ -59,7 +55,7 @@ public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) * @param run * @param delayTime * @param unit - * @return + * @return a Disposable that let's the caller cancel the scheduled runnable */ public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); @@ -84,7 +80,7 @@ public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit un * @param initialDelay * @param period * @param unit - * @return + * @return a Disposable that let's the caller cancel the scheduled runnable */ public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDelay, long period, TimeUnit unit) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); @@ -103,12 +99,15 @@ public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDel * on the underlying ScheduledExecutorService. *
If the schedule has been rejected, the ScheduledRunnable.wasScheduled will return
* false.
- * @param action
+ * @param run
* @param delayTime
* @param unit
- * @return
+ * @param parent the parent tracking structure to add the created ScheduledRunnable instance before
+ * it gets scheduled.
+ * @return a Disposable that let's the caller cancel the scheduled runnable
*/
- public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, CompositeResource Note that enabling or disabling the force-purge by itself doesn't apply to
+ * existing schedulers and they have to be restarted.
+ * @param force the new force state
+ */
+ /* test */
+ public static void forcePurge(boolean force) {
+ FORCE_PURGE = force;
+ }
+
+ /**
+ * Returns purge frequency in milliseconds.
+ * @return purge frequency in milliseconds
+ */
+ public static int purgeFrequency() {
+ return PURGE_FREQUENCY;
+ }
+
+ /**
+ * Returns true if the platform supports removeOnCancelPolicy.
+ * @return true if the platform supports removeOnCancelPolicy.
+ */
+ public static boolean isRemoveOnCancelPolicySupported() {
+ return SET_REMOVE_ON_CANCEL_POLICY != null;
+ }
+
+ /**
+ * Creates a single threaded ScheduledExecutorService and wires up all
+ * necessary purging or removeOnCancelPolicy settings with it.
+ * @param factory the thread factory to use
+ * @return the created ScheduledExecutorService
+ * @throws IllegalStateException if force-purge is not enabled yet the platform doesn't support removeOnCancelPolicy;
+ * or Executors.newScheduledThreadPool doesn't return a ScheduledThreadPoolExecutor.
+ */
+ public static ScheduledExecutorService create(ThreadFactory factory) {
+ ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
+ if (FORCE_PURGE) {
+ if (exec instanceof ScheduledThreadPoolExecutor) {
+ ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
+ POOLS.put(e, e);
+ } else {
+ throw new IllegalStateException("The Executors.newScheduledThreadPool didn't return a ScheduledThreadPoolExecutor.");
+ }
+ } else {
+ Method m = SET_REMOVE_ON_CANCEL_POLICY;
+ if (m == null) {
+ throw new IllegalStateException("The ScheduledThreadPoolExecutor doesn't support the removeOnCancelPolicy and purging is not enabled.");
+ }
+ try {
+ m.invoke(exec, true);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ Exceptions.propagate(e);
+ }
+ }
+
+ return exec;
+ }
+
+ /**
+ * Starts the purge thread and the periodic purging.
+ */
+ public static void start() {
+ // if purge is not enabled don't do anything
+ if (!FORCE_PURGE) {
+ return;
+ }
+ for (;;) {
+ ScheduledExecutorService curr = PURGE_THREAD.get();
+ if (curr != null) {
+ return;
+ }
+ ScheduledExecutorService next = Executors.newScheduledThreadPool(1, PURGE_THREAD_FACTORY);
+ if (PURGE_THREAD.compareAndSet(null, next)) {
+
+ next.scheduleAtFixedRate(SchedulerPoolHelper::doPurge,
+ PURGE_FREQUENCY, PURGE_FREQUENCY, TimeUnit.MILLISECONDS);
+
+ return;
+ } else {
+ next.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Shuts down the purge thread and forgets the known ScheduledExecutorServices.
+ * Note that this stops purging the known ScheduledExecutorServices which may be shut
+ * down as well to appreciate a new forcePurge state.
+ */
+ public static void shutdown() {
+ shutdown(true);
+ }
+ /**
+ * Shuts down the purge thread and clears the known ScheduledExecutorServices from POOLS when
+ * requested.
+ * Note that this stops purging the known ScheduledExecutorServices which may be shut
+ * down as well to appreciate a new forcePurge state.
+ * @param clear if true, the helper forgets all associated ScheduledExecutorServices
+ */
+ public static void shutdown(boolean clear) {
+ for (;;) {
+ ScheduledExecutorService curr = PURGE_THREAD.get();
+ if (curr == null) {
+ return;
+ }
+ if (PURGE_THREAD.compareAndSet(curr, null)) {
+ curr.shutdownNow();
+ if (clear) {
+ POOLS.clear();
+ }
+ return;
+ }
+ }
+ }
+
+ /**
+ * Loops through the known ScheduledExecutors and removes the ones that were shut down
+ * and purges the others
+ */
+ static void doPurge() {
+ try {
+ for (ScheduledThreadPoolExecutor e : new ArrayList<>(POOLS.keySet())) {
+ if (e.isShutdown()) {
+ POOLS.remove(e);
+ } else {
+ e.purge();
+ }
+ }
+ } catch (Throwable ex) {
+ // ignoring any error, just in case
+ }
+ }
+
+ /**
+ * Purges all known ScheduledExecutorServices immediately on the purge thread.
+ */
+ public static void purgeAsync() {
+ ScheduledExecutorService exec = PURGE_THREAD.get();
+ if (exec != null) {
+ try {
+ exec.submit(SchedulerPoolHelper::doPurge);
+ } catch (RejectedExecutionException ex) {
+ // ignored, we are in shutdown
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java
index 26af80ce93..6ea819bd24 100644
--- a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java
+++ b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java
@@ -38,8 +38,7 @@ public SingleScheduler() {
}
static ScheduledExecutorService createExecutor() {
- ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, new RxThreadFactory("RxSingleScheduler-"));
- ((ScheduledThreadPoolExecutor)exec).setRemoveOnCancelPolicy(true);
+ ScheduledExecutorService exec = SchedulerPoolHelper.create(new RxThreadFactory("RxSingleScheduler-"));
return exec;
}
diff --git a/src/main/java/io/reactivex/schedulers/Schedulers.java b/src/main/java/io/reactivex/schedulers/Schedulers.java
index d3548b5166..fe2a2b691b 100644
--- a/src/main/java/io/reactivex/schedulers/Schedulers.java
+++ b/src/main/java/io/reactivex/schedulers/Schedulers.java
@@ -94,15 +94,28 @@ public static Scheduler from(Executor executor) {
return new ExecutorScheduler(executor);
}
+ /**
+ * Shuts down all standard schedulers: computation, io, newThread, single and trampoline.
+ * The method is threadsafe and idempotent with respect to other calls to it until the
+ * {@link #start()} method is called.
+ * Note that this may cut streams in half and they may end up hanging indefinitely.
+ * Make sure you cancel all outstanding streams before you shut down the standard schedulers.
+ * Schedulers created from Executors via {@link #from(Executor)} are not affected.
+ */
public static void shutdown() {
computation().shutdown();
io().shutdown();
newThread().shutdown();
single().shutdown();
trampoline().shutdown();
+ SchedulerPoolHelper.shutdown();
}
+ /**
+ * Starts up all standard schedulers: computation, io, newThread, single and trampoline.
+ */
public static void start() {
+ SchedulerPoolHelper.start();
computation().start();
io().start();
newThread().start();
diff --git a/src/test/java/io/reactivex/internal/operators/OperatorWindowWithSizeTest.java b/src/test/java/io/reactivex/internal/operators/OperatorWindowWithSizeTest.java
index 89e671e617..01dc9b4ca1 100644
--- a/src/test/java/io/reactivex/internal/operators/OperatorWindowWithSizeTest.java
+++ b/src/test/java/io/reactivex/internal/operators/OperatorWindowWithSizeTest.java
@@ -182,7 +182,7 @@ public void accept(Integer t1) {
})
.observeOn(Schedulers.computation())
.window(5, 4)
- .take(2))
+ .take(2), 128)
.subscribe(ts);
ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
ts.assertTerminated();
diff --git a/src/test/java/io/reactivex/internal/operators/nbp/NbpOperatorWindowWithSizeTest.java b/src/test/java/io/reactivex/internal/operators/nbp/NbpOperatorWindowWithSizeTest.java
index e201097009..13027e0529 100644
--- a/src/test/java/io/reactivex/internal/operators/nbp/NbpOperatorWindowWithSizeTest.java
+++ b/src/test/java/io/reactivex/internal/operators/nbp/NbpOperatorWindowWithSizeTest.java
@@ -178,9 +178,9 @@ public void accept(Integer t1) {
})
.observeOn(Schedulers.computation())
.window(5, 4)
- .take(2))
+ .take(2), 128)
.subscribe(ts);
- ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
+ ts.awaitTerminalEvent(400, TimeUnit.MILLISECONDS);
ts.assertTerminated();
ts.assertValues(1, 2, 3, 4, 5, 5, 6, 7, 8, 9);
// make sure we don't emit all values ... the unsubscribe should propagate
diff --git a/src/test/java/io/reactivex/schedulers/CachedThreadSchedulerTest.java b/src/test/java/io/reactivex/schedulers/CachedThreadSchedulerTest.java
index b2d3091055..1b942ab612 100644
--- a/src/test/java/io/reactivex/schedulers/CachedThreadSchedulerTest.java
+++ b/src/test/java/io/reactivex/schedulers/CachedThreadSchedulerTest.java
@@ -20,7 +20,6 @@
import org.junit.*;
import io.reactivex.*;
-import io.reactivex.Scheduler.Worker;
public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {
@@ -66,20 +65,9 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
}
- @Test(timeout = 30000)
+ @Test(timeout = 90000)
public void testCancelledTaskRetention() throws InterruptedException {
- Worker w = Schedulers.io().createWorker();
- try {
- ExecutorSchedulerTest.testCancelledRetention(w, false);
- } finally {
- w.dispose();
- }
- w = Schedulers.io().createWorker();
- try {
- ExecutorSchedulerTest.testCancelledRetention(w, true);
- } finally {
- w.dispose();
- }
+ SchedulerRetentionTest.testCancellationRetention(Schedulers.io(), true);
}
}
\ No newline at end of file
diff --git a/src/test/java/io/reactivex/schedulers/ComputationSchedulerTests.java b/src/test/java/io/reactivex/schedulers/ComputationSchedulerTests.java
index 7ed0644185..2a50489098 100644
--- a/src/test/java/io/reactivex/schedulers/ComputationSchedulerTests.java
+++ b/src/test/java/io/reactivex/schedulers/ComputationSchedulerTests.java
@@ -22,7 +22,6 @@
import org.junit.*;
import io.reactivex.*;
-import io.reactivex.Scheduler.Worker;
public class ComputationSchedulerTests extends AbstractSchedulerConcurrencyTests {
@@ -147,19 +146,8 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
}
- @Test(timeout = 30000)
+ @Test(timeout = 90000)
public void testCancelledTaskRetention() throws InterruptedException {
- Worker w = Schedulers.computation().createWorker();
- try {
- ExecutorSchedulerTest.testCancelledRetention(w, false);
- } finally {
- w.dispose();
- }
- w = Schedulers.computation().createWorker();
- try {
- ExecutorSchedulerTest.testCancelledRetention(w, true);
- } finally {
- w.dispose();
- }
+ SchedulerRetentionTest.testCancellationRetention(Schedulers.computation(), true);
}
}
\ No newline at end of file
diff --git a/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java b/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java
index ef53811add..02862a9341 100644
--- a/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java
+++ b/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java
@@ -13,9 +13,8 @@
package io.reactivex.schedulers;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
-import java.lang.management.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,90 +45,14 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
}
- public static void testCancelledRetention(Scheduler.Worker w, boolean periodic) throws InterruptedException {
- System.out.println("Wait before GC");
- Thread.sleep(1000);
-
- System.out.println("GC");
- System.gc();
-
- Thread.sleep(1000);
-
-
- MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
- MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
- long initial = memHeap.getUsed();
-
- System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
-
- int n = 500 * 1000;
- if (periodic) {
- final CountDownLatch cdl = new CountDownLatch(n);
- final Runnable action = new Runnable() {
- @Override
- public void run() {
- cdl.countDown();
- }
- };
- for (int i = 0; i < n; i++) {
- if (i % 50000 == 0) {
- System.out.println(" -> still scheduling: " + i);
- }
- w.schedulePeriodically(action, 0, 1, TimeUnit.DAYS);
- }
-
- System.out.println("Waiting for the first round to finish...");
- cdl.await();
- } else {
- for (int i = 0; i < n; i++) {
- if (i % 50000 == 0) {
- System.out.println(" -> still scheduling: " + i);
- }
- w.schedule(() -> { }, 1, TimeUnit.DAYS);
- }
- }
-
- memHeap = memoryMXBean.getHeapMemoryUsage();
- long after = memHeap.getUsed();
- System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0);
-
- w.dispose();
-
- System.out.println("Wait before second GC");
- Thread.sleep(1000 + 2000);
-
- System.out.println("Second GC");
- System.gc();
-
- Thread.sleep(1000);
-
- memHeap = memoryMXBean.getHeapMemoryUsage();
- long finish = memHeap.getUsed();
- System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0);
-
- if (finish > initial * 5) {
- fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d));
- }
- }
- @Test(timeout = 30000)
+
+ @Test(timeout = 90000)
public void testCancelledTaskRetention() throws InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
Scheduler s = Schedulers.from(exec);
try {
- Scheduler.Worker w = s.createWorker();
- try {
- testCancelledRetention(w, false);
- } finally {
- w.dispose();
- }
-
- w = s.createWorker();
- try {
- testCancelledRetention(w, true);
- } finally {
- w.dispose();
- }
+ SchedulerRetentionTest.testCancellationRetention(s, false);
} finally {
exec.shutdownNow();
}
diff --git a/src/test/java/io/reactivex/schedulers/SchedulerRetentionTest.java b/src/test/java/io/reactivex/schedulers/SchedulerRetentionTest.java
new file mode 100644
index 0000000000..7fab0e84c8
--- /dev/null
+++ b/src/test/java/io/reactivex/schedulers/SchedulerRetentionTest.java
@@ -0,0 +1,176 @@
+/**
+ * Copyright 2015 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.schedulers;
+
+import static org.junit.Assert.fail;
+
+import java.lang.management.*;
+import java.util.concurrent.*;
+
+import io.reactivex.Scheduler;
+import io.reactivex.internal.schedulers.SchedulerPoolHelper;
+
+public class SchedulerRetentionTest {
+
+ static void testCancelledRetentionWith(Scheduler.Worker w, boolean periodic) throws InterruptedException {
+ System.out.println(" Wait before GC");
+ Thread.sleep(1000);
+
+ System.out.println(" GC");
+ System.gc();
+
+ Thread.sleep(1000);
+
+
+ MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
+ long initial = memHeap.getUsed();
+
+ System.out.printf(" Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
+
+ int n = 100 * 1000;
+ if (periodic) {
+ final CountDownLatch cdl = new CountDownLatch(n);
+ final Runnable action = new Runnable() {
+ @Override
+ public void run() {
+ cdl.countDown();
+ }
+ };
+ for (int i = 0; i < n; i++) {
+ if (i % 50000 == 0) {
+ System.out.println(" -> still scheduling: " + i);
+ }
+ w.schedulePeriodically(action, 0, 1, TimeUnit.DAYS);
+ }
+ if (n % 50000 != 0) {
+ System.out.println(" -> still scheduling: " + n);
+ }
+
+ System.out.println(" Waiting for the first round to finish...");
+ cdl.await();
+ } else {
+ for (int i = 0; i < n; i++) {
+ if (i % 50000 == 0) {
+ System.out.println(" -> still scheduling: " + i);
+ }
+ w.schedule(() -> { }, 1, TimeUnit.DAYS);
+ }
+ }
+
+ memHeap = memoryMXBean.getHeapMemoryUsage();
+ long after = memHeap.getUsed();
+ System.out.printf(" Peak: %.3f MB%n", after / 1024.0 / 1024.0);
+
+ w.dispose();
+
+ System.out.println(" Wait before second GC");
+ int wait = 1000;
+
+ if (SchedulerPoolHelper.forcePurge()) {
+ System.out.println(" Purging is enabled, increasing wait time.");
+ wait += SchedulerPoolHelper.purgeFrequency();
+ wait += (int)(n * Math.log(n) / 200);
+ }
+
+ while (wait > 0) {
+ System.out.printf(" -> Waiting before scond GC: %.0f seconds remaining%n", wait / 1000d);
+ Thread.sleep(1000);
+
+ wait -= 1000;
+ }
+
+ System.out.println(" Second GC");
+ System.gc();
+
+ Thread.sleep(1000);
+
+ memHeap = memoryMXBean.getHeapMemoryUsage();
+ long finish = memHeap.getUsed();
+ System.out.printf(" After: %.3f MB%n", finish / 1024.0 / 1024.0);
+
+ if (finish > initial * 5) {
+ fail(String.format(" Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d));
+ }
+ }
+
+ static void runCancellationRetention(Scheduler s) throws InterruptedException {
+ Scheduler.Worker w = s.createWorker();
+ System.out.println("<<<< Testing with one-shot delayed tasks");
+ try {
+ testCancelledRetentionWith(w, false);
+ } finally {
+ w.dispose();
+ }
+ System.out.println("<<<< Testing with periodic delayed tasks");
+ w = Schedulers.computation().createWorker();
+ try {
+ testCancelledRetentionWith(w, true);
+ } finally {
+ w.dispose();
+ }
+ }
+
+ /**
+ * Check if the worker handles the cancellation properly and doesn't retain cancelled
+ *
+ * @param s the scheduler to test
+ * @param bothMode if supported, check both setRemoveOnCancelPolicy and purge mode.
+ * @throws InterruptedException
+ */
+ public static void testCancellationRetention(Scheduler s, boolean bothMode) throws InterruptedException {
+ System.out.println("------------------------------------------------------------------------");
+ System.out.println(">> testCancellationRetention : " + s.getClass());
+ if (bothMode && SchedulerPoolHelper.isRemoveOnCancelPolicySupported()) {
+ boolean force = SchedulerPoolHelper.forcePurge();
+
+ // switch to the other mode
+
+ System.out.println(" Shutting down pool helper: force is " + force);
+ // don't clear any purge registrations from other schedulers
+ s.shutdown();
+ SchedulerPoolHelper.shutdown(false);
+ System.out.println(" Letting the pool helper terminate");
+ Thread.sleep(1000);
+
+ System.out.println(" Setting forcePurge to " + !force);
+ SchedulerPoolHelper.forcePurge(!force);
+
+ System.out.println(" Starting pool helper");
+ SchedulerPoolHelper.start();
+ s.start();
+
+ runCancellationRetention(s);
+
+ // switch back to the original mode
+
+ System.out.println(" Shutting down pool helper again");
+ // clear the pool if the original mode wasn't force
+ s.shutdown();
+ SchedulerPoolHelper.shutdown(!force);
+
+ System.out.println(" Letting the pool helper terminate again");
+ Thread.sleep(1000);
+
+ System.out.println(" Restoring forcePurge to " + force);
+ SchedulerPoolHelper.forcePurge(force);
+
+ System.out.println(" Starting pool helper");
+ SchedulerPoolHelper.start();
+ s.start();
+ }
+ // run the regular mode checks
+ runCancellationRetention(s);
+ }
+}
diff --git a/src/test/java/io/reactivex/schedulers/SingleSchedulerTest.java b/src/test/java/io/reactivex/schedulers/SingleSchedulerTest.java
new file mode 100644
index 0000000000..56c7ff29be
--- /dev/null
+++ b/src/test/java/io/reactivex/schedulers/SingleSchedulerTest.java
@@ -0,0 +1,23 @@
+/**
+ * Copyright 2015 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.schedulers;
+
+import org.junit.Test;
+
+public class SingleSchedulerTest {
+ @Test(timeout = 90000)
+ public void testCancelledTaskRetention() throws InterruptedException {
+ SchedulerRetentionTest.testCancellationRetention(Schedulers.single(), true);
+ }
+}