Skip to content

2.x: platform-aware purge/removeOnCancelPolicy management. #3460

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ private static final class CachedWorkerPool {
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
((ScheduledThreadPoolExecutor)evictor).setRemoveOnCancelPolicy(true);
try {
task = evictor.scheduleWithFixedDelay(
new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@ public class NewThreadWorker extends Scheduler.Worker implements Disposable {

/* package */
public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
if (exec instanceof ScheduledThreadPoolExecutor) {
((ScheduledThreadPoolExecutor)exec).setRemoveOnCancelPolicy(true);
}
ScheduledExecutorService exec = SchedulerPoolHelper.create(threadFactory);
executor = exec;
}

Expand All @@ -59,7 +55,7 @@ public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit)
* @param run
* @param delayTime
* @param unit
* @return
* @return a Disposable that let's the caller cancel the scheduled runnable
*/
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
Expand All @@ -84,7 +80,7 @@ public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit un
* @param initialDelay
* @param period
* @param unit
* @return
* @return a Disposable that let's the caller cancel the scheduled runnable
*/
public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDelay, long period, TimeUnit unit) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
Expand All @@ -103,12 +99,15 @@ public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDel
* on the underlying ScheduledExecutorService.
* <p>If the schedule has been rejected, the ScheduledRunnable.wasScheduled will return
* false.
* @param action
* @param run
* @param delayTime
* @param unit
* @return
* @param parent the parent tracking structure to add the created ScheduledRunnable instance before
* it gets scheduled.
* @return a Disposable that let's the caller cancel the scheduled runnable
*/
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, CompositeResource<Disposable> parent) {
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit,
CompositeResource<Disposable> parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.schedulers;

import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.internal.util.Exceptions;

/**
* Manages the purging of cancelled and delayed tasks considering platform specifics.
*/
public enum SchedulerPoolHelper {
;

/** Key to force purging instead of using removeOnCancelPolicy if available. */
private static final String FORCE_PURGE_KEY = "rx2.scheduler.purge-force";
/**
* Force periodic purging instead of removeOnCancelPolicy even if available.
* Default {@code false}.
*/
private static volatile boolean FORCE_PURGE;

/** Key to the purge frequency parameter in milliseconds. */
private static final String PURGE_FREQUENCY_KEY = "rx2.scheduler.purge-frequency";
/** The purge frequency in milliseconds. */
private static volatile int PURGE_FREQUENCY;
/**
* Holds onto the ScheduledExecutorService that periodically purges all known
* ScheduledThreadPoolExecutors in POOLS.
*/
static final AtomicReference<ScheduledExecutorService> PURGE_THREAD;
/**
* Holds onto the created ScheduledThreadPoolExecutors by this helper.
*/
static final Map<ScheduledThreadPoolExecutor, ScheduledExecutorService> POOLS;

/**
* The reflective method used for setting the removeOnCancelPolicy (JDK 6 safe way).
*/
static final Method SET_REMOVE_ON_CANCEL_POLICY;

static final ThreadFactory PURGE_THREAD_FACTORY;
/**
* Initializes the static fields and figures out the settings for purging.
*/
static {
PURGE_THREAD = new AtomicReference<>();
POOLS = new ConcurrentHashMap<>();
PURGE_THREAD_FACTORY = new RxThreadFactory("RxSchedulerPurge-");

Properties props = System.getProperties();

boolean forcePurgeValue = false;
Method removeOnCancelMethod = null;

// this is necessary because force is turned on and off by tests on desktop
try {
removeOnCancelMethod = ScheduledThreadPoolExecutor.class.getMethod("setRemoveOnCancelPolicy", Boolean.TYPE);
} catch (NoSuchMethodException | SecurityException e) {
// if not present, no problem
forcePurgeValue = true;
}

if (!forcePurgeValue && props.containsKey(FORCE_PURGE_KEY)) {
forcePurgeValue = Boolean.getBoolean(FORCE_PURGE_KEY);
}

PURGE_FREQUENCY = Integer.getInteger(PURGE_FREQUENCY_KEY, 2000);

FORCE_PURGE = forcePurgeValue;
SET_REMOVE_ON_CANCEL_POLICY = removeOnCancelMethod;
start();
}

/**
* Returns the status of the force-purge settings.
* @return the force purge settings
*/
public static boolean forcePurge() {
return FORCE_PURGE;
}

/**
* Sets the force-purge settings.
* <p>Note that enabling or disabling the force-purge by itself doesn't apply to
* existing schedulers and they have to be restarted.
* @param force the new force state
*/
/* test */
public static void forcePurge(boolean force) {
FORCE_PURGE = force;
}

/**
* Returns purge frequency in milliseconds.
* @return purge frequency in milliseconds
*/
public static int purgeFrequency() {
return PURGE_FREQUENCY;
}

/**
* Returns true if the platform supports removeOnCancelPolicy.
* @return true if the platform supports removeOnCancelPolicy.
*/
public static boolean isRemoveOnCancelPolicySupported() {
return SET_REMOVE_ON_CANCEL_POLICY != null;
}

/**
* Creates a single threaded ScheduledExecutorService and wires up all
* necessary purging or removeOnCancelPolicy settings with it.
* @param factory the thread factory to use
* @return the created ScheduledExecutorService
* @throws IllegalStateException if force-purge is not enabled yet the platform doesn't support removeOnCancelPolicy;
* or Executors.newScheduledThreadPool doesn't return a ScheduledThreadPoolExecutor.
*/
public static ScheduledExecutorService create(ThreadFactory factory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (FORCE_PURGE) {
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, e);
} else {
throw new IllegalStateException("The Executors.newScheduledThreadPool didn't return a ScheduledThreadPoolExecutor.");
}
} else {
Method m = SET_REMOVE_ON_CANCEL_POLICY;
if (m == null) {
throw new IllegalStateException("The ScheduledThreadPoolExecutor doesn't support the removeOnCancelPolicy and purging is not enabled.");
}
try {
m.invoke(exec, true);
} catch (IllegalAccessException | InvocationTargetException e) {
Exceptions.propagate(e);
}
}

return exec;
}

