From eb2d610fee1ea7370f7187649aaed9ab7959fb79 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 19 Oct 2015 13:58:13 +0200 Subject: [PATCH 1/2] 2.x: platform-aware purge/removeOnCancelPolicy management. --- .../internal/schedulers/IOScheduler.java | 1 - .../internal/schedulers/NewThreadWorker.java | 19 +- .../schedulers/SchedulerPoolHelper.java | 244 ++++++++++++++++++ .../internal/schedulers/SingleScheduler.java | 3 +- .../io/reactivex/schedulers/Schedulers.java | 13 + .../operators/OperatorWindowWithSizeTest.java | 2 +- .../nbp/NbpOperatorWindowWithSizeTest.java | 2 +- .../schedulers/CachedThreadSchedulerTest.java | 16 +- .../schedulers/ComputationSchedulerTests.java | 16 +- .../schedulers/ExecutorSchedulerTest.java | 85 +----- .../schedulers/SchedulerRetentionTest.java | 176 +++++++++++++ .../schedulers/SingleSchedulerTest.java | 23 ++ 12 files changed, 476 insertions(+), 124 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/schedulers/SchedulerPoolHelper.java create mode 100644 src/test/java/io/reactivex/schedulers/SchedulerRetentionTest.java create mode 100644 src/test/java/io/reactivex/schedulers/SingleSchedulerTest.java diff --git a/src/main/java/io/reactivex/internal/schedulers/IOScheduler.java b/src/main/java/io/reactivex/internal/schedulers/IOScheduler.java index 72541f1c03..f369234597 100644 --- a/src/main/java/io/reactivex/internal/schedulers/IOScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/IOScheduler.java @@ -61,7 +61,6 @@ private static final class CachedWorkerPool { Future task = null; if (unit != null) { evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); - ((ScheduledThreadPoolExecutor)evictor).setRemoveOnCancelPolicy(true); try { task = evictor.scheduleWithFixedDelay( new Runnable() { diff --git a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java index 8f032748b6..0f519ae971 100644 --- a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java @@ -32,11 +32,7 @@ public class NewThreadWorker extends Scheduler.Worker implements Disposable { /* package */ public NewThreadWorker(ThreadFactory threadFactory) { - ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); - // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak - if (exec instanceof ScheduledThreadPoolExecutor) { - ((ScheduledThreadPoolExecutor)exec).setRemoveOnCancelPolicy(true); - } + ScheduledExecutorService exec = SchedulerPoolHelper.create(threadFactory); executor = exec; } @@ -59,7 +55,7 @@ public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) * @param run * @param delayTime * @param unit - * @return + * @return a Disposable that let's the caller cancel the scheduled runnable */ public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); @@ -84,7 +80,7 @@ public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit un * @param initialDelay * @param period * @param unit - * @return + * @return a Disposable that let's the caller cancel the scheduled runnable */ public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDelay, long period, TimeUnit unit) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); @@ -103,12 +99,15 @@ public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDel * on the underlying ScheduledExecutorService. *

If the schedule has been rejected, the ScheduledRunnable.wasScheduled will return * false. - * @param action + * @param run * @param delayTime * @param unit - * @return + * @param parent the parent tracking structure to add the created ScheduledRunnable instance before + * it gets scheduled. + * @return a Disposable that let's the caller cancel the scheduled runnable */ - public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, CompositeResource parent) { + public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, + CompositeResource parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolHelper.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolHelper.java new file mode 100644 index 0000000000..f255bd32cc --- /dev/null +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolHelper.java @@ -0,0 +1,244 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import java.lang.reflect.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.internal.util.Exceptions; + +/** + * Manages the purging of cancelled and delayed tasks considering platform specifics. + */ +public enum SchedulerPoolHelper { + ; + + /** Key to force purging instead of using removeOnCancelPolicy if available. */ + private static final String FORCE_PURGE_KEY = "rx2.scheduler.purge-force"; + /** + * Force periodic purging instead of removeOnCancelPolicy even if available. + * Default {@code false}. + */ + private static volatile boolean FORCE_PURGE; + + /** Key to the purge frequency parameter in milliseconds. */ + private static final String PURGE_FREQUENCY_KEY = "rx2.scheduler.purge-frequency"; + /** The purge frequency in milliseconds. */ + private static volatile int PURGE_FREQUENCY; + /** + * Holds onto the ScheduledExecutorService that periodically purges all known + * ScheduledThreadPoolExecutors in POOLS. + */ + static final AtomicReference PURGE_THREAD; + /** + * Holds onto the created ScheduledThreadPoolExecutors by this helper. + */ + static final Map POOLS; + + /** + * The reflective method used for setting the removeOnCancelPolicy (JDK 6 safe way). + */ + static final Method SET_REMOVE_ON_CANCEL_POLICY; + + static final ThreadFactory PURGE_THREAD_FACTORY; + /** + * Initializes the static fields and figures out the settings for purging. + */ + static { + PURGE_THREAD = new AtomicReference<>(); + POOLS = new ConcurrentHashMap<>(); + PURGE_THREAD_FACTORY = new RxThreadFactory("RxSchedulerPurge-"); + + Properties props = System.getProperties(); + + boolean forcePurgeValue = false; + Method removeOnCancelMethod = null; + + // this is necessary because force is turned on and off by tests on desktop + try { + removeOnCancelMethod = ScheduledThreadPoolExecutor.class.getMethod("setRemoveOnCancelPolicy", Boolean.TYPE); + } catch (NoSuchMethodException | SecurityException e) { + // if not present, no problem + forcePurgeValue = true; + } + + if (!forcePurgeValue && props.containsKey(FORCE_PURGE_KEY)) { + forcePurgeValue = Boolean.getBoolean(FORCE_PURGE_KEY); + } + + PURGE_FREQUENCY = Integer.getInteger(PURGE_FREQUENCY_KEY, 2000); + + FORCE_PURGE = forcePurgeValue; + SET_REMOVE_ON_CANCEL_POLICY = removeOnCancelMethod; + start(); + } + + /** + * Returns the status of the force-purge settings. + * @return the force purge settings + */ + public static boolean forcePurge() { + return FORCE_PURGE; + } + + /** + * Sets the force-purge settings. + *

Note that enabling or disabling the force-purge by itself doesn't apply to + * existing schedulers and they have to be restarted. + * @param force the new force state + */ + /* test */ + public static void forcePurge(boolean force) { + FORCE_PURGE = force; + } + + /** + * Returns purge frequency in milliseconds. + * @return purge frequency in milliseconds + */ + public static int purgeFrequency() { + return PURGE_FREQUENCY; + } + + /** + * Returns true if the platform supports removeOnCancelPolicy. + * @return true if the platform supports removeOnCancelPolicy. + */ + public static boolean isRemoveOnCancelPolicySupported() { + return SET_REMOVE_ON_CANCEL_POLICY != null; + } + + /** + * Creates a single threaded ScheduledExecutorService and wires up all + * necessary purging or removeOnCancelPolicy settings with it. + * @param factory the thread factory to use + * @return the created ScheduledExecutorService + * @throws IllegalStateException if force-purge is not enabled yet the platform doesn't support removeOnCancelPolicy; + * or Executors.newScheduledThreadPool doesn't return a ScheduledThreadPoolExecutor. + */ + public static ScheduledExecutorService create(ThreadFactory factory) { + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); + if (FORCE_PURGE) { + if (exec instanceof ScheduledThreadPoolExecutor) { + ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec; + POOLS.put(e, e); + } else { + throw new IllegalStateException("The Executors.newScheduledThreadPool didn't return a ScheduledThreadPoolExecutor."); + } + } else { + Method m = SET_REMOVE_ON_CANCEL_POLICY; + if (m == null) { + throw new IllegalStateException("The ScheduledThreadPoolExecutor doesn't support the removeOnCancelPolicy and purging is not enabled."); + } + try { + m.invoke(exec, true); + } catch (IllegalAccessException | InvocationTargetException e) { + Exceptions.propagate(e); + } + } + + return exec; + } + + /** + * Starts the purge thread and the periodic purging. + */ + public static void start() { + // if purge is not enabled don't do anything + if (!FORCE_PURGE) { + return; + } + for (;;) { + ScheduledExecutorService curr = PURGE_THREAD.get(); + if (curr != null) { + return; + } + ScheduledExecutorService next = Executors.newScheduledThreadPool(1, PURGE_THREAD_FACTORY); + if (PURGE_THREAD.compareAndSet(null, next)) { + + next.scheduleAtFixedRate(SchedulerPoolHelper::doPurge, + PURGE_FREQUENCY, PURGE_FREQUENCY, TimeUnit.MILLISECONDS); + + return; + } else { + next.shutdownNow(); + } + } + } + + /** + * Shuts down the purge thread and forgets the known ScheduledExecutorServices. + *

Note that this stops purging the known ScheduledExecutorServices which may be shut + * down as well to appreciate a new forcePurge state. + */ + public static void shutdown() { + shutdown(true); + } + /** + * Shuts down the purge thread and clears the known ScheduledExecutorServices from POOLS when + * requested. + *

Note that this stops purging the known ScheduledExecutorServices which may be shut + * down as well to appreciate a new forcePurge state. + * @param clear if true, the helper forgets all associated ScheduledExecutorServices + */ + public static void shutdown(boolean clear) { + for (;;) { + ScheduledExecutorService curr = PURGE_THREAD.get(); + if (curr == null) { + return; + } + if (PURGE_THREAD.compareAndSet(curr, null)) { + curr.shutdownNow(); + if (clear) { + POOLS.clear(); + } + return; + } + } + } + + /** + * Loops through the known ScheduledExecutors and removes the ones that were shut down + * and purges the others + */ + static void doPurge() { + try { + for (ScheduledThreadPoolExecutor e : new ArrayList<>(POOLS.keySet())) { + if (e.isShutdown()) { + POOLS.remove(e); + } else { + e.purge(); + } + } + } catch (Throwable ex) { + // ignoring any error, just in case + } + } + + /** + * Purges all known ScheduledExecutorServices immediately on the purge thread. + */ + public static void purgeAsync() { + ScheduledExecutorService exec = PURGE_THREAD.get(); + if (exec != null) { + try { + exec.submit(SchedulerPoolHelper::doPurge); + } catch (RejectedExecutionException ex) { + // ignored, we are in shutdown + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java index 26af80ce93..6ea819bd24 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java @@ -38,8 +38,7 @@ public SingleScheduler() { } static ScheduledExecutorService createExecutor() { - ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, new RxThreadFactory("RxSingleScheduler-")); - ((ScheduledThreadPoolExecutor)exec).setRemoveOnCancelPolicy(true); + ScheduledExecutorService exec = SchedulerPoolHelper.create(new RxThreadFactory("RxSingleScheduler-")); return exec; } diff --git a/src/main/java/io/reactivex/schedulers/Schedulers.java b/src/main/java/io/reactivex/schedulers/Schedulers.java index d3548b5166..fe2a2b691b 100644 --- a/src/main/java/io/reactivex/schedulers/Schedulers.java +++ b/src/main/java/io/reactivex/schedulers/Schedulers.java @@ -94,15 +94,28 @@ public static Scheduler from(Executor executor) { return new ExecutorScheduler(executor); } + /** + * Shuts down all standard schedulers: computation, io, newThread, single and trampoline. + *

The method is threadsafe and idempotent with respect to other calls to it until the + * {@link #start()} method is called. + *

Note that this may cut streams in half and they may end up hanging indefinitely. + * Make sure you cancel all outstanding streams before you shut down the standard schedulers. + *

Schedulers created from Executors via {@link #from(Executor)} are not affected. + */ public static void shutdown() { computation().shutdown(); io().shutdown(); newThread().shutdown(); single().shutdown(); trampoline().shutdown(); + SchedulerPoolHelper.shutdown(); } + /** + * Starts up all standard schedulers: computation, io, newThread, single and trampoline. + */ public static void start() { + SchedulerPoolHelper.start(); computation().start(); io().start(); newThread().start(); diff --git a/src/test/java/io/reactivex/internal/operators/OperatorWindowWithSizeTest.java b/src/test/java/io/reactivex/internal/operators/OperatorWindowWithSizeTest.java index 89e671e617..01dc9b4ca1 100644 --- a/src/test/java/io/reactivex/internal/operators/OperatorWindowWithSizeTest.java +++ b/src/test/java/io/reactivex/internal/operators/OperatorWindowWithSizeTest.java @@ -182,7 +182,7 @@ public void accept(Integer t1) { }) .observeOn(Schedulers.computation()) .window(5, 4) - .take(2)) + .take(2), 128) .subscribe(ts); ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); ts.assertTerminated(); diff --git a/src/test/java/io/reactivex/internal/operators/nbp/NbpOperatorWindowWithSizeTest.java b/src/test/java/io/reactivex/internal/operators/nbp/NbpOperatorWindowWithSizeTest.java index e201097009..97d9782d56 100644 --- a/src/test/java/io/reactivex/internal/operators/nbp/NbpOperatorWindowWithSizeTest.java +++ b/src/test/java/io/reactivex/internal/operators/nbp/NbpOperatorWindowWithSizeTest.java @@ -178,7 +178,7 @@ public void accept(Integer t1) { }) .observeOn(Schedulers.computation()) .window(5, 4) - .take(2)) + .take(2), 128) .subscribe(ts); ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); ts.assertTerminated(); diff --git a/src/test/java/io/reactivex/schedulers/CachedThreadSchedulerTest.java b/src/test/java/io/reactivex/schedulers/CachedThreadSchedulerTest.java index b2d3091055..1b942ab612 100644 --- a/src/test/java/io/reactivex/schedulers/CachedThreadSchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/CachedThreadSchedulerTest.java @@ -20,7 +20,6 @@ import org.junit.*; import io.reactivex.*; -import io.reactivex.Scheduler.Worker; public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { @@ -66,20 +65,9 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler()); } - @Test(timeout = 30000) + @Test(timeout = 90000) public void testCancelledTaskRetention() throws InterruptedException { - Worker w = Schedulers.io().createWorker(); - try { - ExecutorSchedulerTest.testCancelledRetention(w, false); - } finally { - w.dispose(); - } - w = Schedulers.io().createWorker(); - try { - ExecutorSchedulerTest.testCancelledRetention(w, true); - } finally { - w.dispose(); - } + SchedulerRetentionTest.testCancellationRetention(Schedulers.io(), true); } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/schedulers/ComputationSchedulerTests.java b/src/test/java/io/reactivex/schedulers/ComputationSchedulerTests.java index 7ed0644185..2a50489098 100644 --- a/src/test/java/io/reactivex/schedulers/ComputationSchedulerTests.java +++ b/src/test/java/io/reactivex/schedulers/ComputationSchedulerTests.java @@ -22,7 +22,6 @@ import org.junit.*; import io.reactivex.*; -import io.reactivex.Scheduler.Worker; public class ComputationSchedulerTests extends AbstractSchedulerConcurrencyTests { @@ -147,19 +146,8 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler()); } - @Test(timeout = 30000) + @Test(timeout = 90000) public void testCancelledTaskRetention() throws InterruptedException { - Worker w = Schedulers.computation().createWorker(); - try { - ExecutorSchedulerTest.testCancelledRetention(w, false); - } finally { - w.dispose(); - } - w = Schedulers.computation().createWorker(); - try { - ExecutorSchedulerTest.testCancelledRetention(w, true); - } finally { - w.dispose(); - } + SchedulerRetentionTest.testCancellationRetention(Schedulers.computation(), true); } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java b/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java index ef53811add..02862a9341 100644 --- a/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java @@ -13,9 +13,8 @@ package io.reactivex.schedulers; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; -import java.lang.management.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -46,90 +45,14 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler()); } - public static void testCancelledRetention(Scheduler.Worker w, boolean periodic) throws InterruptedException { - System.out.println("Wait before GC"); - Thread.sleep(1000); - - System.out.println("GC"); - System.gc(); - - Thread.sleep(1000); - - - MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); - MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); - long initial = memHeap.getUsed(); - - System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0); - - int n = 500 * 1000; - if (periodic) { - final CountDownLatch cdl = new CountDownLatch(n); - final Runnable action = new Runnable() { - @Override - public void run() { - cdl.countDown(); - } - }; - for (int i = 0; i < n; i++) { - if (i % 50000 == 0) { - System.out.println(" -> still scheduling: " + i); - } - w.schedulePeriodically(action, 0, 1, TimeUnit.DAYS); - } - - System.out.println("Waiting for the first round to finish..."); - cdl.await(); - } else { - for (int i = 0; i < n; i++) { - if (i % 50000 == 0) { - System.out.println(" -> still scheduling: " + i); - } - w.schedule(() -> { }, 1, TimeUnit.DAYS); - } - } - - memHeap = memoryMXBean.getHeapMemoryUsage(); - long after = memHeap.getUsed(); - System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0); - - w.dispose(); - - System.out.println("Wait before second GC"); - Thread.sleep(1000 + 2000); - - System.out.println("Second GC"); - System.gc(); - - Thread.sleep(1000); - - memHeap = memoryMXBean.getHeapMemoryUsage(); - long finish = memHeap.getUsed(); - System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0); - - if (finish > initial * 5) { - fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d)); - } - } - @Test(timeout = 30000) + + @Test(timeout = 90000) public void testCancelledTaskRetention() throws InterruptedException { ExecutorService exec = Executors.newSingleThreadExecutor(); Scheduler s = Schedulers.from(exec); try { - Scheduler.Worker w = s.createWorker(); - try { - testCancelledRetention(w, false); - } finally { - w.dispose(); - } - - w = s.createWorker(); - try { - testCancelledRetention(w, true); - } finally { - w.dispose(); - } + SchedulerRetentionTest.testCancellationRetention(s, false); } finally { exec.shutdownNow(); } diff --git a/src/test/java/io/reactivex/schedulers/SchedulerRetentionTest.java b/src/test/java/io/reactivex/schedulers/SchedulerRetentionTest.java new file mode 100644 index 0000000000..7fab0e84c8 --- /dev/null +++ b/src/test/java/io/reactivex/schedulers/SchedulerRetentionTest.java @@ -0,0 +1,176 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.schedulers; + +import static org.junit.Assert.fail; + +import java.lang.management.*; +import java.util.concurrent.*; + +import io.reactivex.Scheduler; +import io.reactivex.internal.schedulers.SchedulerPoolHelper; + +public class SchedulerRetentionTest { + + static void testCancelledRetentionWith(Scheduler.Worker w, boolean periodic) throws InterruptedException { + System.out.println(" Wait before GC"); + Thread.sleep(1000); + + System.out.println(" GC"); + System.gc(); + + Thread.sleep(1000); + + + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + long initial = memHeap.getUsed(); + + System.out.printf(" Starting: %.3f MB%n", initial / 1024.0 / 1024.0); + + int n = 100 * 1000; + if (periodic) { + final CountDownLatch cdl = new CountDownLatch(n); + final Runnable action = new Runnable() { + @Override + public void run() { + cdl.countDown(); + } + }; + for (int i = 0; i < n; i++) { + if (i % 50000 == 0) { + System.out.println(" -> still scheduling: " + i); + } + w.schedulePeriodically(action, 0, 1, TimeUnit.DAYS); + } + if (n % 50000 != 0) { + System.out.println(" -> still scheduling: " + n); + } + + System.out.println(" Waiting for the first round to finish..."); + cdl.await(); + } else { + for (int i = 0; i < n; i++) { + if (i % 50000 == 0) { + System.out.println(" -> still scheduling: " + i); + } + w.schedule(() -> { }, 1, TimeUnit.DAYS); + } + } + + memHeap = memoryMXBean.getHeapMemoryUsage(); + long after = memHeap.getUsed(); + System.out.printf(" Peak: %.3f MB%n", after / 1024.0 / 1024.0); + + w.dispose(); + + System.out.println(" Wait before second GC"); + int wait = 1000; + + if (SchedulerPoolHelper.forcePurge()) { + System.out.println(" Purging is enabled, increasing wait time."); + wait += SchedulerPoolHelper.purgeFrequency(); + wait += (int)(n * Math.log(n) / 200); + } + + while (wait > 0) { + System.out.printf(" -> Waiting before scond GC: %.0f seconds remaining%n", wait / 1000d); + Thread.sleep(1000); + + wait -= 1000; + } + + System.out.println(" Second GC"); + System.gc(); + + Thread.sleep(1000); + + memHeap = memoryMXBean.getHeapMemoryUsage(); + long finish = memHeap.getUsed(); + System.out.printf(" After: %.3f MB%n", finish / 1024.0 / 1024.0); + + if (finish > initial * 5) { + fail(String.format(" Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d)); + } + } + + static void runCancellationRetention(Scheduler s) throws InterruptedException { + Scheduler.Worker w = s.createWorker(); + System.out.println("<<<< Testing with one-shot delayed tasks"); + try { + testCancelledRetentionWith(w, false); + } finally { + w.dispose(); + } + System.out.println("<<<< Testing with periodic delayed tasks"); + w = Schedulers.computation().createWorker(); + try { + testCancelledRetentionWith(w, true); + } finally { + w.dispose(); + } + } + + /** + * Check if the worker handles the cancellation properly and doesn't retain cancelled + * + * @param s the scheduler to test + * @param bothMode if supported, check both setRemoveOnCancelPolicy and purge mode. + * @throws InterruptedException + */ + public static void testCancellationRetention(Scheduler s, boolean bothMode) throws InterruptedException { + System.out.println("------------------------------------------------------------------------"); + System.out.println(">> testCancellationRetention : " + s.getClass()); + if (bothMode && SchedulerPoolHelper.isRemoveOnCancelPolicySupported()) { + boolean force = SchedulerPoolHelper.forcePurge(); + + // switch to the other mode + + System.out.println(" Shutting down pool helper: force is " + force); + // don't clear any purge registrations from other schedulers + s.shutdown(); + SchedulerPoolHelper.shutdown(false); + System.out.println(" Letting the pool helper terminate"); + Thread.sleep(1000); + + System.out.println(" Setting forcePurge to " + !force); + SchedulerPoolHelper.forcePurge(!force); + + System.out.println(" Starting pool helper"); + SchedulerPoolHelper.start(); + s.start(); + + runCancellationRetention(s); + + // switch back to the original mode + + System.out.println(" Shutting down pool helper again"); + // clear the pool if the original mode wasn't force + s.shutdown(); + SchedulerPoolHelper.shutdown(!force); + + System.out.println(" Letting the pool helper terminate again"); + Thread.sleep(1000); + + System.out.println(" Restoring forcePurge to " + force); + SchedulerPoolHelper.forcePurge(force); + + System.out.println(" Starting pool helper"); + SchedulerPoolHelper.start(); + s.start(); + } + // run the regular mode checks + runCancellationRetention(s); + } +} diff --git a/src/test/java/io/reactivex/schedulers/SingleSchedulerTest.java b/src/test/java/io/reactivex/schedulers/SingleSchedulerTest.java new file mode 100644 index 0000000000..56c7ff29be --- /dev/null +++ b/src/test/java/io/reactivex/schedulers/SingleSchedulerTest.java @@ -0,0 +1,23 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.schedulers; + +import org.junit.Test; + +public class SingleSchedulerTest { + @Test(timeout = 90000) + public void testCancelledTaskRetention() throws InterruptedException { + SchedulerRetentionTest.testCancellationRetention(Schedulers.single(), true); + } +} From 9958667b0564fa51b49174f66cf583997309952a Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 19 Oct 2015 14:15:21 +0200 Subject: [PATCH 2/2] Change wait time on a non-deterministic test --- .../internal/operators/nbp/NbpOperatorWindowWithSizeTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/reactivex/internal/operators/nbp/NbpOperatorWindowWithSizeTest.java b/src/test/java/io/reactivex/internal/operators/nbp/NbpOperatorWindowWithSizeTest.java index 97d9782d56..13027e0529 100644 --- a/src/test/java/io/reactivex/internal/operators/nbp/NbpOperatorWindowWithSizeTest.java +++ b/src/test/java/io/reactivex/internal/operators/nbp/NbpOperatorWindowWithSizeTest.java @@ -180,7 +180,7 @@ public void accept(Integer t1) { .window(5, 4) .take(2), 128) .subscribe(ts); - ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS); + ts.awaitTerminalEvent(400, TimeUnit.MILLISECONDS); ts.assertTerminated(); ts.assertValues(1, 2, 3, 4, 5, 5, 6, 7, 8, 9); // make sure we don't emit all values ... the unsubscribe should propagate