From cb4623a3768a187df9d6e3944bcd640c6eec442d Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 17 Oct 2019 11:40:23 +0200 Subject: [PATCH 1/2] 3.x: Fix window(time) possible interrupts while terminating --- .../flowable/FlowableWindowTimed.java | 100 ++++--- .../observable/ObservableWindowTimed.java | 94 ++++--- .../flowable/FlowableWindowWithTimeTest.java | 242 ++++++++++++++++- .../ObservableWindowWithTimeTest.java | 244 +++++++++++++++++- 4 files changed, 616 insertions(+), 64 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java index 0f1b43ae4a..15f1687d4c 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java @@ -90,6 +90,8 @@ static final class WindowExactUnboundedSubscriber static final Object NEXT = new Object(); + static final Object DISPOSE = new Object(); + volatile boolean terminated; WindowExactUnboundedSubscriber(Subscriber> actual, long timespan, TimeUnit unit, @@ -160,7 +162,10 @@ public void onError(Throwable t) { } downstream.onError(t); - dispose(); + queue.offer(DISPOSE); + if (enter()) { + drainLoop(); + } } @Override @@ -171,7 +176,11 @@ public void onComplete() { } downstream.onComplete(); - dispose(); + + queue.offer(DISPOSE); + if (enter()) { + drainLoop(); + } } @Override @@ -184,22 +193,15 @@ public void cancel() { cancelled = true; } - public void dispose() { - DisposableHelper.dispose(timer); - } - @Override public void run() { - if (cancelled) { terminated = true; - dispose(); } queue.offer(NEXT); if (enter()) { drainLoop(); } - } void drainLoop() { @@ -218,19 +220,26 @@ void drainLoop() { Object o = q.poll(); - if (d && (o == null || o == NEXT)) { + if (d && (o == null || o == NEXT || o == DISPOSE)) { window = null; q.clear(); - dispose(); Throwable err = error; if (err != null) { w.onError(err); } else { w.onComplete(); } + timer.dispose(); return; } + if (o == DISPOSE) { + window = null; + q.clear(); + timer.dispose(); + break; + } + if (o == null) { break; } @@ -251,8 +260,8 @@ void drainLoop() { window = null; queue.clear(); upstream.cancel(); - dispose(); a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests.")); + timer.dispose(); return; } } else { @@ -295,6 +304,8 @@ static final class WindowExactBoundedSubscriber final SequentialDisposable timer = new SequentialDisposable(); + static final Object DISPOSE = new Object(); + WindowExactBoundedSubscriber( Subscriber> actual, long timespan, TimeUnit unit, Scheduler scheduler, @@ -396,7 +407,7 @@ public void onNext(T t) { window = null; upstream.cancel(); downstream.onError(new MissingBackpressureException("Could not deliver window due to lack of requests")); - dispose(); + disposeTimer(); return; } } else { @@ -424,7 +435,10 @@ public void onError(Throwable t) { } downstream.onError(t); - dispose(); + queue.offer(DISPOSE); + if (enter()) { + drainLoop(); + } } @Override @@ -435,7 +449,10 @@ public void onComplete() { } downstream.onComplete(); - dispose(); + queue.offer(DISPOSE); + if (enter()) { + drainLoop(); + } } @Override @@ -448,8 +465,8 @@ public void cancel() { cancelled = true; } - public void dispose() { - DisposableHelper.dispose(timer); + public void disposeTimer() { + timer.dispose(); Worker w = worker; if (w != null) { w.dispose(); @@ -468,7 +485,7 @@ void drainLoop() { if (terminated) { upstream.cancel(); q.clear(); - dispose(); + disposeTimer(); return; } @@ -478,8 +495,9 @@ void drainLoop() { boolean empty = o == null; boolean isHolder = o instanceof ConsumerIndexHolder; + boolean isDispose = o == DISPOSE; - if (d && (empty || isHolder)) { + if (d && (empty || isHolder || isDispose)) { window = null; q.clear(); Throwable err = error; @@ -488,10 +506,17 @@ void drainLoop() { } else { w.onComplete(); } - dispose(); + disposeTimer(); return; } + if (isDispose) { + window = null; + q.clear(); + disposeTimer(); + break; + } + if (empty) { break; } @@ -515,7 +540,7 @@ void drainLoop() { queue.clear(); upstream.cancel(); a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests.")); - dispose(); + disposeTimer(); return; } } @@ -554,7 +579,7 @@ void drainLoop() { window = null; upstream.cancel(); downstream.onError(new MissingBackpressureException("Could not deliver window due to lack of requests")); - dispose(); + disposeTimer(); return; } } else { @@ -585,7 +610,6 @@ public void run() { p.queue.offer(this); } else { p.terminated = true; - p.dispose(); } if (p.enter()) { p.drainLoop(); @@ -609,6 +633,8 @@ static final class WindowSkipSubscriber volatile boolean terminated; + static final Object DISPOSE = new Object(); + WindowSkipSubscriber(Subscriber> actual, long timespan, long timeskip, TimeUnit unit, Worker worker, int bufferSize) { @@ -682,7 +708,10 @@ public void onError(Throwable t) { } downstream.onError(t); - dispose(); + queue.offer(DISPOSE); + if (enter()) { + drainLoop(); + } } @Override @@ -693,7 +722,10 @@ public void onComplete() { } downstream.onComplete(); - dispose(); + queue.offer(DISPOSE); + if (enter()) { + drainLoop(); + } } @Override @@ -706,10 +738,6 @@ public void cancel() { cancelled = true; } - public void dispose() { - worker.dispose(); - } - void complete(UnicastProcessor w) { queue.offer(new SubjectWork(w, false)); if (enter()) { @@ -730,9 +758,9 @@ void drainLoop() { for (;;) { if (terminated) { upstream.cancel(); - dispose(); q.clear(); ws.clear(); + worker.dispose(); return; } @@ -742,8 +770,9 @@ void drainLoop() { boolean empty = v == null; boolean sw = v instanceof SubjectWork; + boolean isDispose = v == DISPOSE; - if (d && (empty || sw)) { + if (d && (empty || sw || isDispose)) { q.clear(); Throwable e = error; if (e != null) { @@ -756,10 +785,17 @@ void drainLoop() { } } ws.clear(); - dispose(); + worker.dispose(); return; } + if (isDispose) { + q.clear(); + ws.clear(); + worker.dispose(); + break; + } + if (empty) { break; } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java index df10e149ba..3a78753a37 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java @@ -15,14 +15,13 @@ import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Observer; import io.reactivex.rxjava3.core.Scheduler.Worker; import io.reactivex.rxjava3.disposables.Disposable; -import io.reactivex.rxjava3.internal.disposables.DisposableHelper; +import io.reactivex.rxjava3.internal.disposables.*; import io.reactivex.rxjava3.internal.observers.QueueDrainObserver; import io.reactivex.rxjava3.internal.queue.MpscLinkedQueue; import io.reactivex.rxjava3.internal.util.NotificationLite; @@ -85,10 +84,12 @@ static final class WindowExactUnboundedObserver UnicastSubject window; - final AtomicReference timer = new AtomicReference(); + final SequentialDisposable timer = new SequentialDisposable(); static final Object NEXT = new Object(); + static final Object DISPOSE = new Object(); + volatile boolean terminated; WindowExactUnboundedObserver(Observer> actual, long timespan, TimeUnit unit, @@ -114,7 +115,7 @@ public void onSubscribe(Disposable d) { if (!cancelled) { Disposable task = scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit); - DisposableHelper.replace(timer, task); + timer.replace(task); } } } @@ -146,8 +147,11 @@ public void onError(Throwable t) { drainLoop(); } - disposeTimer(); downstream.onError(t); + queue.offer(DISPOSE); + if (enter()) { + drainLoop(); + } } @Override @@ -157,8 +161,11 @@ public void onComplete() { drainLoop(); } - disposeTimer(); downstream.onComplete(); + queue.offer(DISPOSE); + if (enter()) { + drainLoop(); + } } @Override @@ -171,15 +178,10 @@ public boolean isDisposed() { return cancelled; } - void disposeTimer() { - DisposableHelper.dispose(timer); - } - @Override public void run() { if (cancelled) { terminated = true; - disposeTimer(); } queue.offer(NEXT); if (enter()) { @@ -203,19 +205,26 @@ void drainLoop() { Object o = q.poll(); - if (d && (o == null || o == NEXT)) { + if (d && (o == null || o == NEXT || o == DISPOSE)) { window = null; q.clear(); - disposeTimer(); Throwable err = error; if (err != null) { w.onError(err); } else { w.onComplete(); } + timer.dispose(); return; } + if (o == DISPOSE) { + window = null; + q.clear(); + timer.dispose(); + break; + } + if (o == null) { break; } @@ -266,7 +275,9 @@ static final class WindowExactBoundedObserver volatile boolean terminated; - final AtomicReference timer = new AtomicReference(); + final SequentialDisposable timer = new SequentialDisposable(); + + static final Object DISPOSE = new Object(); WindowExactBoundedObserver( Observer> actual, @@ -312,7 +323,7 @@ public void onSubscribe(Disposable d) { task = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit); } - DisposableHelper.replace(timer, task); + timer.replace(task); } } @@ -370,7 +381,10 @@ public void onError(Throwable t) { } downstream.onError(t); - disposeTimer(); + queue.offer(DISPOSE); + if (enter()) { + drainLoop(); + } } @Override @@ -381,7 +395,10 @@ public void onComplete() { } downstream.onComplete(); - disposeTimer(); + queue.offer(DISPOSE); + if (enter()) { + drainLoop(); + } } @Override @@ -424,20 +441,28 @@ void drainLoop() { boolean empty = o == null; boolean isHolder = o instanceof ConsumerIndexHolder; + boolean isDispose = o == DISPOSE; - if (d && (empty || isHolder)) { + if (d && (empty || isHolder || isDispose)) { window = null; q.clear(); - disposeTimer(); Throwable err = error; if (err != null) { w.onError(err); } else { w.onComplete(); } + disposeTimer(); return; } + if (isDispose) { + window = null; + q.clear(); + disposeTimer(); + break; + } + if (empty) { break; } @@ -507,7 +532,6 @@ public void run() { p.queue.offer(this); } else { p.terminated = true; - p.disposeTimer(); } if (p.enter()) { p.drainLoop(); @@ -531,6 +555,8 @@ static final class WindowSkipObserver volatile boolean terminated; + static final Object DISPOSE = new Object(); + WindowSkipObserver(Observer> actual, long timespan, long timeskip, TimeUnit unit, Worker worker, int bufferSize) { @@ -592,7 +618,10 @@ public void onError(Throwable t) { } downstream.onError(t); - disposeWorker(); + queue.offer(DISPOSE); + if (enter()) { + drainLoop(); + } } @Override @@ -603,7 +632,10 @@ public void onComplete() { } downstream.onComplete(); - disposeWorker(); + queue.offer(DISPOSE); + if (enter()) { + drainLoop(); + } } @Override @@ -616,10 +648,6 @@ public boolean isDisposed() { return cancelled; } - void disposeWorker() { - worker.dispose(); - } - void complete(UnicastSubject w) { queue.offer(new SubjectWork(w, false)); if (enter()) { @@ -640,9 +668,9 @@ void drainLoop() { for (;;) { if (terminated) { upstream.dispose(); - disposeWorker(); q.clear(); ws.clear(); + worker.dispose(); return; } @@ -652,8 +680,9 @@ void drainLoop() { boolean empty = v == null; boolean sw = v instanceof SubjectWork; + boolean isDispose = v == DISPOSE; - if (d && (empty || sw)) { + if (d && (empty || sw || isDispose)) { q.clear(); Throwable e = error; if (e != null) { @@ -665,11 +694,18 @@ void drainLoop() { w.onComplete(); } } - disposeWorker(); ws.clear(); + worker.dispose(); return; } + if (isDispose) { + q.clear(); + ws.clear(); + worker.dispose(); + break; + } + if (empty) { break; } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithTimeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithTimeTest.java index 7e055bd299..82ac4bb67f 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithTimeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithTimeTest.java @@ -16,7 +16,7 @@ import static org.junit.Assert.*; import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.*; import org.junit.*; @@ -919,5 +919,245 @@ public void nextWindowMissingBackpressureDrainOnTime() { .assertError(MissingBackpressureException.class) .assertNotComplete(); } + + @Test + public void exactTimeBoundNoInterruptWindowOutputOnComplete() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishProcessor pp = PublishProcessor.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + pp.window(100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Flowable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + pp.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + pp.onComplete(); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void exactTimeBoundNoInterruptWindowOutputOnError() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishProcessor pp = PublishProcessor.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + pp.window(100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Flowable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + pp.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + pp.onError(new TestException()); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void exactTimeAndSizeBoundNoInterruptWindowOutputOnComplete() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishProcessor pp = PublishProcessor.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + pp.window(100, TimeUnit.MILLISECONDS, 10) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Flowable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + pp.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + pp.onComplete(); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void exactTimeAndSizeBoundNoInterruptWindowOutputOnError() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishProcessor pp = PublishProcessor.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + pp.window(100, TimeUnit.MILLISECONDS, 10) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Flowable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + pp.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + pp.onError(new TestException()); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void skipTimeAndSizeBoundNoInterruptWindowOutputOnComplete() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishProcessor pp = PublishProcessor.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + pp.window(90, 100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Flowable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + pp.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + pp.onComplete(); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void skipTimeAndSizeBoundNoInterruptWindowOutputOnError() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishProcessor pp = PublishProcessor.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + pp.window(90, 100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Flowable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + pp.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + pp.onError(new TestException()); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithTimeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithTimeTest.java index e56a094a8f..ba9544eb9c 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithTimeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithTimeTest.java @@ -16,8 +16,8 @@ import static org.junit.Assert.*; import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import org.junit.*; @@ -709,4 +709,244 @@ public void countRestartsOnTimeTick() { .assertNoErrors() .assertNotComplete(); } + + @Test + public void exactTimeBoundNoInterruptWindowOutputOnComplete() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishSubject ps = PublishSubject.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + ps.window(100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Observable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + ps.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + ps.onComplete(); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void exactTimeBoundNoInterruptWindowOutputOnError() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishSubject ps = PublishSubject.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + ps.window(100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Observable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + ps.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + ps.onError(new TestException()); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void exactTimeAndSizeBoundNoInterruptWindowOutputOnComplete() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishSubject ps = PublishSubject.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + ps.window(100, TimeUnit.MILLISECONDS, 10) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Observable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + ps.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + ps.onComplete(); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void exactTimeAndSizeBoundNoInterruptWindowOutputOnError() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishSubject ps = PublishSubject.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + ps.window(100, TimeUnit.MILLISECONDS, 10) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Observable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + ps.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + ps.onError(new TestException()); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void skipTimeAndSizeBoundNoInterruptWindowOutputOnComplete() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishSubject ps = PublishSubject.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + ps.window(90, 100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Observable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + ps.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + ps.onComplete(); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void skipTimeAndSizeBoundNoInterruptWindowOutputOnError() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishSubject ps = PublishSubject.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + ps.window(90, 100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Observable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + ps.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + ps.onError(new TestException()); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } } From 040b15c488e3e6c8228ce043bfd3771bc53878b2 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 17 Oct 2019 15:41:59 +0200 Subject: [PATCH 2/2] I don't think the DISPOSE token is necessary after all --- .../flowable/FlowableWindowTimed.java | 60 +------------------ .../observable/ObservableWindowTimed.java | 59 +----------------- 2 files changed, 6 insertions(+), 113 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java index 15f1687d4c..397af690a2 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java @@ -90,8 +90,6 @@ static final class WindowExactUnboundedSubscriber static final Object NEXT = new Object(); - static final Object DISPOSE = new Object(); - volatile boolean terminated; WindowExactUnboundedSubscriber(Subscriber> actual, long timespan, TimeUnit unit, @@ -162,10 +160,6 @@ public void onError(Throwable t) { } downstream.onError(t); - queue.offer(DISPOSE); - if (enter()) { - drainLoop(); - } } @Override @@ -176,11 +170,6 @@ public void onComplete() { } downstream.onComplete(); - - queue.offer(DISPOSE); - if (enter()) { - drainLoop(); - } } @Override @@ -220,7 +209,7 @@ void drainLoop() { Object o = q.poll(); - if (d && (o == null || o == NEXT || o == DISPOSE)) { + if (d && (o == null || o == NEXT)) { window = null; q.clear(); Throwable err = error; @@ -233,13 +222,6 @@ void drainLoop() { return; } - if (o == DISPOSE) { - window = null; - q.clear(); - timer.dispose(); - break; - } - if (o == null) { break; } @@ -304,8 +286,6 @@ static final class WindowExactBoundedSubscriber final SequentialDisposable timer = new SequentialDisposable(); - static final Object DISPOSE = new Object(); - WindowExactBoundedSubscriber( Subscriber> actual, long timespan, TimeUnit unit, Scheduler scheduler, @@ -435,10 +415,6 @@ public void onError(Throwable t) { } downstream.onError(t); - queue.offer(DISPOSE); - if (enter()) { - drainLoop(); - } } @Override @@ -449,10 +425,6 @@ public void onComplete() { } downstream.onComplete(); - queue.offer(DISPOSE); - if (enter()) { - drainLoop(); - } } @Override @@ -495,9 +467,8 @@ void drainLoop() { boolean empty = o == null; boolean isHolder = o instanceof ConsumerIndexHolder; - boolean isDispose = o == DISPOSE; - if (d && (empty || isHolder || isDispose)) { + if (d && (empty || isHolder)) { window = null; q.clear(); Throwable err = error; @@ -510,13 +481,6 @@ void drainLoop() { return; } - if (isDispose) { - window = null; - q.clear(); - disposeTimer(); - break; - } - if (empty) { break; } @@ -633,8 +597,6 @@ static final class WindowSkipSubscriber volatile boolean terminated; - static final Object DISPOSE = new Object(); - WindowSkipSubscriber(Subscriber> actual, long timespan, long timeskip, TimeUnit unit, Worker worker, int bufferSize) { @@ -708,10 +670,6 @@ public void onError(Throwable t) { } downstream.onError(t); - queue.offer(DISPOSE); - if (enter()) { - drainLoop(); - } } @Override @@ -722,10 +680,6 @@ public void onComplete() { } downstream.onComplete(); - queue.offer(DISPOSE); - if (enter()) { - drainLoop(); - } } @Override @@ -770,9 +724,8 @@ void drainLoop() { boolean empty = v == null; boolean sw = v instanceof SubjectWork; - boolean isDispose = v == DISPOSE; - if (d && (empty || sw || isDispose)) { + if (d && (empty || sw)) { q.clear(); Throwable e = error; if (e != null) { @@ -789,13 +742,6 @@ void drainLoop() { return; } - if (isDispose) { - q.clear(); - ws.clear(); - worker.dispose(); - break; - } - if (empty) { break; } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java index 3a78753a37..cdd64cd6ae 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java @@ -88,8 +88,6 @@ static final class WindowExactUnboundedObserver static final Object NEXT = new Object(); - static final Object DISPOSE = new Object(); - volatile boolean terminated; WindowExactUnboundedObserver(Observer> actual, long timespan, TimeUnit unit, @@ -148,10 +146,6 @@ public void onError(Throwable t) { } downstream.onError(t); - queue.offer(DISPOSE); - if (enter()) { - drainLoop(); - } } @Override @@ -162,10 +156,6 @@ public void onComplete() { } downstream.onComplete(); - queue.offer(DISPOSE); - if (enter()) { - drainLoop(); - } } @Override @@ -205,7 +195,7 @@ void drainLoop() { Object o = q.poll(); - if (d && (o == null || o == NEXT || o == DISPOSE)) { + if (d && (o == null || o == NEXT)) { window = null; q.clear(); Throwable err = error; @@ -218,13 +208,6 @@ void drainLoop() { return; } - if (o == DISPOSE) { - window = null; - q.clear(); - timer.dispose(); - break; - } - if (o == null) { break; } @@ -277,8 +260,6 @@ static final class WindowExactBoundedObserver final SequentialDisposable timer = new SequentialDisposable(); - static final Object DISPOSE = new Object(); - WindowExactBoundedObserver( Observer> actual, long timespan, TimeUnit unit, Scheduler scheduler, @@ -381,10 +362,6 @@ public void onError(Throwable t) { } downstream.onError(t); - queue.offer(DISPOSE); - if (enter()) { - drainLoop(); - } } @Override @@ -395,10 +372,6 @@ public void onComplete() { } downstream.onComplete(); - queue.offer(DISPOSE); - if (enter()) { - drainLoop(); - } } @Override @@ -441,9 +414,8 @@ void drainLoop() { boolean empty = o == null; boolean isHolder = o instanceof ConsumerIndexHolder; - boolean isDispose = o == DISPOSE; - if (d && (empty || isHolder || isDispose)) { + if (d && (empty || isHolder)) { window = null; q.clear(); Throwable err = error; @@ -456,13 +428,6 @@ void drainLoop() { return; } - if (isDispose) { - window = null; - q.clear(); - disposeTimer(); - break; - } - if (empty) { break; } @@ -555,8 +520,6 @@ static final class WindowSkipObserver volatile boolean terminated; - static final Object DISPOSE = new Object(); - WindowSkipObserver(Observer> actual, long timespan, long timeskip, TimeUnit unit, Worker worker, int bufferSize) { @@ -618,10 +581,6 @@ public void onError(Throwable t) { } downstream.onError(t); - queue.offer(DISPOSE); - if (enter()) { - drainLoop(); - } } @Override @@ -632,10 +591,6 @@ public void onComplete() { } downstream.onComplete(); - queue.offer(DISPOSE); - if (enter()) { - drainLoop(); - } } @Override @@ -680,9 +635,8 @@ void drainLoop() { boolean empty = v == null; boolean sw = v instanceof SubjectWork; - boolean isDispose = v == DISPOSE; - if (d && (empty || sw || isDispose)) { + if (d && (empty || sw)) { q.clear(); Throwable e = error; if (e != null) { @@ -699,13 +653,6 @@ void drainLoop() { return; } - if (isDispose) { - q.clear(); - ws.clear(); - worker.dispose(); - break; - } - if (empty) { break; }