Skip to content

3.x: constrain upstream requests from take, remove limit operator #6650

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 22 additions & 51 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -11212,52 +11212,6 @@ public final <R> Flowable<R> lift(FlowableOperator<? extends R, ? super T> lifte
return RxJavaPlugins.onAssembly(new FlowableLift<R, T>(this, lifter));
}

/**
* Limits both the number of upstream items (after which the sequence completes)
* and the total downstream request amount requested from the upstream to
* possibly prevent the creation of excess items by the upstream.
* <p>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc of take should contain these extra information.

* The operator requests at most the given {@code count} of items from upstream even
* if the downstream requests more than that. For example, given a {@code limit(5)},
* if the downstream requests 1, a request of 1 is submitted to the upstream
* and the operator remembers that only 4 items can be requested now on. A request
* of 5 at this point will request 4 from the upstream and any subsequent requests will
* be ignored.
* <p>
* Note that requests are negotiated on an operator boundary and {@code limit}'s amount
* may not be preserved further upstream. For example,
* {@code source.observeOn(Schedulers.computation()).limit(5)} will still request the
* default (128) elements from the given {@code source}.
* <p>
* The main use of this operator is with sources that are async boundaries that
* don't interfere with request amounts, such as certain {@code Flowable}-based
* network endpoints that relay downstream request amounts unchanged and are, therefore,
* prone to trigger excessive item creation/transmission over the network.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator requests a total of the given {@code count} items from the upstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code limit} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.6 - experimental
* @param count the maximum number of items and the total request amount, non-negative.
* Zero will immediately cancel the upstream on subscription and complete
* the downstream.
* @return the new Flowable instance
* @see #take(long)
* @see #rebatchRequests(int)
* @since 2.2
*/
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
@CheckReturnValue
public final Flowable<T> limit(long count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
}
return RxJavaPlugins.onAssembly(new FlowableLimit<T>(this, count));
}

/**
* Returns a Flowable that applies a specified function to each item emitted by the source Publisher and
* emits the results of these function applications.
Expand Down Expand Up @@ -15372,6 +15326,7 @@ public final <R> Flowable<R> switchMapSingleDelayError(@NonNull Function<? super
return RxJavaPlugins.onAssembly(new FlowableSwitchMapSingle<T, R>(this, mapper, true));
}


/**
* Returns a Flowable that emits only the first {@code count} items emitted by the source Publisher. If the source emits fewer than
* {@code count} items then all of its items are emitted.
Expand All @@ -15381,23 +15336,39 @@ public final <R> Flowable<R> switchMapSingleDelayError(@NonNull Function<? super
* This method returns a Publisher that will invoke a subscribing {@link Subscriber}'s
* {@link Subscriber#onNext onNext} function a maximum of {@code count} times before invoking
* {@link Subscriber#onComplete onComplete}.
* <p>
* Limits both the number of upstream items (after which the sequence completes)
* and the total downstream request amount requested from the upstream to
* possibly prevent the creation of excess items by the upstream.
* <p>
* The operator requests at most the given {@code count} of items from upstream even
* if the downstream requests more than that. For example, given a {@code limit(5)},
* if the downstream requests 1, a request of 1 is submitted to the upstream
* and the operator remembers that only 4 items can be requested now on. A request
* of 5 at this point will request 4 from the upstream and any subsequent requests will
* be ignored.
* <p>
* Note that requests are negotiated on an operator boundary and {@code limit}'s amount
* may not be preserved further upstream. For example,
* {@code source.observeOn(Schedulers.computation()).limit(5)} will still request the
* default (128) elements from the given {@code source}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
* behavior in case the first request is smaller than the {@code count}. Otherwise, the source {@code Publisher}
* is consumed in an unbounded manner (i.e., without applying backpressure to it).</dd>
* <dd>The source {@code Publisher} is consumed in a bounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code take} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param count
* the maximum number of items to emit
* the maximum number of items and the total request amount, non-negative.
* Zero will immediately cancel the upstream on subscription and complete
* the downstream.
* @return a Flowable that emits only the first {@code count} items emitted by the source Publisher, or
* all of the items from the source Publisher if that Publisher emits fewer than {@code count} items
* @see <a href="http://reactivex.io/documentation/operators/take.html">ReactiveX operators documentation: Take</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.SPECIAL) // may trigger UNBOUNDED_IN
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> take(long count) {
if (count < 0) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package io.reactivex.rxjava3.internal.operators.flowable;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.*;

Expand All @@ -22,68 +22,67 @@
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

public final class FlowableTake<T> extends AbstractFlowableWithUpstream<T, T> {
final long limit;
public FlowableTake(Flowable<T> source, long limit) {

final long n;

public FlowableTake(Flowable<T> source, long n) {
super(source);
this.limit = limit;
this.n = n;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new TakeSubscriber<T>(s, limit));
source.subscribe(new TakeSubscriber<T>(s, n));
}

static final class TakeSubscriber<T> extends AtomicBoolean implements FlowableSubscriber<T>, Subscription {
static final class TakeSubscriber<T>
extends AtomicLong
implements FlowableSubscriber<T>, Subscription {

private static final long serialVersionUID = -5636543848937116287L;
private static final long serialVersionUID = 2288246011222124525L;

final Subscriber<? super T> downstream;

final long limit;

boolean done;
long remaining;

Subscription upstream;

long remaining;

TakeSubscriber(Subscriber<? super T> actual, long limit) {
TakeSubscriber(Subscriber<? super T> actual, long remaining) {
this.downstream = actual;
this.limit = limit;
this.remaining = limit;
this.remaining = remaining;
lazySet(remaining);
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
upstream = s;
if (limit == 0L) {
if (remaining == 0L) {
s.cancel();
done = true;
EmptySubscription.complete(downstream);
} else {
this.upstream = s;
downstream.onSubscribe(this);
}
}
}

@Override
public void onNext(T t) {
if (!done && remaining-- > 0) {
boolean stop = remaining == 0;
long r = remaining;
if (r > 0L) {
remaining = --r;
downstream.onNext(t);
if (stop) {
if (r == 0L) {
upstream.cancel();
onComplete();
downstream.onComplete();
}
}
}

@Override
public void onError(Throwable t) {
if (!done) {
done = true;
upstream.cancel();
if (remaining > 0L) {
remaining = 0L;
downstream.onError(t);
} else {
RxJavaPlugins.onError(t);
Expand All @@ -92,29 +91,39 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
if (!done) {
done = true;
if (remaining > 0L) {
remaining = 0L;
downstream.onComplete();
}
}

@Override
public void request(long n) {
if (!SubscriptionHelper.validate(n)) {
return;
}
if (!get() && compareAndSet(false, true)) {
if (n >= limit) {
upstream.request(Long.MAX_VALUE);
return;
if (SubscriptionHelper.validate(n)) {
for (;;) {
long r = get();
if (r == 0L) {
break;
}
long toRequest;
if (r <= n) {
toRequest = r;
} else {
toRequest = n;
}
long u = r - toRequest;
if (compareAndSet(r, u)) {
upstream.request(toRequest);
break;
}
}
}
upstream.request(n);
}

@Override
public void cancel() {
upstream.cancel();
}

}
}
Loading