/**
* Starts the purge thread and the periodic purging.
*/
public static void start() {
// if purge is not enabled don't do anything
if (!FORCE_PURGE) {
return;
}
for (;;) {
ScheduledExecutorService curr = PURGE_THREAD.get();
if (curr != null) {
return;
}
ScheduledExecutorService next = Executors.newScheduledThreadPool(1, PURGE_THREAD_FACTORY);
if (PURGE_THREAD.compareAndSet(null, next)) {

next.scheduleAtFixedRate(SchedulerPoolHelper::doPurge,
PURGE_FREQUENCY, PURGE_FREQUENCY, TimeUnit.MILLISECONDS);

return;
} else {
next.shutdownNow();
}
}
}

/**
* Shuts down the purge thread and forgets the known ScheduledExecutorServices.
* <p>Note that this stops purging the known ScheduledExecutorServices which may be shut
* down as well to appreciate a new forcePurge state.
*/
public static void shutdown() {
shutdown(true);
}
/**
* Shuts down the purge thread and clears the known ScheduledExecutorServices from POOLS when
* requested.
* <p>Note that this stops purging the known ScheduledExecutorServices which may be shut
* down as well to appreciate a new forcePurge state.
* @param clear if true, the helper forgets all associated ScheduledExecutorServices
*/
public static void shutdown(boolean clear) {
for (;;) {
ScheduledExecutorService curr = PURGE_THREAD.get();
if (curr == null) {
return;
}
if (PURGE_THREAD.compareAndSet(curr, null)) {
curr.shutdownNow();
if (clear) {
POOLS.clear();
}
return;
}
}
}

/**
* Loops through the known ScheduledExecutors and removes the ones that were shut down
* and purges the others
*/
static void doPurge() {
try {
for (ScheduledThreadPoolExecutor e : new ArrayList<>(POOLS.keySet())) {
if (e.isShutdown()) {
POOLS.remove(e);
} else {
e.purge();
}
}
} catch (Throwable ex) {
// ignoring any error, just in case
}
}

/**
* Purges all known ScheduledExecutorServices immediately on the purge thread.
*/
public static void purgeAsync() {
ScheduledExecutorService exec = PURGE_THREAD.get();
if (exec != null) {
try {
exec.submit(SchedulerPoolHelper::doPurge);
} catch (RejectedExecutionException ex) {
// ignored, we are in shutdown
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public SingleScheduler() {
}

static ScheduledExecutorService createExecutor() {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, new RxThreadFactory("RxSingleScheduler-"));
((ScheduledThreadPoolExecutor)exec).setRemoveOnCancelPolicy(true);
ScheduledExecutorService exec = SchedulerPoolHelper.create(new RxThreadFactory("RxSingleScheduler-"));
return exec;
}

Expand Down
13 changes: 13 additions & 0 deletions src/main/java/io/reactivex/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,28 @@ public static Scheduler from(Executor executor) {
return new ExecutorScheduler(executor);
}

/**
* Shuts down all standard schedulers: computation, io, newThread, single and trampoline.
* <p>The method is threadsafe and idempotent with respect to other calls to it until the
* {@link #start()} method is called.
* <p>Note that this may cut streams in half and they may end up hanging indefinitely.
* Make sure you cancel all outstanding streams before you shut down the standard schedulers.
* <p>Schedulers created from Executors via {@link #from(Executor)} are not affected.
*/
public static void shutdown() {
computation().shutdown();
io().shutdown();
newThread().shutdown();
single().shutdown();
trampoline().shutdown();
SchedulerPoolHelper.shutdown();
}

/**
* Starts up all standard schedulers: computation, io, newThread, single and trampoline.
*/
public static void start() {
SchedulerPoolHelper.start();
computation().start();
io().start();
newThread().start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void accept(Integer t1) {
})
.observeOn(Schedulers.computation())
.window(5, 4)
.take(2))
.take(2), 128)
.subscribe(ts);
ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
ts.assertTerminated();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ public void accept(Integer t1) {
})
.observeOn(Schedulers.computation())
.window(5, 4)
.take(2))
.take(2), 128)
.subscribe(ts);
ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
ts.awaitTerminalEvent(400, TimeUnit.MILLISECONDS);
ts.assertTerminated();
ts.assertValues(1, 2, 3, 4, 5, 5, 6, 7, 8, 9);
// make sure we don't emit all values ... the unsubscribe should propagate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.junit.*;

import io.reactivex.*;
import io.reactivex.Scheduler.Worker;

public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {

Expand Down Expand Up @@ -66,20 +65,9 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
}

@Test(timeout = 30000)
@Test(timeout = 90000)
public void testCancelledTaskRetention() throws InterruptedException {
Worker w = Schedulers.io().createWorker();
try {
ExecutorSchedulerTest.testCancelledRetention(w, false);
} finally {
w.dispose();
}
w = Schedulers.io().createWorker();
try {
ExecutorSchedulerTest.testCancelledRetention(w, true);
} finally {
w.dispose();
}
SchedulerRetentionTest.testCancellationRetention(Schedulers.io(), true);
}

}
Loading