diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 574eb37a50..9f45a1a610 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -9494,8 +9494,7 @@ public final Flowable doOnTerminate(final Action onTerminate) { * *
*
Backpressure:
- *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner - * (i.e., no backpressure applied to it).
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.
*
Scheduler:
*
{@code elementAt} does not operate by default on a particular {@link Scheduler}.
*
@@ -9507,7 +9506,7 @@ public final Flowable doOnTerminate(final Action onTerminate) { * @see ReactiveX operators documentation: ElementAt */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Maybe elementAt(long index) { if (index < 0) { @@ -9523,8 +9522,7 @@ public final Maybe elementAt(long index) { * *
*
Backpressure:
- *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner - * (i.e., no backpressure applied to it).
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.
*
Scheduler:
*
{@code elementAt} does not operate by default on a particular {@link Scheduler}.
*
@@ -9541,7 +9539,7 @@ public final Maybe elementAt(long index) { */ @CheckReturnValue @NonNull - @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Single elementAt(long index, T defaultItem) { if (index < 0) { @@ -9558,8 +9556,7 @@ public final Single elementAt(long index, T defaultItem) { * *
*
Backpressure:
- *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner - * (i.e., no backpressure applied to it).
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.
*
Scheduler:
*
{@code elementAtOrError} does not operate by default on a particular {@link Scheduler}.
*
@@ -9573,7 +9570,7 @@ public final Single elementAt(long index, T defaultItem) { * @see ReactiveX operators documentation: ElementAt */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Single elementAtOrError(long index) { if (index < 0) { @@ -9617,8 +9614,7 @@ public final Flowable filter(Predicate predicate) { * *
*
Backpressure:
- *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an - * unbounded manner (i.e., without applying backpressure).
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.
*
Scheduler:
*
{@code firstElement} does not operate by default on a particular {@link Scheduler}.
*
@@ -9627,7 +9623,7 @@ public final Flowable filter(Predicate predicate) { * @see ReactiveX operators documentation: First */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN + @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Maybe firstElement() { return elementAt(0); @@ -9640,8 +9636,7 @@ public final Maybe firstElement() { * *
*
Backpressure:
- *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an - * unbounded manner (i.e., without applying backpressure).
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.
*
Scheduler:
*
{@code first} does not operate by default on a particular {@link Scheduler}.
*
@@ -9653,7 +9648,7 @@ public final Maybe firstElement() { * @see ReactiveX operators documentation: First */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN + @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Single first(T defaultItem) { return elementAt(0, defaultItem); @@ -9666,8 +9661,7 @@ public final Single first(T defaultItem) { * *
*
Backpressure:
- *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an - * unbounded manner (i.e., without applying backpressure).
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.
*
Scheduler:
*
{@code firstOrError} does not operate by default on a particular {@link Scheduler}.
*
@@ -9676,7 +9670,7 @@ public final Single first(T defaultItem) { * @see ReactiveX operators documentation: First */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN + @BackpressureSupport(BackpressureKind.FULL) // take may trigger UNBOUNDED_IN @SchedulerSupport(SchedulerSupport.NONE) public final Single firstOrError() { return elementAtOrError(0); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtMaybe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtMaybe.java index 4d411990ac..3ef874d5b5 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtMaybe.java @@ -63,7 +63,7 @@ public void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.upstream, s)) { this.upstream = s; downstream.onSubscribe(this); - s.request(Long.MAX_VALUE); + s.request(index + 1); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java index 7cd542d497..e488724dac 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java @@ -70,7 +70,7 @@ public void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.upstream, s)) { this.upstream = s; downstream.onSubscribe(this); - s.request(Long.MAX_VALUE); + s.request(index + 1); } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java index 27b967d15a..06530b329b 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java @@ -26,6 +26,7 @@ import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; +import io.reactivex.functions.LongConsumer; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; @@ -69,6 +70,38 @@ public void elementAt() { assertEquals(2, Flowable.fromArray(1, 2).elementAt(1).blockingGet() .intValue()); } + + @Test + public void elementAtConstrainsUpstreamRequests() { + final List requests = new ArrayList(); + Flowable.fromArray(1, 2, 3, 4) + .doOnRequest(new LongConsumer() { + @Override + public void accept(long n) throws Throwable { + requests.add(n); + } + }) + .elementAt(2) + .blockingGet() + .intValue(); + assertEquals(Arrays.asList(3L), requests); + } + + @Test + public void elementAtWithDefaultConstrainsUpstreamRequests() { + final List requests = new ArrayList(); + Flowable.fromArray(1, 2, 3, 4) + .doOnRequest(new LongConsumer() { + @Override + public void accept(long n) throws Throwable { + requests.add(n); + } + }) + .elementAt(2, 100) + .blockingGet() + .intValue(); + assertEquals(Arrays.asList(3L), requests); + } @Test(expected = IndexOutOfBoundsException.class) public void elementAtWithMinusIndex() {