diff --git a/src/main/java/io/reactivex/rxjava3/annotations/BackpressureKind.java b/src/main/java/io/reactivex/rxjava3/annotations/BackpressureKind.java
index aa20e5f78e..fd53c196b3 100644
--- a/src/main/java/io/reactivex/rxjava3/annotations/BackpressureKind.java
+++ b/src/main/java/io/reactivex/rxjava3/annotations/BackpressureKind.java
@@ -32,13 +32,13 @@ public enum BackpressureKind {
*/
SPECIAL,
/**
- * The operator requests Long.MAX_VALUE from upstream but respects the backpressure
+ * The operator requests {@link Long#MAX_VALUE} from upstream but respects the backpressure
* of the downstream.
*/
UNBOUNDED_IN,
/**
- * The operator will emit a MissingBackpressureException if the downstream didn't request
- * enough or in time.
+ * The operator will emit a {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException MissingBackpressureException}
+ * if the downstream didn't request enough or in time.
*/
ERROR,
/**
diff --git a/src/main/java/io/reactivex/rxjava3/core/BackpressureOverflowStrategy.java b/src/main/java/io/reactivex/rxjava3/core/BackpressureOverflowStrategy.java
index 5ee8ecca1d..750a1308f2 100644
--- a/src/main/java/io/reactivex/rxjava3/core/BackpressureOverflowStrategy.java
+++ b/src/main/java/io/reactivex/rxjava3/core/BackpressureOverflowStrategy.java
@@ -19,7 +19,10 @@
* Options to deal with buffer overflow when using onBackpressureBuffer.
*/
public enum BackpressureOverflowStrategy {
- /** Signal a MissingBackpressureException and terminate the sequence. */
+ /**
+ * Signal a {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException MissingBackpressureException}
+ * and terminate the sequence.
+ */
ERROR,
/** Drop the oldest value from the buffer. */
DROP_OLDEST,
diff --git a/src/main/java/io/reactivex/rxjava3/core/BackpressureStrategy.java b/src/main/java/io/reactivex/rxjava3/core/BackpressureStrategy.java
index c0831d7759..4a26072650 100644
--- a/src/main/java/io/reactivex/rxjava3/core/BackpressureStrategy.java
+++ b/src/main/java/io/reactivex/rxjava3/core/BackpressureStrategy.java
@@ -18,25 +18,26 @@
*/
public enum BackpressureStrategy {
/**
- * OnNext events are written without any buffering or dropping.
+ * The {@code onNext} events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
*
Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
- * Signals a MissingBackpressureException in case the downstream can't keep up.
+ * Signals a {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException MissingBackpressureException}
+ * in case the downstream can't keep up.
*/
ERROR,
/**
- * Buffers all onNext values until the downstream consumes it.
+ * Buffers all {@code onNext} values until the downstream consumes it.
*/
BUFFER,
/**
- * Drops the most recent onNext value if the downstream can't keep up.
+ * Drops the most recent {@code onNext} value if the downstream can't keep up.
*/
DROP,
/**
- * Keeps only the latest onNext value, overwriting any previous value if the
+ * Keeps only the latest {@code onNext} value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableEmitter.java b/src/main/java/io/reactivex/rxjava3/core/CompletableEmitter.java
index 51c9780f73..5ebb4e1877 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableEmitter.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableEmitter.java
@@ -82,12 +82,12 @@ public interface CompletableEmitter {
boolean isDisposed();
/**
- * Attempts to emit the specified {@code Throwable} error if the downstream
+ * Attempts to emit the specified {@link Throwable} error if the downstream
* hasn't cancelled the sequence or is otherwise terminated, returning false
* if the emission is not allowed to happen due to lifecycle restrictions.
*
- * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
- * if the error could not be delivered.
+ * Unlike {@link #onError(Throwable)}, the {@link io.reactivex.rxjava3.plugins.RxJavaPlugins#onError(Throwable) RxjavaPlugins.onError}
+ * is not called if the error could not be delivered.
*
History: 2.1.1 - experimental
* @param t the throwable error to signal if possible
* @return true if successful, false if the downstream is not able to accept further
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableObserver.java b/src/main/java/io/reactivex/rxjava3/core/CompletableObserver.java
index af0a017d66..7c01ebfee1 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableObserver.java
@@ -49,9 +49,9 @@
*/
public interface CompletableObserver {
/**
- * Called once by the Completable to set a Disposable on this instance which
+ * Called once by the {@link Completable} to set a {@link Disposable} on this instance which
* then can be used to cancel the subscription at any time.
- * @param d the Disposable instance to call dispose on for cancellation, not null
+ * @param d the {@code Disposable} instance to call dispose on for cancellation, not null
*/
void onSubscribe(@NonNull Disposable d);
@@ -62,7 +62,7 @@ public interface CompletableObserver {
/**
* Called once if the deferred computation 'throws' an exception.
- * @param e the exception, not null.
+ * @param e the exception, not {@code null}.
*/
void onError(@NonNull Throwable e);
}
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableOnSubscribe.java b/src/main/java/io/reactivex/rxjava3/core/CompletableOnSubscribe.java
index 6aea056be7..70d79e62b6 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableOnSubscribe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableOnSubscribe.java
@@ -23,8 +23,8 @@
public interface CompletableOnSubscribe {
/**
- * Called for each CompletableObserver that subscribes.
- * @param emitter the safe emitter instance, never null
+ * Called for each {@link CompletableObserver} that subscribes.
+ * @param emitter the safe emitter instance, never {@code null}
* @throws Throwable on error
*/
void subscribe(@NonNull CompletableEmitter emitter) throws Throwable;
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableOperator.java b/src/main/java/io/reactivex/rxjava3/core/CompletableOperator.java
index 763cd670c6..e9b1df83cb 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableOperator.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableOperator.java
@@ -21,11 +21,11 @@
@FunctionalInterface
public interface CompletableOperator {
/**
- * Applies a function to the child CompletableObserver and returns a new parent CompletableObserver.
- * @param observer the child CompletableObservable instance
- * @return the parent CompletableObserver instance
- * @throws Exception on failure
+ * Applies a function to the child {@link CompletableObserver} and returns a new parent {@code CompletableObserver}.
+ * @param observer the child {@code CompletableObserver} instance
+ * @return the parent {@code CompletableObserver} instance
+ * @throws Throwable on failure
*/
@NonNull
- CompletableObserver apply(@NonNull CompletableObserver observer) throws Exception;
+ CompletableObserver apply(@NonNull CompletableObserver observer) throws Throwable;
}
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableSource.java b/src/main/java/io/reactivex/rxjava3/core/CompletableSource.java
index 9b754a698a..58edf9471c 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableSource.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableSource.java
@@ -24,9 +24,9 @@
public interface CompletableSource {
/**
- * Subscribes the given CompletableObserver to this CompletableSource instance.
- * @param co the CompletableObserver, not null
- * @throws NullPointerException if {@code co} is null
+ * Subscribes the given {@link CompletableObserver} to this {@code CompletableSource} instance.
+ * @param observer the {@code CompletableObserver}, not {@code null}
+ * @throws NullPointerException if {@code observer} is {@code null}
*/
- void subscribe(@NonNull CompletableObserver co);
+ void subscribe(@NonNull CompletableObserver observer);
}
diff --git a/src/main/java/io/reactivex/rxjava3/core/CompletableTransformer.java b/src/main/java/io/reactivex/rxjava3/core/CompletableTransformer.java
index 91e1a361ba..98a9e2aa19 100644
--- a/src/main/java/io/reactivex/rxjava3/core/CompletableTransformer.java
+++ b/src/main/java/io/reactivex/rxjava3/core/CompletableTransformer.java
@@ -16,15 +16,15 @@
import io.reactivex.rxjava3.annotations.NonNull;
/**
- * Convenience interface and callback used by the compose operator to turn a Completable into another
- * Completable fluently.
+ * Convenience interface and callback used by the compose operator to turn a {@link Completable} into another
+ * {@code Completable} fluently.
*/
@FunctionalInterface
public interface CompletableTransformer {
/**
- * Applies a function to the upstream Completable and returns a CompletableSource.
- * @param upstream the upstream Completable instance
- * @return the transformed CompletableSource instance
+ * Applies a function to the upstream {@link Completable} and returns a {@link CompletableSource}.
+ * @param upstream the upstream {@code Completable} instance
+ * @return the transformed {@code CompletableSource} instance
*/
@NonNull
CompletableSource apply(@NonNull Completable upstream);
diff --git a/src/main/java/io/reactivex/rxjava3/core/Emitter.java b/src/main/java/io/reactivex/rxjava3/core/Emitter.java
index e222f9c44f..0fc2b47c0d 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Emitter.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Emitter.java
@@ -29,13 +29,13 @@ public interface Emitter {
/**
* Signal a normal value.
- * @param value the value to signal, not null
+ * @param value the value to signal, not {@code null}
*/
void onNext(@NonNull T value);
/**
- * Signal a Throwable exception.
- * @param error the Throwable to signal, not null
+ * Signal a {@link Throwable} exception.
+ * @param error the {@code Throwable} to signal, not {@code null}
*/
void onError(@NonNull Throwable error);
diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
index 09cb859b83..0b2e9197cb 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
@@ -1434,7 +1434,7 @@ public static Flowable concatArrayEager(Publisher extends T>... sources
*
* @param the value type
* @param sources an array of Publishers that need to be eagerly concatenated
- * @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE
+ * @param maxConcurrency the maximum number of concurrent subscriptions at a time, {@link Integer#MAX_VALUE}
* is interpreted as an indication to subscribe to all sources at once
* @param prefetch the number of elements to prefetch from each Publisher source
* @return the new Publisher instance with the specified concatenation behavior
@@ -1503,7 +1503,7 @@ public static Flowable concatArrayEagerDelayError(Publisher extends T>.
*
* @param the value type
* @param sources an array of {@code Publisher}s that need to be eagerly concatenated
- * @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE
+ * @param maxConcurrency the maximum number of concurrent subscriptions at a time, {@link Integer#MAX_VALUE}
* is interpreted as indication to subscribe to all sources at once
* @param prefetch the number of elements to prefetch from each {@code Publisher} source
* @return the new Flowable instance with the specified concatenation behavior
@@ -1635,7 +1635,7 @@ public static Flowable concatEager(Publisher extends Publisher extend
*
* @param the value type
* @param sources a sequence of Publishers that need to be eagerly concatenated
- * @param maxConcurrency the maximum number of concurrently running inner Publishers; Integer.MAX_VALUE
+ * @param maxConcurrency the maximum number of concurrently running inner Publishers; {@link Integer#MAX_VALUE}
* is interpreted as all inner Publishers can be active at the same time
* @param prefetch the number of elements to prefetch from each inner Publisher source
* @return the new Publisher instance with the specified concatenation behavior
@@ -1695,7 +1695,7 @@ public static Flowable concatEager(Iterable extends Publisher extends
*
* @param the value type
* @param sources a sequence of Publishers that need to be eagerly concatenated
- * @param maxConcurrency the maximum number of concurrently running inner Publishers; Integer.MAX_VALUE
+ * @param maxConcurrency the maximum number of concurrently running inner Publishers; {@link Integer#MAX_VALUE}
* is interpreted as all inner Publishers can be active at the same time
* @param prefetch the number of elements to prefetch from each inner Publisher source
* @return the new Publisher instance with the specified concatenation behavior
@@ -3976,7 +3976,7 @@ public static Flowable never() {
* @return a Flowable that emits a range of sequential Integers
* @throws IllegalArgumentException
* if {@code count} is less than zero, or if {@code start} + {@code count} − 1 exceeds
- * {@code Integer.MAX_VALUE}
+ * {@link Integer#MAX_VALUE}
* @see ReactiveX operators documentation: Range
*/
@CheckReturnValue
@@ -4016,7 +4016,7 @@ public static Flowable range(int start, int count) {
* @return a Flowable that emits a range of sequential Longs
* @throws IllegalArgumentException
* if {@code count} is less than zero, or if {@code start} + {@code count} − 1 exceeds
- * {@code Long.MAX_VALUE}
+ * {@link Long#MAX_VALUE}
* @see ReactiveX operators documentation: Range
*/
@CheckReturnValue
@@ -6258,7 +6258,7 @@ public final > Flowable buffer(int count, Sup
*
*
*
Backpressure:
- *
This operator does not support backpressure as it uses time. It requests {@code Long.MAX_VALUE}
+ *
This operator does not support backpressure as it uses time. It requests {@link Long#MAX_VALUE}
* upstream and does not obey downstream requests.
*
Scheduler:
*
This version of {@code buffer} operates by default on the {@code computation} {@link Scheduler}.
@@ -6293,7 +6293,7 @@ public final Flowable> buffer(long timespan, long timeskip, TimeUnit uni
*
*
*
Backpressure:
- *
This operator does not support backpressure as it uses time. It requests {@code Long.MAX_VALUE}
+ *
This operator does not support backpressure as it uses time. It requests {@link Long#MAX_VALUE}
* upstream and does not obey downstream requests.
*
Scheduler:
*
You specify which {@link Scheduler} this operator will use.
@@ -6330,7 +6330,7 @@ public final Flowable> buffer(long timespan, long timeskip, TimeUnit uni
*
*
*
Backpressure:
- *
This operator does not support backpressure as it uses time. It requests {@code Long.MAX_VALUE}
+ *
This operator does not support backpressure as it uses time. It requests {@link Long#MAX_VALUE}
* upstream and does not obey downstream requests.
*
Scheduler:
*
You specify which {@link Scheduler} this operator will use.
@@ -6375,7 +6375,7 @@ public final > Flowable buffer(long timespan,
*
*
*
Backpressure:
- *
This operator does not support backpressure as it uses time. It requests {@code Long.MAX_VALUE}
+ *
This operator does not support backpressure as it uses time. It requests {@link Long#MAX_VALUE}
* upstream and does not obey downstream requests.
*
Scheduler:
*
This version of {@code buffer} operates by default on the {@code computation} {@link Scheduler}.
@@ -6408,7 +6408,7 @@ public final Flowable> buffer(long timespan, TimeUnit unit) {
*
*
*
Backpressure:
- *
This operator does not support backpressure as it uses time. It requests {@code Long.MAX_VALUE}
+ *
This operator does not support backpressure as it uses time. It requests {@link Long#MAX_VALUE}
* upstream and does not obey downstream requests.
*
Scheduler:
*
This version of {@code buffer} operates by default on the {@code computation} {@link Scheduler}.
@@ -6445,7 +6445,7 @@ public final Flowable> buffer(long timespan, TimeUnit unit, int count) {
*
*
*
Backpressure:
- *
This operator does not support backpressure as it uses time. It requests {@code Long.MAX_VALUE}
+ *
This operator does not support backpressure as it uses time. It requests {@link Long#MAX_VALUE}
* upstream and does not obey downstream requests.
*
Scheduler:
*
You specify which {@link Scheduler} this operator will use.
@@ -6484,7 +6484,7 @@ public final Flowable> buffer(long timespan, TimeUnit unit, Scheduler sc
*
*
*
Backpressure:
- *
This operator does not support backpressure as it uses time. It requests {@code Long.MAX_VALUE}
+ *
This operator does not support backpressure as it uses time. It requests {@link Long#MAX_VALUE}
* upstream and does not obey downstream requests.
*
Scheduler:
*
You specify which {@link Scheduler} this operator will use.
@@ -6536,7 +6536,7 @@ public final > Flowable buffer(
*
*
*
Backpressure:
- *
This operator does not support backpressure as it uses time. It requests {@code Long.MAX_VALUE}
+ *
This operator does not support backpressure as it uses time. It requests {@link Long#MAX_VALUE}
* upstream and does not obey downstream requests.
*
Scheduler:
*
You specify which {@link Scheduler} this operator will use.
@@ -6571,7 +6571,7 @@ public final Flowable> buffer(long timespan, TimeUnit unit, Scheduler sc
*
*
Backpressure:
*
This operator does not support backpressure as it is instead controlled by the given Publishers and
- * buffers data. It requests {@code Long.MAX_VALUE} upstream and does not obey downstream requests.
+ * buffers data. It requests {@link Long#MAX_VALUE} upstream and does not obey downstream requests.
*
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
*
@@ -6607,7 +6607,7 @@ public final Flowable> buffer(
*
*
Backpressure:
*
This operator does not support backpressure as it is instead controlled by the given Publishers and
- * buffers data. It requests {@code Long.MAX_VALUE} upstream and does not obey downstream requests.
+ * buffers data. It requests {@link Long#MAX_VALUE} upstream and does not obey downstream requests.
*
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
*
@@ -6652,7 +6652,7 @@ public final > Flowable b
*
*
Backpressure:
*
This operator does not support backpressure as it is instead controlled by the {@code Publisher}
- * {@code boundary} and buffers data. It requests {@code Long.MAX_VALUE} upstream and does not obey
+ * {@code boundary} and buffers data. It requests {@link Long#MAX_VALUE} upstream and does not obey
* downstream requests.
*
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
@@ -6686,7 +6686,7 @@ public final Flowable> buffer(Publisher boundaryIndicator) {
*
*
Backpressure:
*
This operator does not support backpressure as it is instead controlled by the {@code Publisher}
- * {@code boundary} and buffers data. It requests {@code Long.MAX_VALUE} upstream and does not obey
+ * {@code boundary} and buffers data. It requests {@link Long#MAX_VALUE} upstream and does not obey
* downstream requests.
*
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
@@ -6723,7 +6723,7 @@ public final Flowable> buffer(Publisher boundaryIndicator, final
*
*
Backpressure:
*
This operator does not support backpressure as it is instead controlled by the {@code Publisher}
- * {@code boundary} and buffers data. It requests {@code Long.MAX_VALUE} upstream and does not obey
+ * {@code boundary} and buffers data. It requests {@link Long#MAX_VALUE} upstream and does not obey
* downstream requests.
*
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
@@ -8503,7 +8503,7 @@ public final Flowable delay(Publisher subscriptionIndicator,
*
*
Backpressure:
*
The operator forwards the backpressure requests to this Publisher once
- * the subscription happens and requests Long.MAX_VALUE from the other Publisher
+ * the subscription happens and requests {@link Long#MAX_VALUE} from the other Publisher
*
Scheduler:
*
This method does not operate by default on a particular {@link Scheduler}.
*
@@ -10004,9 +10004,9 @@ public final Completable flatMapCompletable(Function super T, ? extends Comple
* waits until the upstream and all CompletableSources complete, optionally delaying all errors.
*
*
Backpressure:
- *
If {@code maxConcurrency == Integer.MAX_VALUE} the operator consumes the upstream in an unbounded manner.
+ *
If {@code maxConcurrency == }{@link Integer#MAX_VALUE} the operator consumes the upstream in an unbounded manner.
* Otherwise, the operator expects the upstream to honor backpressure. If the upstream doesn't support backpressure
- * the operator behaves as if {@code maxConcurrency == Integer.MAX_VALUE} was used.
+ * the operator behaves as if {@code maxConcurrency == }{@link Integer#MAX_VALUE} was used.
*
Scheduler:
*
{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.
*
@@ -10198,9 +10198,9 @@ public final Flowable flatMapMaybe(Function super T, ? extends MaybeSou
* in no particular order, into a single Flowable sequence, optionally delaying all errors.
*
*
Backpressure:
- *
If {@code maxConcurrency == Integer.MAX_VALUE} the operator consumes the upstream in an unbounded manner.
+ *
If {@code maxConcurrency == }{@link Integer#MAX_VALUE} the operator consumes the upstream in an unbounded manner.
* Otherwise, the operator expects the upstream to honor backpressure. If the upstream doesn't support backpressure
- * the operator behaves as if {@code maxConcurrency == Integer.MAX_VALUE} was used.
+ * the operator behaves as if {@code maxConcurrency == }{@link Integer#MAX_VALUE} was used.
*
Scheduler:
*
{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.
*
@@ -10247,9 +10247,9 @@ public final Flowable flatMapSingle(Function super T, ? extends SingleS
* in no particular order, into a single Flowable sequence, optionally delaying all errors.
*
*
Backpressure:
- *
If {@code maxConcurrency == Integer.MAX_VALUE} the operator consumes the upstream in an unbounded manner.
+ *
If {@code maxConcurrency == }{@link Integer#MAX_VALUE} the operator consumes the upstream in an unbounded manner.
* Otherwise, the operator expects the upstream to honor backpressure. If the upstream doesn't support backpressure
- * the operator behaves as if {@code maxConcurrency == Integer.MAX_VALUE} was used.
+ * the operator behaves as if {@code maxConcurrency == }{@link Integer#MAX_VALUE} was used.
*
Scheduler:
*
{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.
*
@@ -10415,7 +10415,7 @@ public final Disposable forEachWhile(final Predicate super T> onNext, final Co
* coordination of the {@code groupBy} operator. Such hangs can be usually avoided by using
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
* value to be greater or equal to the expected number of groups, possibly using
- * {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
+ * {@link Integer#MAX_VALUE} if the number of expected groups is unknown.
*
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
* so-called group abandonment where a group will only contain one element and the group will be
@@ -10473,7 +10473,7 @@ public final Flowable> groupBy(Function super T, ? e
* coordination of the {@code groupBy} operator. Such hangs can be usually avoided by using
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
* value to be greater or equal to the expected number of groups, possibly using
- * {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
+ * {@link Integer#MAX_VALUE} if the number of expected groups is unknown.
*
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
* so-called group abandonment where a group will only contain one element and the group will be
@@ -10532,7 +10532,7 @@ public final Flowable> groupBy(Function super T, ? e
* coordination of the {@code groupBy} operator. Such hangs can be usually avoided by using
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
* value to be greater or equal to the expected number of groups, possibly using
- * {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
+ * {@link Integer#MAX_VALUE} if the number of expected groups is unknown.
*
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
* so-called group abandonment where a group will only contain one element and the group will be
@@ -10596,7 +10596,7 @@ public final Flowable> groupBy(Function super T,
* coordination of the {@code groupBy} operator. Such hangs can be usually avoided by using
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
* value to be greater or equal to the expected number of groups, possibly using
- * {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
+ * {@link Integer#MAX_VALUE} if the number of expected groups is unknown.
*
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
* so-called group abandonment where a group will only contain one element and the group will be
@@ -10661,7 +10661,7 @@ public final Flowable> groupBy(Function super T,
* coordination of the {@code groupBy} operator. Such hangs can be usually avoided by using
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
* value to be greater or equal to the expected number of groups, possibly using
- * {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
+ * {@link Integer#MAX_VALUE} if the number of expected groups is unknown.
*
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
* so-called group abandonment where a group will only contain one element and the group will be
@@ -10774,7 +10774,7 @@ public final Flowable> groupBy(Function super T,
* coordination of the {@code groupBy} operator. Such hangs can be usually avoided by using
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
* value to be greater or equal to the expected number of groups, possibly using
- * {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
+ * {@link Integer#MAX_VALUE} if the number of expected groups is unknown.
*
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
* so-called group abandonment where a group will only contain one element and the group will be
@@ -12337,7 +12337,7 @@ public final ConnectableFlowable publish(int bufferSize) {
* Requests {@code n} initially from the upstream and then 75% of {@code n} subsequently
* after 75% of {@code n} values have been emitted to the downstream.
*
- *
This operator allows preventing the downstream to trigger unbounded mode via {@code request(Long.MAX_VALUE)}
+ *
This operator allows preventing the downstream to trigger unbounded mode via {@code request(}{@link Long#MAX_VALUE}{@code )}
* or compensate for the per-item overhead of small and frequent requests.
*
*
@@ -17224,7 +17224,7 @@ public final Single
* @param the value type
* @param sources an array of {@code ObservableSource}s that need to be eagerly concatenated
- * @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE
+ * @param maxConcurrency the maximum number of concurrent subscriptions at a time, {@link Integer#MAX_VALUE}
* is interpreted as indication to subscribe to all sources at once
* @param prefetch the number of elements to prefetch from each {@code ObservableSource} source
* @return the new Observable instance with the specified concatenation behavior
@@ -1432,7 +1432,7 @@ public static Observable concatEager(ObservableSource extends Observabl
*
* @param the value type
* @param sources a sequence of ObservableSources that need to be eagerly concatenated
- * @param maxConcurrency the maximum number of concurrently running inner ObservableSources; Integer.MAX_VALUE
+ * @param maxConcurrency the maximum number of concurrently running inner ObservableSources; {@link Integer#MAX_VALUE}
* is interpreted as all inner ObservableSources can be active at the same time
* @param prefetch the number of elements to prefetch from each inner ObservableSource source
* @return the new ObservableSource instance with the specified concatenation behavior
@@ -1482,7 +1482,7 @@ public static Observable concatEager(Iterable extends ObservableSource<
*
* @param the value type
* @param sources a sequence of ObservableSources that need to be eagerly concatenated
- * @param maxConcurrency the maximum number of concurrently running inner ObservableSources; Integer.MAX_VALUE
+ * @param maxConcurrency the maximum number of concurrently running inner ObservableSources; {@link Integer#MAX_VALUE}
* is interpreted as all inner ObservableSources can be active at the same time
* @param prefetch the number of elements to prefetch from each inner ObservableSource source
* @return the new ObservableSource instance with the specified concatenation behavior
@@ -3562,7 +3562,7 @@ public static Observable never() {
* @return an Observable that emits a range of sequential Integers
* @throws IllegalArgumentException
* if {@code count} is less than zero, or if {@code start} + {@code count} − 1 exceeds
- * {@code Integer.MAX_VALUE}
+ * {@link Integer#MAX_VALUE}
* @see ReactiveX operators documentation: Range
*/
@CheckReturnValue
@@ -3599,7 +3599,7 @@ public static Observable range(final int start, final int count) {
* @return an Observable that emits a range of sequential Longs
* @throws IllegalArgumentException
* if {@code count} is less than zero, or if {@code start} + {@code count} − 1 exceeds
- * {@code Long.MAX_VALUE}
+ * {@link Long#MAX_VALUE}
* @see ReactiveX operators documentation: Range
*/
@CheckReturnValue
diff --git a/src/main/java/io/reactivex/rxjava3/core/ObservableConverter.java b/src/main/java/io/reactivex/rxjava3/core/ObservableConverter.java
index 40b432cfea..acdde63b29 100644
--- a/src/main/java/io/reactivex/rxjava3/core/ObservableConverter.java
+++ b/src/main/java/io/reactivex/rxjava3/core/ObservableConverter.java
@@ -16,7 +16,7 @@
import io.reactivex.rxjava3.annotations.NonNull;
/**
- * Convenience interface and callback used by the {@link Observable#to} operator to turn an Observable into another
+ * Convenience interface and callback used by the {@link Observable#to} operator to turn an {@link Observable} into another
* value fluently.
*
History: 2.1.7 - experimental
* @param the upstream type
@@ -26,9 +26,9 @@
@FunctionalInterface
public interface ObservableConverter {
/**
- * Applies a function to the upstream Observable and returns a converted value of type {@code R}.
+ * Applies a function to the upstream {@link Observable} and returns a converted value of type {@code R}.
*
- * @param upstream the upstream Observable instance
+ * @param upstream the upstream {@code Observable} instance
* @return the converted value
*/
@NonNull
diff --git a/src/main/java/io/reactivex/rxjava3/core/ObservableEmitter.java b/src/main/java/io/reactivex/rxjava3/core/ObservableEmitter.java
index f4f7ef6bb0..776d6963d9 100644
--- a/src/main/java/io/reactivex/rxjava3/core/ObservableEmitter.java
+++ b/src/main/java/io/reactivex/rxjava3/core/ObservableEmitter.java
@@ -50,16 +50,18 @@
public interface ObservableEmitter extends Emitter {
/**
- * Sets a Disposable on this emitter; any previous {@link Disposable}
+ * Sets a {@link Disposable} on this emitter; any previous {@code Disposable}
* or {@link Cancellable} will be disposed/cancelled.
- * @param d the disposable, null is allowed
+ *
This method is thread-safe.
+ * @param d the {@code Disposable}, {@code null} is allowed
*/
void setDisposable(@Nullable Disposable d);
/**
- * Sets a Cancellable on this emitter; any previous {@link Disposable}
- * or {@link Cancellable} will be disposed/cancelled.
- * @param c the cancellable resource, null is allowed
+ * Sets a {@link Cancellable} on this emitter; any previous {@link Disposable}
+ * or {@code Cancellable} will be disposed/cancelled.
+ *
This method is thread-safe.
+ * @param c the {@code Cancellable} resource, {@code null} is allowed
*/
void setCancellable(@Nullable Cancellable c);
@@ -73,21 +75,21 @@ public interface ObservableEmitter extends Emitter {
boolean isDisposed();
/**
- * Ensures that calls to onNext, onError and onComplete are properly serialized.
- * @return the serialized ObservableEmitter
+ * Ensures that calls to {@code onNext}, {@code onError} and {@code onComplete} are properly serialized.
+ * @return the serialized {@link ObservableEmitter}
*/
@NonNull
ObservableEmitter serialize();
/**
- * Attempts to emit the specified {@code Throwable} error if the downstream
+ * Attempts to emit the specified {@link Throwable} error if the downstream
* hasn't cancelled the sequence or is otherwise terminated, returning false
* if the emission is not allowed to happen due to lifecycle restrictions.
*
- * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
- * if the error could not be delivered.
+ * Unlike {@link #onError(Throwable)}, the {@link io.reactivex.rxjava3.plugins.RxJavaPlugins#onError(Throwable) RxjavaPlugins.onError}
+ * is not called if the error could not be delivered.
*
History: 2.1.1 - experimental
- * @param t the throwable error to signal if possible
+ * @param t the {@code Throwable} error to signal if possible
* @return true if successful, false if the downstream is not able to accept further
* events
* @since 2.2
diff --git a/src/main/java/io/reactivex/rxjava3/core/ObservableOnSubscribe.java b/src/main/java/io/reactivex/rxjava3/core/ObservableOnSubscribe.java
index 43d6252849..bcde496462 100644
--- a/src/main/java/io/reactivex/rxjava3/core/ObservableOnSubscribe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/ObservableOnSubscribe.java
@@ -25,8 +25,8 @@
public interface ObservableOnSubscribe {
/**
- * Called for each Observer that subscribes.
- * @param emitter the safe emitter instance, never null
+ * Called for each {@link Observer} that subscribes.
+ * @param emitter the safe emitter instance, never {@code null}
* @throws Throwable on error
*/
void subscribe(@NonNull ObservableEmitter emitter) throws Throwable;
diff --git a/src/main/java/io/reactivex/rxjava3/core/ObservableOperator.java b/src/main/java/io/reactivex/rxjava3/core/ObservableOperator.java
index 76ec06e295..ff9dbc5852 100644
--- a/src/main/java/io/reactivex/rxjava3/core/ObservableOperator.java
+++ b/src/main/java/io/reactivex/rxjava3/core/ObservableOperator.java
@@ -16,7 +16,7 @@
import io.reactivex.rxjava3.annotations.NonNull;
/**
- * Interface to map/wrap a downstream observer to an upstream observer.
+ * Interface to map/wrap a downstream {@link Observer} to an upstream {@code Observer}.
*
* @param the value type of the downstream
* @param the value type of the upstream
@@ -24,11 +24,11 @@
@FunctionalInterface
public interface ObservableOperator {
/**
- * Applies a function to the child Observer and returns a new parent Observer.
- * @param observer the child Observer instance
- * @return the parent Observer instance
- * @throws Exception on failure
+ * Applies a function to the child {@link Observer} and returns a new parent {@code Observer}.
+ * @param observer the child {@code Observer} instance
+ * @return the parent {@code Observer} instance
+ * @throws Throwable on failure
*/
@NonNull
- Observer super Upstream> apply(@NonNull Observer super Downstream> observer) throws Exception;
+ Observer super Upstream> apply(@NonNull Observer super Downstream> observer) throws Throwable;
}
diff --git a/src/main/java/io/reactivex/rxjava3/core/ObservableSource.java b/src/main/java/io/reactivex/rxjava3/core/ObservableSource.java
index ad5d5c79c3..c2309e5b8d 100644
--- a/src/main/java/io/reactivex/rxjava3/core/ObservableSource.java
+++ b/src/main/java/io/reactivex/rxjava3/core/ObservableSource.java
@@ -25,9 +25,9 @@
public interface ObservableSource {
/**
- * Subscribes the given Observer to this ObservableSource instance.
- * @param observer the Observer, not null
- * @throws NullPointerException if {@code observer} is null
+ * Subscribes the given {@link Observer} to this {@link ObservableSource} instance.
+ * @param observer the {@code Observer}, not {@code null}
+ * @throws NullPointerException if {@code observer} is {@code null}
*/
void subscribe(@NonNull Observer super T> observer);
}
diff --git a/src/main/java/io/reactivex/rxjava3/core/ObservableTransformer.java b/src/main/java/io/reactivex/rxjava3/core/ObservableTransformer.java
index 259deefa85..18ce5e02c9 100644
--- a/src/main/java/io/reactivex/rxjava3/core/ObservableTransformer.java
+++ b/src/main/java/io/reactivex/rxjava3/core/ObservableTransformer.java
@@ -16,7 +16,7 @@
import io.reactivex.rxjava3.annotations.NonNull;
/**
- * Interface to compose Observables.
+ * Interface to compose {@link Observable}s.
*
* @param the upstream value type
* @param the downstream value type
@@ -24,10 +24,10 @@
@FunctionalInterface
public interface ObservableTransformer {
/**
- * Applies a function to the upstream Observable and returns an ObservableSource with
+ * Applies a function to the upstream {@link Observable} and returns an {@link ObservableSource} with
* optionally different element type.
- * @param upstream the upstream Observable instance
- * @return the transformed ObservableSource instance
+ * @param upstream the upstream {@code Observable} instance
+ * @return the transformed {@code ObservableSource} instance
*/
@NonNull
ObservableSource apply(@NonNull Observable upstream);
diff --git a/src/main/java/io/reactivex/rxjava3/core/Observer.java b/src/main/java/io/reactivex/rxjava3/core/Observer.java
index ef95e18034..de02f8ac88 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Observer.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Observer.java
@@ -76,17 +76,17 @@
public interface Observer {
/**
- * Provides the Observer with the means of cancelling (disposing) the
- * connection (channel) with the Observable in both
+ * Provides the {@link Observer} with the means of cancelling (disposing) the
+ * connection (channel) with the {@link Observable} in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
- * @param d the Disposable instance whose {@link Disposable#dispose()} can
+ * @param d the {@link Disposable} instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
- * Provides the Observer with a new item to observe.
+ * Provides the {@link Observer} with a new item to observe.
*
* The {@link Observable} may call this method 0 or more times.
*
@@ -99,9 +99,9 @@ public interface Observer {
void onNext(@NonNull T t);
/**
- * Notifies the Observer that the {@link Observable} has experienced an error condition.
+ * Notifies the {@link Observer} that the {@link Observable} has experienced an error condition.
*
- * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
+ * If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
@@ -110,9 +110,9 @@ public interface Observer {
void onError(@NonNull Throwable e);
/**
- * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
+ * Notifies the {@link Observer} that the {@link Observable} has finished sending push-based notifications.
*
- * The {@link Observable} will not call this method if it calls {@link #onError}.
+ * The {@code Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java
index 27fad6d53e..612d57a25e 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Single.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Single.java
@@ -2346,7 +2346,7 @@ public final Single delaySubscription(ObservableSource other) {
*
*
If the delaying source signals an error, that error is re-emitted and no subscription
* to the current Single happens.
- *
The other source is consumed in an unbounded manner (requesting Long.MAX_VALUE from it).
+ *
The other source is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE} from it).
*
*
Backpressure:
*
The {@code other} publisher is consumed in an unbounded fashion but will be
diff --git a/src/main/java/io/reactivex/rxjava3/core/SingleConverter.java b/src/main/java/io/reactivex/rxjava3/core/SingleConverter.java
index 4adc22b8c5..eb100b6c87 100644
--- a/src/main/java/io/reactivex/rxjava3/core/SingleConverter.java
+++ b/src/main/java/io/reactivex/rxjava3/core/SingleConverter.java
@@ -16,7 +16,7 @@
import io.reactivex.rxjava3.annotations.NonNull;
/**
- * Convenience interface and callback used by the {@link Single#to} operator to turn a Single into another
+ * Convenience interface and callback used by the {@link Single#to} operator to turn a {@link Single} into another
* value fluently.
*
History: 2.1.7 - experimental
* @param the upstream type
@@ -26,9 +26,9 @@
@FunctionalInterface
public interface SingleConverter {
/**
- * Applies a function to the upstream Single and returns a converted value of type {@code R}.
+ * Applies a function to the upstream {@link Single} and returns a converted value of type {@code R}.
*
- * @param upstream the upstream Single instance
+ * @param upstream the upstream {@code Single} instance
* @return the converted value
*/
@NonNull
diff --git a/src/main/java/io/reactivex/rxjava3/core/SingleEmitter.java b/src/main/java/io/reactivex/rxjava3/core/SingleEmitter.java
index a2302c5f4c..bb3768243d 100644
--- a/src/main/java/io/reactivex/rxjava3/core/SingleEmitter.java
+++ b/src/main/java/io/reactivex/rxjava3/core/SingleEmitter.java
@@ -57,21 +57,23 @@ public interface SingleEmitter {
/**
* Signal an exception.
- * @param t the exception, not null
+ * @param t the exception, not {@code null}
*/
void onError(@NonNull Throwable t);
/**
- * Sets a Disposable on this emitter; any previous Disposable
- * or Cancellable will be disposed/cancelled.
- * @param d the disposable, null is allowed
+ * Sets a {@link Disposable} on this emitter; any previous {@code Disposable}
+ * or {@link Cancellable} will be disposed/cancelled.
+ *
This method is thread-safe.
+ * @param d the {@code Disposable}, {@code null} is allowed
*/
void setDisposable(@Nullable Disposable d);
/**
* Sets a Cancellable on this emitter; any previous {@link Disposable}
* or {@link Cancellable} will be disposed/cancelled.
- * @param c the cancellable resource, null is allowed
+ *
This method is thread-safe.
+ * @param c the {@code Cancellable} resource, {@code null} is allowed
*/
void setCancellable(@Nullable Cancellable c);
@@ -85,12 +87,12 @@ public interface SingleEmitter {
boolean isDisposed();
/**
- * Attempts to emit the specified {@code Throwable} error if the downstream
+ * Attempts to emit the specified {@link Throwable} error if the downstream
* hasn't cancelled the sequence or is otherwise terminated, returning false
* if the emission is not allowed to happen due to lifecycle restrictions.
*
- * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
- * if the error could not be delivered.
+ * Unlike {@link #onError(Throwable)}, the {@link io.reactivex.rxjava3.plugins.RxJavaPlugins#onError(Throwable) RxjavaPlugins.onError}
+ * is not called if the error could not be delivered.
*
History: 2.1.1 - experimental
* @param t the throwable error to signal if possible
* @return true if successful, false if the downstream is not able to accept further
diff --git a/src/main/java/io/reactivex/rxjava3/core/SingleObserver.java b/src/main/java/io/reactivex/rxjava3/core/SingleObserver.java
index 778d6feabc..d2d788d38f 100644
--- a/src/main/java/io/reactivex/rxjava3/core/SingleObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/core/SingleObserver.java
@@ -53,7 +53,7 @@
public interface SingleObserver {
/**
- * Provides the SingleObserver with the means of cancelling (disposing) the
+ * Provides the {@link SingleObserver} with the means of cancelling (disposing) the
* connection (channel) with the Single in both
* synchronous (from within {@code onSubscribe(Disposable)} itself) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
@@ -63,23 +63,23 @@ public interface SingleObserver {
void onSubscribe(@NonNull Disposable d);
/**
- * Notifies the SingleObserver with a single item and that the {@link Single} has finished sending
+ * Notifies the {@link SingleObserver} with a single item and that the {@link Single} has finished sending
* push-based notifications.
*
- * The {@link Single} will not call this method if it calls {@link #onError}.
+ * The {@code Single} will not call this method if it calls {@link #onError}.
*
* @param t
- * the item emitted by the Single
+ * the item emitted by the {@code Single}
*/
void onSuccess(@NonNull T t);
/**
- * Notifies the SingleObserver that the {@link Single} has experienced an error condition.
+ * Notifies the {@link SingleObserver} that the {@link Single} has experienced an error condition.
*
- * If the {@link Single} calls this method, it will not thereafter call {@link #onSuccess}.
+ * If the {@code Single} calls this method, it will not thereafter call {@link #onSuccess}.
*
* @param e
- * the exception encountered by the Single
+ * the exception encountered by the {@code Single}
*/
void onError(@NonNull Throwable e);
}
diff --git a/src/main/java/io/reactivex/rxjava3/core/SingleOnSubscribe.java b/src/main/java/io/reactivex/rxjava3/core/SingleOnSubscribe.java
index 46cc6a6f56..7a6a5adf68 100644
--- a/src/main/java/io/reactivex/rxjava3/core/SingleOnSubscribe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/SingleOnSubscribe.java
@@ -25,8 +25,8 @@
public interface SingleOnSubscribe {
/**
- * Called for each SingleObserver that subscribes.
- * @param emitter the safe emitter instance, never null
+ * Called for each {@link SingleObserver} that subscribes.
+ * @param emitter the safe emitter instance, never {@code null}
* @throws Throwable on error
*/
void subscribe(@NonNull SingleEmitter emitter) throws Throwable;
diff --git a/src/main/java/io/reactivex/rxjava3/core/SingleOperator.java b/src/main/java/io/reactivex/rxjava3/core/SingleOperator.java
index 2fc2d80ab6..de2f38f23d 100644
--- a/src/main/java/io/reactivex/rxjava3/core/SingleOperator.java
+++ b/src/main/java/io/reactivex/rxjava3/core/SingleOperator.java
@@ -16,7 +16,7 @@
import io.reactivex.rxjava3.annotations.NonNull;
/**
- * Interface to map/wrap a downstream observer to an upstream observer.
+ * Interface to map/wrap a downstream {@link SingleObserver} to an upstream {@code SingleObserver}.
*
* @param the value type of the downstream
* @param the value type of the upstream
@@ -24,11 +24,11 @@
@FunctionalInterface
public interface SingleOperator {
/**
- * Applies a function to the child SingleObserver and returns a new parent SingleObserver.
- * @param observer the child SingleObserver instance
- * @return the parent SingleObserver instance
- * @throws Exception on failure
+ * Applies a function to the child {@link SingleObserver} and returns a new parent {@code SingleObserver}.
+ * @param observer the child {@code SingleObserver} instance
+ * @return the parent {@code SingleObserver} instance
+ * @throws Throwable on failure
*/
@NonNull
- SingleObserver super Upstream> apply(@NonNull SingleObserver super Downstream> observer) throws Exception;
+ SingleObserver super Upstream> apply(@NonNull SingleObserver super Downstream> observer) throws Throwable;
}
diff --git a/src/main/java/io/reactivex/rxjava3/core/SingleSource.java b/src/main/java/io/reactivex/rxjava3/core/SingleSource.java
index 496462d0e1..b4c923905d 100644
--- a/src/main/java/io/reactivex/rxjava3/core/SingleSource.java
+++ b/src/main/java/io/reactivex/rxjava3/core/SingleSource.java
@@ -28,9 +28,9 @@
public interface SingleSource {
/**
- * Subscribes the given SingleObserver to this SingleSource instance.
- * @param observer the SingleObserver, not null
- * @throws NullPointerException if {@code observer} is null
+ * Subscribes the given {@link SingleObserver} to this {@link SingleSource} instance.
+ * @param observer the {@code SingleObserver}, not {@code null}
+ * @throws NullPointerException if {@code observer} is {@code null}
*/
void subscribe(@NonNull SingleObserver super T> observer);
}
diff --git a/src/main/java/io/reactivex/rxjava3/core/SingleTransformer.java b/src/main/java/io/reactivex/rxjava3/core/SingleTransformer.java
index 08f9408cdb..1f0d478a0e 100644
--- a/src/main/java/io/reactivex/rxjava3/core/SingleTransformer.java
+++ b/src/main/java/io/reactivex/rxjava3/core/SingleTransformer.java
@@ -16,7 +16,7 @@
import io.reactivex.rxjava3.annotations.NonNull;
/**
- * Interface to compose Singles.
+ * Interface to compose {@link Single}s.
*
* @param the upstream value type
* @param the downstream value type
@@ -24,10 +24,10 @@
@FunctionalInterface
public interface SingleTransformer {
/**
- * Applies a function to the upstream Single and returns a SingleSource with
+ * Applies a function to the upstream {@link Single} and returns a {@link SingleSource} with
* optionally different element type.
- * @param upstream the upstream Single instance
- * @return the transformed SingleSource instance
+ * @param upstream the upstream {@code Single} instance
+ * @return the transformed {@code SingleSource} instance
*/
@NonNull
SingleSource apply(@NonNull Single upstream);
diff --git a/src/main/java/io/reactivex/rxjava3/disposables/CompositeDisposable.java b/src/main/java/io/reactivex/rxjava3/disposables/CompositeDisposable.java
index c9c0a8988c..3a75ac5f3e 100644
--- a/src/main/java/io/reactivex/rxjava3/disposables/CompositeDisposable.java
+++ b/src/main/java/io/reactivex/rxjava3/disposables/CompositeDisposable.java
@@ -37,7 +37,7 @@ public CompositeDisposable() {
/**
* Creates a CompositeDisposables with the given array of initial elements.
* @param disposables the array of Disposables to start with
- * @throws NullPointerException if {@code disposables} or any of its array items is null
+ * @throws NullPointerException if {@code disposables} or any of its array items is {@code null}
*/
public CompositeDisposable(@NonNull Disposable... disposables) {
Objects.requireNonNull(disposables, "disposables is null");
@@ -51,7 +51,7 @@ public CompositeDisposable(@NonNull Disposable... disposables) {
/**
* Creates a CompositeDisposables with the given Iterable sequence of initial elements.
* @param disposables the Iterable sequence of Disposables to start with
- * @throws NullPointerException if {@code disposables} or any of its items is null
+ * @throws NullPointerException if {@code disposables} or any of its items is {@code null}
*/
public CompositeDisposable(@NonNull Iterable extends Disposable> disposables) {
Objects.requireNonNull(disposables, "disposables is null");
@@ -90,7 +90,7 @@ public boolean isDisposed() {
* container has been disposed.
* @param disposable the disposable to add, not null
* @return true if successful, false if this container has been disposed
- * @throws NullPointerException if {@code disposable} is null
+ * @throws NullPointerException if {@code disposable} is {@code null}
*/
@Override
public boolean add(@NonNull Disposable disposable) {
@@ -117,7 +117,7 @@ public boolean add(@NonNull Disposable disposable) {
* disposes them all if the container has been disposed.
* @param disposables the array of Disposables
* @return true if the operation was successful, false if the container has been disposed
- * @throws NullPointerException if {@code disposables} or any of its array items is null
+ * @throws NullPointerException if {@code disposables} or any of its array items is {@code null}
*/
public boolean addAll(@NonNull Disposable... disposables) {
Objects.requireNonNull(disposables, "disposables is null");
@@ -163,7 +163,7 @@ public boolean remove(@NonNull Disposable disposable) {
* container.
* @param disposable the disposable to remove, not null
* @return true if the operation was successful
- * @throws NullPointerException if {@code disposable} is null
+ * @throws NullPointerException if {@code disposable} is {@code null}
*/
@Override
public boolean delete(@NonNull Disposable disposable) {
diff --git a/src/main/java/io/reactivex/rxjava3/exceptions/UndeliverableException.java b/src/main/java/io/reactivex/rxjava3/exceptions/UndeliverableException.java
index 6c84920e74..08f926e9b3 100644
--- a/src/main/java/io/reactivex/rxjava3/exceptions/UndeliverableException.java
+++ b/src/main/java/io/reactivex/rxjava3/exceptions/UndeliverableException.java
@@ -14,7 +14,7 @@
package io.reactivex.rxjava3.exceptions;
/**
- * Wrapper for Throwable errors that are sent to `RxJavaPlugins.onError`.
+ * Wrapper for Throwable errors that are sent to {@link io.reactivex.rxjava3.plugins.RxJavaPlugins#onError(Throwable) RxJavaPlugins.onError}.
*
History: 2.0.6 - experimental; 2.1 - beta
* @since 2.2
*/
diff --git a/src/main/java/io/reactivex/rxjava3/internal/util/BackpressureHelper.java b/src/main/java/io/reactivex/rxjava3/internal/util/BackpressureHelper.java
index 9237c11a18..4ccfe67845 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/util/BackpressureHelper.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/util/BackpressureHelper.java
@@ -14,6 +14,7 @@
import java.util.concurrent.atomic.AtomicLong;
+import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
@@ -26,10 +27,10 @@ private BackpressureHelper() {
}
/**
- * Adds two long values and caps the sum at Long.MAX_VALUE.
+ * Adds two long values and caps the sum at {@link Long#MAX_VALUE}.
* @param a the first value
* @param b the second value
- * @return the sum capped at Long.MAX_VALUE
+ * @return the sum capped at {@link Long#MAX_VALUE}
*/
public static long addCap(long a, long b) {
long u = a + b;
@@ -40,10 +41,10 @@ public static long addCap(long a, long b) {
}
/**
- * Multiplies two long values and caps the product at Long.MAX_VALUE.
+ * Multiplies two long values and caps the product at {@link Long#MAX_VALUE}.
* @param a the first value
* @param b the second value
- * @return the product capped at Long.MAX_VALUE
+ * @return the product capped at {@link Long#MAX_VALUE}
*/
public static long multiplyCap(long a, long b) {
long u = a * b;
@@ -56,13 +57,13 @@ public static long multiplyCap(long a, long b) {
}
/**
- * Atomically adds the positive value n to the requested value in the AtomicLong and
- * caps the result at Long.MAX_VALUE and returns the previous value.
- * @param requested the AtomicLong holding the current requested value
+ * Atomically adds the positive value n to the requested value in the {@link AtomicLong} and
+ * caps the result at {@link Long#MAX_VALUE} and returns the previous value.
+ * @param requested the {@code AtomicLong} holding the current requested value
* @param n the value to add, must be positive (not verified)
* @return the original value before the add
*/
- public static long add(AtomicLong requested, long n) {
+ public static long add(@NonNull AtomicLong requested, long n) {
for (;;) {
long r = requested.get();
if (r == Long.MAX_VALUE) {
@@ -76,14 +77,14 @@ public static long add(AtomicLong requested, long n) {
}
/**
- * Atomically adds the positive value n to the requested value in the AtomicLong and
- * caps the result at Long.MAX_VALUE and returns the previous value and
- * considers Long.MIN_VALUE as a cancel indication (no addition then).
- * @param requested the AtomicLong holding the current requested value
+ * Atomically adds the positive value n to the requested value in the {@link AtomicLong} and
+ * caps the result at {@link Long#MAX_VALUE} and returns the previous value and
+ * considers {@link Long#MIN_VALUE} as a cancel indication (no addition then).
+ * @param requested the {@code AtomicLong} holding the current requested value
* @param n the value to add, must be positive (not verified)
* @return the original value before the add
*/
- public static long addCancel(AtomicLong requested, long n) {
+ public static long addCancel(@NonNull AtomicLong requested, long n) {
for (;;) {
long r = requested.get();
if (r == Long.MIN_VALUE) {
@@ -100,12 +101,12 @@ public static long addCancel(AtomicLong requested, long n) {
}
/**
- * Atomically subtract the given number (positive, not validated) from the target field unless it contains Long.MAX_VALUE.
+ * Atomically subtract the given number (positive, not validated) from the target field unless it contains {@link Long#MAX_VALUE}.
* @param requested the target field holding the current requested amount
* @param n the produced element count, positive (not validated)
* @return the new amount
*/
- public static long produced(AtomicLong requested, long n) {
+ public static long produced(@NonNull AtomicLong requested, long n) {
for (;;) {
long current = requested.get();
if (current == Long.MAX_VALUE) {
@@ -124,12 +125,12 @@ public static long produced(AtomicLong requested, long n) {
/**
* Atomically subtract the given number (positive, not validated) from the target field if
- * it doesn't contain Long.MIN_VALUE (indicating some cancelled state) or Long.MAX_VALUE (unbounded mode).
+ * it doesn't contain {@link Long#MIN_VALUE} (indicating some cancelled state) or {@link Long#MAX_VALUE} (unbounded mode).
* @param requested the target field holding the current requested amount
* @param n the produced element count, positive (not validated)
* @return the new amount
*/
- public static long producedCancel(AtomicLong requested, long n) {
+ public static long producedCancel(@NonNull AtomicLong requested, long n) {
for (;;) {
long current = requested.get();
if (current == Long.MIN_VALUE) {
diff --git a/src/main/java/io/reactivex/rxjava3/internal/util/QueueDrainHelper.java b/src/main/java/io/reactivex/rxjava3/internal/util/QueueDrainHelper.java
index 41906b9cf7..9cdfab07ce 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/util/QueueDrainHelper.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/util/QueueDrainHelper.java
@@ -218,7 +218,7 @@ public static SimpleQueue createQueue(int capacityHint) {
}
/**
- * Requests Long.MAX_VALUE if prefetch is negative or the exact
+ * Requests {@link Long#MAX_VALUE} if prefetch is negative or the exact
* amount if prefetch is positive.
* @param s the Subscription to request from
* @param prefetch the prefetch value
@@ -383,7 +383,7 @@ static boolean postCompleteDrain(long n,
* in completed mode, requests no-longer reach the upstream but help in draining the queue.
*
* The algorithm utilizes the most significant bit (bit 63) of a long value (AtomicLong) since
- * request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't
+ * request amount only goes up to {@link Long#MAX_VALUE} (bits 0-62) and negative values aren't
* allowed.
*
* @param the value type emitted
diff --git a/src/main/java/io/reactivex/rxjava3/observers/BaseTestConsumer.java b/src/main/java/io/reactivex/rxjava3/observers/BaseTestConsumer.java
index f8a0356a75..458325c280 100644
--- a/src/main/java/io/reactivex/rxjava3/observers/BaseTestConsumer.java
+++ b/src/main/java/io/reactivex/rxjava3/observers/BaseTestConsumer.java
@@ -16,15 +16,17 @@
import java.util.*;
import java.util.concurrent.*;
+import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.exceptions.CompositeException;
import io.reactivex.rxjava3.functions.Predicate;
-import io.reactivex.rxjava3.internal.functions.*;
+import io.reactivex.rxjava3.internal.functions.Functions;
import io.reactivex.rxjava3.internal.util.*;
/**
- * Base class with shared infrastructure to support TestSubscriber and TestObserver.
+ * Base class with shared infrastructure to support
+ * {@link io.reactivex.rxjava3.subscribers.TestSubscriber TestSubscriber} and {@link TestObserver}.
* @param the value type consumed
- * @param the subclass of this BaseTestConsumer
+ * @param the subclass of this {@code BaseTestConsumer}
*/
public abstract class BaseTestConsumer> {
/** The latch that indicates an onError or onComplete has been called. */
@@ -47,26 +49,26 @@ public abstract class BaseTestConsumer> {
protected CharSequence tag;
/**
- * Indicates that one of the awaitX method has timed out.
+ * Indicates that one of the {@code awaitX} method has timed out.
* @since 2.0.7
*/
protected boolean timeout;
public BaseTestConsumer() {
- this.values = new VolatileSizeArrayList();
- this.errors = new VolatileSizeArrayList();
+ this.values = new VolatileSizeArrayList<>();
+ this.errors = new VolatileSizeArrayList<>();
this.done = new CountDownLatch(1);
}
/**
- * Returns a shared list of received onNext values.
+ * Returns a shared list of received {@code onNext} values or the single {@code onSuccess} value.
*
* Note that accessing the items via certain methods of the {@link List}
* interface while the upstream is still actively emitting
* more items may result in a {@code ConcurrentModificationException}.
*
* The {@link List#size()} method will return the number of items
- * already received by this TestObserver/TestSubscriber in a thread-safe
+ * already received by this {@code TestObserver}/{@code TestSubscriber} in a thread-safe
* manner that can be read via {@link List#get(int)}) method
* (index range of 0 to {@code List.size() - 1}).
*
@@ -76,6 +78,7 @@ public BaseTestConsumer() {
* {@code ConcurrentModificationException}.
* @return a list of received onNext values
*/
+ @NonNull
public final List values() {
return values;
}
@@ -89,7 +92,8 @@ public final List values() {
* @param message the message to use
* @return AssertionError the prepared AssertionError instance
*/
- protected final AssertionError fail(String message) {
+ @NonNull
+ protected final AssertionError fail(@NonNull String message) {
StringBuilder b = new StringBuilder(64 + message.length());
b.append(message);
@@ -131,11 +135,12 @@ protected final AssertionError fail(String message) {
}
/**
- * Awaits until this TestObserver/TestSubscriber receives an onError or onComplete events.
+ * Awaits until this {@code TestObserver}/{@code TestSubscriber} receives an {@code onError} or {@code onComplete} events.
* @return this
* @throws InterruptedException if the current thread is interrupted while waiting
*/
@SuppressWarnings("unchecked")
+ @NonNull
public final U await() throws InterruptedException {
if (done.getCount() == 0) {
return (U)this;
@@ -146,14 +151,14 @@ public final U await() throws InterruptedException {
}
/**
- * Awaits the specified amount of time or until this TestObserver/TestSubscriber
- * receives an onError or onComplete events, whichever happens first.
+ * Awaits the specified amount of time or until this {@code TestObserver}/{@code TestSubscriber}
+ * receives an {@code onError} or {@code onComplete} events, whichever happens first.
* @param time the waiting time
* @param unit the time unit of the waiting time
- * @return true if the TestObserver/TestSubscriber terminated, false if timeout happened
+ * @return true if the {@code TestObserver}/{@code TestSubscriber} terminated, false if timeout happened
* @throws InterruptedException if the current thread is interrupted while waiting
*/
- public final boolean await(long time, TimeUnit unit) throws InterruptedException {
+ public final boolean await(long time, @NonNull TimeUnit unit) throws InterruptedException {
boolean d = done.getCount() == 0 || (done.await(time, unit));
timeout = !d;
return d;
@@ -162,10 +167,11 @@ public final boolean await(long time, TimeUnit unit) throws InterruptedException
// assertion methods
/**
- * Assert that this TestObserver/TestSubscriber received exactly one onComplete event.
+ * Assert that this {@code TestObserver}/{@code TestSubscriber} received exactly one {@code onComplete} event.
* @return this
*/
@SuppressWarnings("unchecked")
+ @NonNull
public final U assertComplete() {
long c = completions;
if (c == 0) {
@@ -178,10 +184,11 @@ public final U assertComplete() {
}
/**
- * Assert that this TestObserver/TestSubscriber has not received any onComplete event.
+ * Assert that this {@code TestObserver}/{@code TestSubscriber} has not received an {@code onComplete} event.
* @return this
*/
@SuppressWarnings("unchecked")
+ @NonNull
public final U assertNotComplete() {
long c = completions;
if (c == 1) {
@@ -194,10 +201,11 @@ public final U assertNotComplete() {
}
/**
- * Assert that this TestObserver/TestSubscriber has not received any onError event.
+ * Assert that this {@code TestObserver}/{@code TestSubscriber} has not received an {@code onError} event.
* @return this
*/
@SuppressWarnings("unchecked")
+ @NonNull
public final U assertNoErrors() {
int s = errors.size();
if (s != 0) {
@@ -207,9 +215,9 @@ public final U assertNoErrors() {
}
/**
- * Assert that this TestObserver/TestSubscriber received exactly the specified onError event value.
+ * Assert that this {@code TestObserver}/{@code TestSubscriber} received exactly the specified {@code onError} event value.
*
- *
The comparison is performed via Objects.equals(); since most exceptions don't
+ *
The comparison is performed via {@link Objects#equals(Object, Object)}; since most exceptions don't
* implement equals(), this assertion may fail. Use the {@link #assertError(Class)}
* overload to test against the class of an error instead of an instance of an error
* or {@link #assertError(Predicate)} to test with different condition.
@@ -218,31 +226,34 @@ public final U assertNoErrors() {
* @see #assertError(Class)
* @see #assertError(Predicate)
*/
- public final U assertError(Throwable error) {
+ @NonNull
+ public final U assertError(@NonNull Throwable error) {
return assertError(Functions.equalsWith(error));
}
/**
- * Asserts that this TestObserver/TestSubscriber received exactly one onError event which is an
- * instance of the specified errorClass class.
- * @param errorClass the error class to expect
+ * Asserts that this {@code TestObserver}/{@code TestSubscriber} received exactly one {@code onError} event which is an
+ * instance of the specified {@code errorClass} {@link Class}.
+ * @param errorClass the error {@code Class} to expect
* @return this
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
- public final U assertError(Class extends Throwable> errorClass) {
+ @NonNull
+ public final U assertError(@NonNull Class extends Throwable> errorClass) {
return (U)assertError((Predicate)Functions.isInstanceOf(errorClass));
}
/**
- * Asserts that this TestObserver/TestSubscriber received exactly one onError event for which
- * the provided predicate returns true.
+ * Asserts that this {@code TestObserver}/{@code TestSubscriber} received exactly one {@code onError} event for which
+ * the provided predicate returns {@code true}.
* @param errorPredicate
- * the predicate that receives the error Throwable
- * and should return true for expected errors.
+ * the predicate that receives the error {@link Throwable}
+ * and should return {@code true} for expected errors.
* @return this
*/
@SuppressWarnings("unchecked")
- public final U assertError(Predicate errorPredicate) {
+ @NonNull
+ public final U assertError(@NonNull Predicate errorPredicate) {
int s = errors.size();
if (s == 0) {
throw fail("No errors");
@@ -272,13 +283,14 @@ public final U assertError(Predicate errorPredicate) {
}
/**
- * Assert that this TestObserver/TestSubscriber received exactly one onNext value which is equal to
- * the given value with respect to Objects.equals.
+ * Assert that this {@code TestObserver}/{@code TestSubscriber} received exactly one {@code onNext} value which is equal to
+ * the given value with respect to {@link Objects#equals(Object, Object)}.
* @param value the value to expect
* @return this
*/
@SuppressWarnings("unchecked")
- public final U assertValue(T value) {
+ @NonNull
+ public final U assertValue(@NonNull T value) {
int s = values.size();
if (s != 1) {
throw fail("expected: " + valueAndClass(value) + " but was: " + values);
@@ -291,15 +303,16 @@ public final U assertValue(T value) {
}
/**
- * Asserts that this TestObserver/TestSubscriber received exactly one onNext value for which
- * the provided predicate returns true.
+ * Asserts that this {@code TestObserver}/{@code TestSubscriber} received exactly one {@code onNext} value for which
+ * the provided predicate returns {@code true}.
* @param valuePredicate
- * the predicate that receives the onNext value
- * and should return true for the expected value.
+ * the predicate that receives the {@code onNext} value
+ * and should return {@code true} for the expected value.
* @return this
*/
@SuppressWarnings("unchecked")
- public final U assertValue(Predicate valuePredicate) {
+ @NonNull
+ public final U assertValue(@NonNull Predicate valuePredicate) {
assertValueAt(0, valuePredicate);
if (values.size() > 1) {
@@ -310,8 +323,8 @@ public final U assertValue(Predicate valuePredicate) {
}
/**
- * Asserts that this TestObserver/TestSubscriber received an onNext value at the given index
- * which is equal to the given value with respect to null-safe Object.equals.
+ * Asserts that this {@code TestObserver}/{@code TestSubscriber} received an {@code onNext} value at the given index
+ * which is equal to the given value with respect to {@code null}-safe {@link Objects#equals(Object, Object)}.
*
History: 2.1.3 - experimental
* @param index the position to assert on
* @param value the value to expect
@@ -319,7 +332,8 @@ public final U assertValue(Predicate valuePredicate) {
* @since 2.2
*/
@SuppressWarnings("unchecked")
- public final U assertValueAt(int index, T value) {
+ @NonNull
+ public final U assertValueAt(int index, @NonNull T value) {
int s = values.size();
if (s == 0) {
throw fail("No values");
@@ -337,16 +351,17 @@ public final U assertValueAt(int index, T value) {
}
/**
- * Asserts that this TestObserver/TestSubscriber received an onNext value at the given index
- * for the provided predicate returns true.
+ * Asserts that this {@code TestObserver}/{@code TestSubscriber} received an {@code onNext} value at the given index
+ * for the provided predicate returns {@code true}.
* @param index the position to assert on
* @param valuePredicate
- * the predicate that receives the onNext value
- * and should return true for the expected value.
+ * the predicate that receives the {@code onNext} value
+ * and should return {@code true} for the expected value.
* @return this
*/
@SuppressWarnings("unchecked")
- public final U assertValueAt(int index, Predicate valuePredicate) {
+ @NonNull
+ public final U assertValueAt(int index, @NonNull Predicate valuePredicate) {
int s = values.size();
if (s == 0) {
throw fail("No values");
@@ -373,11 +388,12 @@ public final U assertValueAt(int index, Predicate valuePredicate) {
}
/**
- * Appends the class name to a non-null value.
+ * Appends the class name to a non-{@code null} value or returns {@code "null"}.
* @param o the object
* @return the string representation
*/
- public static String valueAndClass(Object o) {
+ @NonNull
+ public static String valueAndClass(@Nullable Object o) {
if (o != null) {
return o + " (class: " + o.getClass().getSimpleName() + ")";
}
@@ -385,11 +401,12 @@ public static String valueAndClass(Object o) {
}
/**
- * Assert that this TestObserver/TestSubscriber received the specified number onNext events.
- * @param count the expected number of onNext events
+ * Assert that this {@code TestObserver}/{@code TestSubscriber} received the specified number {@code onNext} events.
+ * @param count the expected number of {@code onNext} events
* @return this
*/
@SuppressWarnings("unchecked")
+ @NonNull
public final U assertValueCount(int count) {
int s = values.size();
if (s != count) {
@@ -399,20 +416,23 @@ public final U assertValueCount(int count) {
}
/**
- * Assert that this TestObserver/TestSubscriber has not received any onNext events.
+ * Assert that this {@code TestObserver}/{@code TestSubscriber} has not received any {@code onNext} events.
* @return this
*/
+ @NonNull
public final U assertNoValues() {
return assertValueCount(0);
}
/**
- * Assert that the TestObserver/TestSubscriber received only the specified values in the specified order.
+ * Assert that the {@code TestObserver}/{@code TestSubscriber} received only the specified values in the specified order.
* @param values the values expected
* @return this
*/
@SuppressWarnings("unchecked")
- public final U assertValues(T... values) {
+ @SafeVarargs
+ @NonNull
+ public final U assertValues(@NonNull T... values) {
int s = this.values.size();
if (s != values.length) {
throw fail("Value count differs; expected: " + values.length + " " + Arrays.toString(values)
@@ -429,14 +449,15 @@ public final U assertValues(T... values) {
}
/**
- * Assert that the TestObserver/TestSubscriber received only the specified values in the specified order without terminating.
+ * Assert that the {@code TestObserver}/{@code TestSubscriber} received only the specified values in the specified order without terminating.
*
History: 2.1.4 - experimental
* @param values the values expected
* @return this
* @since 2.2
*/
@SafeVarargs
- public final U assertValuesOnly(T... values) {
+ @NonNull
+ public final U assertValuesOnly(@NonNull T... values) {
return assertSubscribed()
.assertValues(values)
.assertNoErrors()
@@ -444,12 +465,13 @@ public final U assertValuesOnly(T... values) {
}
/**
- * Assert that the TestObserver/TestSubscriber received only the specified sequence of values in the same order.
+ * Assert that the {@code TestObserver}/{@code TestSubscriber} received only the specified sequence of values in the same order.
* @param sequence the sequence of expected values in order
* @return this
*/
@SuppressWarnings("unchecked")
- public final U assertValueSequence(Iterable extends T> sequence) {
+ @NonNull
+ public final U assertValueSequence(@NonNull Iterable extends T> sequence) {
int i = 0;
Iterator actualIterator = values.iterator();
Iterator extends T> expectedIterator = sequence.iterator();
@@ -482,20 +504,22 @@ public final U assertValueSequence(Iterable extends T> sequence) {
}
/**
- * Assert that the onSubscribe method was called exactly once.
+ * Assert that the {@code onSubscribe} method was called exactly once.
* @return this
*/
+ @NonNull
protected abstract U assertSubscribed();
/**
- * Assert that the upstream signalled the specified values in order and
+ * Assert that the upstream signaled the specified values in order and
* completed normally.
* @param values the expected values, asserted in order
* @return this
* @see #assertFailure(Class, Object...)
*/
@SafeVarargs
- public final U assertResult(T... values) {
+ @NonNull
+ public final U assertResult(@NonNull T... values) {
return assertSubscribed()
.assertValues(values)
.assertNoErrors()
@@ -503,14 +527,15 @@ public final U assertResult(T... values) {
}
/**
- * Assert that the upstream signalled the specified values in order
- * and then failed with a specific class or subclass of Throwable.
- * @param error the expected exception (parent) class
+ * Assert that the upstream signaled the specified values in order
+ * and then failed with a specific class or subclass of {@link Throwable}.
+ * @param error the expected exception (parent) {@link Class}
* @param values the expected values, asserted in order
* @return this
*/
@SafeVarargs
- public final U assertFailure(Class extends Throwable> error, T... values) {
+ @NonNull
+ public final U assertFailure(@NonNull Class extends Throwable> error, @NonNull T... values) {
return assertSubscribed()
.assertValues(values)
.assertError(error)
@@ -519,14 +544,15 @@ public final U assertFailure(Class extends Throwable> error, T... values) {
/**
* Awaits until the internal latch is counted down.
- *
If the wait times out or gets interrupted, the TestObserver/TestSubscriber is cancelled.
+ *
If the wait times out or gets interrupted, the {@code TestObserver}/{@code TestSubscriber} is cancelled.
* @param time the waiting time
* @param unit the time unit of the waiting time
* @return this
- * @throws RuntimeException wrapping an InterruptedException if the wait is interrupted
+ * @throws RuntimeException wrapping an {@link InterruptedException} if the wait is interrupted
*/
@SuppressWarnings("unchecked")
- public final U awaitDone(long time, TimeUnit unit) {
+ @NonNull
+ public final U awaitDone(long time, @NonNull TimeUnit unit) {
try {
if (!done.await(time, unit)) {
timeout = true;
@@ -540,9 +566,12 @@ public final U awaitDone(long time, TimeUnit unit) {
}
/**
- * Assert that the TestObserver/TestSubscriber has received a Disposable but no other events.
+ * Assert that the {@code TestObserver}/{@code TestSubscriber} has received a
+ * {@link io.reactivex.rxjava3.disposables.Disposable Disposable}/{@link org.reactivestreams.Subscription Subscription}
+ * via {@code onSubscribe} but no other events.
* @return this
*/
+ @NonNull
public final U assertEmpty() {
return assertSubscribed()
.assertNoValues()
@@ -554,18 +583,19 @@ public final U assertEmpty() {
* Set the tag displayed along with an assertion failure's
* other state information.
*
History: 2.0.7 - experimental
- * @param tag the string to display (null won't print any tag)
+ * @param tag the string to display ({@code null} won't print any tag)
* @return this
* @since 2.1
*/
@SuppressWarnings("unchecked")
- public final U withTag(CharSequence tag) {
+ @NonNull
+ public final U withTag(@Nullable CharSequence tag) {
this.tag = tag;
return (U)this;
}
/**
- * Await until the TestObserver/TestSubscriber receives the given
+ * Await until the {@code TestObserver}/{@code TestSubscriber} receives the given
* number of items or terminates by sleeping 10 milliseconds at a time
* up to 5000 milliseconds of timeout.
*
History: 2.0.7 - experimental
@@ -574,6 +604,7 @@ public final U withTag(CharSequence tag) {
* @since 2.1
*/
@SuppressWarnings("unchecked")
+ @NonNull
public final U awaitCount(int atLeast) {
long start = System.currentTimeMillis();
long timeoutMillis = 5000;
diff --git a/src/main/java/io/reactivex/rxjava3/observers/DisposableCompletableObserver.java b/src/main/java/io/reactivex/rxjava3/observers/DisposableCompletableObserver.java
index 25926663d5..940654b622 100644
--- a/src/main/java/io/reactivex/rxjava3/observers/DisposableCompletableObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/observers/DisposableCompletableObserver.java
@@ -53,7 +53,7 @@
*/
public abstract class DisposableCompletableObserver implements CompletableObserver, Disposable {
- final AtomicReference upstream = new AtomicReference();
+ final AtomicReference upstream = new AtomicReference<>();
@Override
public final void onSubscribe(@NonNull Disposable d) {
@@ -63,7 +63,7 @@ public final void onSubscribe(@NonNull Disposable d) {
}
/**
- * Called once the single upstream Disposable is set via onSubscribe.
+ * Called once the single upstream {@link Disposable} is set via {@link #onSubscribe(Disposable)}.
*/
protected void onStart() {
}
diff --git a/src/main/java/io/reactivex/rxjava3/observers/DisposableMaybeObserver.java b/src/main/java/io/reactivex/rxjava3/observers/DisposableMaybeObserver.java
index 66ac6a8620..092994e818 100644
--- a/src/main/java/io/reactivex/rxjava3/observers/DisposableMaybeObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/observers/DisposableMaybeObserver.java
@@ -22,7 +22,7 @@
import io.reactivex.rxjava3.internal.util.EndConsumerHelper;
/**
- * An abstract {@link MaybeObserver} that allows asynchronous cancellation by implementing Disposable.
+ * An abstract {@link MaybeObserver} that allows asynchronous cancellation by implementing {@link Disposable}.
*
*
All pre-implemented final methods are thread-safe.
*
@@ -62,7 +62,7 @@
*/
public abstract class DisposableMaybeObserver implements MaybeObserver, Disposable {
- final AtomicReference upstream = new AtomicReference();
+ final AtomicReference upstream = new AtomicReference<>();
@Override
public final void onSubscribe(@NonNull Disposable d) {
@@ -72,7 +72,7 @@ public final void onSubscribe(@NonNull Disposable d) {
}
/**
- * Called once the single upstream Disposable is set via onSubscribe.
+ * Called once the single upstream {@link Disposable} is set via {@link #onSubscribe(Disposable)}.
*/
protected void onStart() {
}
diff --git a/src/main/java/io/reactivex/rxjava3/observers/DisposableObserver.java b/src/main/java/io/reactivex/rxjava3/observers/DisposableObserver.java
index a15587317c..ff2eba7758 100644
--- a/src/main/java/io/reactivex/rxjava3/observers/DisposableObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/observers/DisposableObserver.java
@@ -22,7 +22,7 @@
import io.reactivex.rxjava3.internal.util.EndConsumerHelper;
/**
- * An abstract {@link Observer} that allows asynchronous cancellation by implementing Disposable.
+ * An abstract {@link Observer} that allows asynchronous cancellation by implementing {@link Disposable}.
*
*
All pre-implemented final methods are thread-safe.
*
@@ -66,7 +66,7 @@
*/
public abstract class DisposableObserver implements Observer, Disposable {
- final AtomicReference upstream = new AtomicReference();
+ final AtomicReference upstream = new AtomicReference<>();
@Override
public final void onSubscribe(@NonNull Disposable d) {
diff --git a/src/main/java/io/reactivex/rxjava3/observers/DisposableSingleObserver.java b/src/main/java/io/reactivex/rxjava3/observers/DisposableSingleObserver.java
index 6d8ce73fb0..56e5c543b7 100644
--- a/src/main/java/io/reactivex/rxjava3/observers/DisposableSingleObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/observers/DisposableSingleObserver.java
@@ -22,7 +22,7 @@
import io.reactivex.rxjava3.internal.util.EndConsumerHelper;
/**
- * An abstract {@link SingleObserver} that allows asynchronous cancellation by implementing Disposable.
+ * An abstract {@link SingleObserver} that allows asynchronous cancellation by implementing {@link Disposable}.
*
*
All pre-implemented final methods are thread-safe.
*
@@ -55,7 +55,7 @@
*/
public abstract class DisposableSingleObserver implements SingleObserver, Disposable {
- final AtomicReference upstream = new AtomicReference();
+ final AtomicReference upstream = new AtomicReference<>();
@Override
public final void onSubscribe(@NonNull Disposable d) {
@@ -65,7 +65,7 @@ public final void onSubscribe(@NonNull Disposable d) {
}
/**
- * Called once the single upstream Disposable is set via onSubscribe.
+ * Called once the single upstream {@link Disposable} is set via {@link #onSubscribe(Disposable)}.
*/
protected void onStart() {
}
diff --git a/src/main/java/io/reactivex/rxjava3/observers/LambdaConsumerIntrospection.java b/src/main/java/io/reactivex/rxjava3/observers/LambdaConsumerIntrospection.java
index 5d74ac5e8c..e3f5c46cfe 100644
--- a/src/main/java/io/reactivex/rxjava3/observers/LambdaConsumerIntrospection.java
+++ b/src/main/java/io/reactivex/rxjava3/observers/LambdaConsumerIntrospection.java
@@ -24,8 +24,8 @@
public interface LambdaConsumerIntrospection {
/**
- * Returns true or false if a custom onError consumer has been provided.
- * @return {@code true} if a custom onError consumer implementation was supplied. Returns {@code false} if the
+ * Returns {@code true} or {@code false} if a custom {@code onError} consumer has been provided.
+ * @return {@code true} if a custom {@code onError} consumer implementation was supplied. Returns {@code false} if the
* implementation is missing an error consumer and thus using a throwing default implementation.
*/
boolean hasCustomOnError();
diff --git a/src/main/java/io/reactivex/rxjava3/observers/ResourceCompletableObserver.java b/src/main/java/io/reactivex/rxjava3/observers/ResourceCompletableObserver.java
index b57d77f6f5..df05fe0169 100644
--- a/src/main/java/io/reactivex/rxjava3/observers/ResourceCompletableObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/observers/ResourceCompletableObserver.java
@@ -74,17 +74,17 @@
*/
public abstract class ResourceCompletableObserver implements CompletableObserver, Disposable {
/** The active subscription. */
- private final AtomicReference upstream = new AtomicReference();
+ private final AtomicReference upstream = new AtomicReference<>();
/** The resource composite, can never be null. */
private final ListCompositeDisposable resources = new ListCompositeDisposable();
/**
- * Adds a resource to this ResourceObserver.
+ * Adds a resource to this {@code ResourceCompletableObserver}.
*
* @param resource the resource to add
*
- * @throws NullPointerException if resource is null
+ * @throws NullPointerException if resource is {@code null}
*/
public final void add(@NonNull Disposable resource) {
Objects.requireNonNull(resource, "resource is null");
@@ -99,7 +99,7 @@ public final void onSubscribe(@NonNull Disposable d) {
}
/**
- * Called once the upstream sets a Subscription on this ResourceObserver.
+ * Called once the upstream sets a {@link Disposable} on this {@code ResourceCompletableObserver}.
*
*
You can perform initialization at this moment. The default
* implementation does nothing.
@@ -109,10 +109,10 @@ protected void onStart() {
/**
* Cancels the main disposable (if any) and disposes the resources associated with
- * this ResourceObserver (if any).
+ * this {@code ResourceCompletableObserver} (if any).
*
- *
This method can be called before the upstream calls onSubscribe at which
- * case the main Disposable will be immediately disposed.
+ *
This method can be called before the upstream calls {@link #onSubscribe(Disposable)} at which
+ * case the main {@link Disposable} will be immediately disposed.
*/
@Override
public final void dispose() {
@@ -122,8 +122,8 @@ public final void dispose() {
}
/**
- * Returns true if this ResourceObserver has been disposed/cancelled.
- * @return true if this ResourceObserver has been disposed/cancelled
+ * Returns true if this {@code ResourceCompletableObserver} has been disposed/cancelled.
+ * @return true if this {@code ResourceCompletableObserver} has been disposed/cancelled
*/
@Override
public final boolean isDisposed() {
diff --git a/src/main/java/io/reactivex/rxjava3/observers/ResourceMaybeObserver.java b/src/main/java/io/reactivex/rxjava3/observers/ResourceMaybeObserver.java
index cb42a70fbd..22345e60ce 100644
--- a/src/main/java/io/reactivex/rxjava3/observers/ResourceMaybeObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/observers/ResourceMaybeObserver.java
@@ -84,17 +84,17 @@
*/
public abstract class ResourceMaybeObserver implements MaybeObserver, Disposable {
/** The active subscription. */
- private final AtomicReference upstream = new AtomicReference();
+ private final AtomicReference upstream = new AtomicReference<>();
/** The resource composite, can never be null. */
private final ListCompositeDisposable resources = new ListCompositeDisposable();
/**
- * Adds a resource to this ResourceObserver.
+ * Adds a resource to this {@code ResourceMaybeObserver}.
*
* @param resource the resource to add
*
- * @throws NullPointerException if resource is null
+ * @throws NullPointerException if resource is {@code null}
*/
public final void add(@NonNull Disposable resource) {
Objects.requireNonNull(resource, "resource is null");
@@ -109,7 +109,7 @@ public final void onSubscribe(@NonNull Disposable d) {
}
/**
- * Called once the upstream sets a Subscription on this ResourceObserver.
+ * Called once the upstream sets a {@link Disposable} on this {@code ResourceMaybeObserver}.
*
*
You can perform initialization at this moment. The default
* implementation does nothing.
@@ -119,10 +119,10 @@ protected void onStart() {
/**
* Cancels the main disposable (if any) and disposes the resources associated with
- * this ResourceObserver (if any).
+ * this {@code ResourceMaybeObserver} (if any).
*
- *
This method can be called before the upstream calls onSubscribe at which
- * case the main Disposable will be immediately disposed.
+ *
This method can be called before the upstream calls {@link #onSubscribe(Disposable)} at which
+ * case the main {@link Disposable} will be immediately disposed.
*/
@Override
public final void dispose() {
@@ -132,8 +132,8 @@ public final void dispose() {
}
/**
- * Returns true if this ResourceObserver has been disposed/cancelled.
- * @return true if this ResourceObserver has been disposed/cancelled
+ * Returns true if this {@code ResourceMaybeObserver} has been disposed/cancelled.
+ * @return true if this {@code ResourceMaybeObserver} has been disposed/cancelled
*/
@Override
public final boolean isDisposed() {
diff --git a/src/main/java/io/reactivex/rxjava3/observers/ResourceObserver.java b/src/main/java/io/reactivex/rxjava3/observers/ResourceObserver.java
index b6d77a0d5f..f0810f4c5e 100644
--- a/src/main/java/io/reactivex/rxjava3/observers/ResourceObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/observers/ResourceObserver.java
@@ -82,17 +82,17 @@
*/
public abstract class ResourceObserver implements Observer, Disposable {
/** The active subscription. */
- private final AtomicReference upstream = new AtomicReference();
+ private final AtomicReference upstream = new AtomicReference<>();
/** The resource composite, can never be null. */
private final ListCompositeDisposable resources = new ListCompositeDisposable();
/**
- * Adds a resource to this ResourceObserver.
+ * Adds a resource to this {@code ResourceObserver}.
*
* @param resource the resource to add
*
- * @throws NullPointerException if resource is null
+ * @throws NullPointerException if resource is {@code null}
*/
public final void add(@NonNull Disposable resource) {
Objects.requireNonNull(resource, "resource is null");
@@ -107,7 +107,7 @@ public final void onSubscribe(Disposable d) {
}
/**
- * Called once the upstream sets a Subscription on this ResourceObserver.
+ * Called once the upstream sets a {@link Disposable} on this {@code ResourceObserver}.
*
*
You can perform initialization at this moment. The default
* implementation does nothing.
@@ -117,10 +117,10 @@ protected void onStart() {
/**
* Cancels the main disposable (if any) and disposes the resources associated with
- * this ResourceObserver (if any).
+ * this {@code ResourceObserver} (if any).
*
- *
This method can be called before the upstream calls onSubscribe at which
- * case the main Disposable will be immediately disposed.
+ *
This method can be called before the upstream calls {@link #onSubscribe(Disposable)} at which
+ * case the main {@link Disposable} will be immediately disposed.
*/
@Override
public final void dispose() {
@@ -130,8 +130,8 @@ public final void dispose() {
}
/**
- * Returns true if this ResourceObserver has been disposed/cancelled.
- * @return true if this ResourceObserver has been disposed/cancelled
+ * Returns true if this {@code ResourceObserver} has been disposed/cancelled.
+ * @return true if this {@code ResourceObserver} has been disposed/cancelled
*/
@Override
public final boolean isDisposed() {
diff --git a/src/main/java/io/reactivex/rxjava3/observers/ResourceSingleObserver.java b/src/main/java/io/reactivex/rxjava3/observers/ResourceSingleObserver.java
index af723bf789..efebfaa85c 100644
--- a/src/main/java/io/reactivex/rxjava3/observers/ResourceSingleObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/observers/ResourceSingleObserver.java
@@ -77,17 +77,17 @@
*/
public abstract class ResourceSingleObserver implements SingleObserver, Disposable {
/** The active subscription. */
- private final AtomicReference upstream = new AtomicReference();
+ private final AtomicReference upstream = new AtomicReference<>();
/** The resource composite, can never be null. */
private final ListCompositeDisposable resources = new ListCompositeDisposable();
/**
- * Adds a resource to this ResourceObserver.
+ * Adds a resource to this {@code ResourceSingleObserver}.
*
* @param resource the resource to add
*
- * @throws NullPointerException if resource is null
+ * @throws NullPointerException if resource is {@code null}
*/
public final void add(@NonNull Disposable resource) {
Objects.requireNonNull(resource, "resource is null");
@@ -102,7 +102,7 @@ public final void onSubscribe(@NonNull Disposable d) {
}
/**
- * Called once the upstream sets a Subscription on this ResourceObserver.
+ * Called once the upstream sets a {@link Disposable} on this {@code ResourceSingleObserver}.
*
*
You can perform initialization at this moment. The default
* implementation does nothing.
@@ -112,10 +112,10 @@ protected void onStart() {
/**
* Cancels the main disposable (if any) and disposes the resources associated with
- * this ResourceObserver (if any).
+ * this {@code ResourceSingleObserver} (if any).
*
- *
This method can be called before the upstream calls onSubscribe at which
- * case the main Disposable will be immediately disposed.
+ *
This method can be called before the upstream calls {@link #onSubscribe(Disposable)} at which
+ * case the main {@link Disposable} will be immediately disposed.
*/
@Override
public final void dispose() {
@@ -125,8 +125,8 @@ public final void dispose() {
}
/**
- * Returns true if this ResourceObserver has been disposed/cancelled.
- * @return true if this ResourceObserver has been disposed/cancelled
+ * Returns true if this {@code ResourceSingleObserver} has been disposed/cancelled.
+ * @return true if this {@code ResourceSingleObserver} has been disposed/cancelled
*/
@Override
public final boolean isDisposed() {
diff --git a/src/main/java/io/reactivex/rxjava3/observers/SafeObserver.java b/src/main/java/io/reactivex/rxjava3/observers/SafeObserver.java
index f7612378e2..c3935e79dc 100644
--- a/src/main/java/io/reactivex/rxjava3/observers/SafeObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/observers/SafeObserver.java
@@ -21,7 +21,7 @@
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
- * Wraps another Subscriber and ensures all onXXX methods conform the protocol
+ * Wraps another {@link Observer} and ensures all {@code onXXX} methods conform the protocol
* (except the requirement for serialized access).
*
* @param the value type
@@ -35,8 +35,8 @@ public final class SafeObserver implements Observer, Disposable {
boolean done;
/**
- * Constructs a SafeObserver by wrapping the given actual Observer.
- * @param downstream the actual Observer to wrap, not null (not validated)
+ * Constructs a {@code SafeObserver} by wrapping the given actual {@link Observer}.
+ * @param downstream the actual {@code Observer} to wrap, not {@code null} (not validated)
*/
public SafeObserver(@NonNull Observer super T> downstream) {
this.downstream = downstream;
diff --git a/src/main/java/io/reactivex/rxjava3/observers/SerializedObserver.java b/src/main/java/io/reactivex/rxjava3/observers/SerializedObserver.java
index a0ffe75cce..e0d32fbd7a 100644
--- a/src/main/java/io/reactivex/rxjava3/observers/SerializedObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/observers/SerializedObserver.java
@@ -20,13 +20,14 @@
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
- * Serializes access to the onNext, onError and onComplete methods of another Observer.
+ * Serializes access to the {@link Observer#onNext(Object)}, {@link Observer#onError(Throwable)} and
+ * {@link Observer#onComplete()} methods of another {@link Observer}.
*
*
Note that {@link #onSubscribe(Disposable)} is not serialized in respect of the other methods so
- * make sure the {@code onSubscribe()} is called with a non-null {@code Disposable}
+ * make sure the {@code onSubscribe()} is called with a non-null {@link Disposable}
* before any of the other methods are called.
*
- *
The implementation assumes that the actual Observer's methods don't throw.
+ *
The implementation assumes that the actual {@code Observer}'s methods don't throw.
*
* @param the value type
*/
@@ -44,19 +45,19 @@ public final class SerializedObserver implements Observer, Disposable {
volatile boolean done;
/**
- * Construct a SerializedObserver by wrapping the given actual Observer.
- * @param downstream the actual Observer, not null (not verified)
+ * Construct a {@code SerializedObserver} by wrapping the given actual {@link Observer}.
+ * @param downstream the actual {@code Observer}, not {@code null} (not verified)
*/
public SerializedObserver(@NonNull Observer super T> downstream) {
this(downstream, false);
}
/**
- * Construct a SerializedObserver by wrapping the given actual Observer and
+ * Construct a SerializedObserver by wrapping the given actual {@link Observer} and
* optionally delaying the errors till all regular values have been emitted
* from the internal buffer.
- * @param actual the actual Observer, not null (not verified)
- * @param delayError if true, errors are emitted after regular values have been emitted
+ * @param actual the actual {@code Observer}, not {@code null} (not verified)
+ * @param delayError if {@code true}, errors are emitted after regular values have been emitted
*/
public SerializedObserver(@NonNull Observer super T> actual, boolean delayError) {
this.downstream = actual;
@@ -100,7 +101,7 @@ public void onNext(@NonNull T t) {
if (emitting) {
AppendOnlyLinkedArrayList q = queue;
if (q == null) {
- q = new AppendOnlyLinkedArrayList(QUEUE_LINK_SIZE);
+ q = new AppendOnlyLinkedArrayList<>(QUEUE_LINK_SIZE);
queue = q;
}
q.add(NotificationLite.next(t));
@@ -129,7 +130,7 @@ public void onError(@NonNull Throwable t) {
done = true;
AppendOnlyLinkedArrayList q = queue;
if (q == null) {
- q = new AppendOnlyLinkedArrayList(QUEUE_LINK_SIZE);
+ q = new AppendOnlyLinkedArrayList<>(QUEUE_LINK_SIZE);
queue = q;
}
Object err = NotificationLite.error(t);
@@ -167,7 +168,7 @@ public void onComplete() {
if (emitting) {
AppendOnlyLinkedArrayList q = queue;
if (q == null) {
- q = new AppendOnlyLinkedArrayList(QUEUE_LINK_SIZE);
+ q = new AppendOnlyLinkedArrayList<>(QUEUE_LINK_SIZE);
queue = q;
}
q.add(NotificationLite.complete());
diff --git a/src/main/java/io/reactivex/rxjava3/observers/TestObserver.java b/src/main/java/io/reactivex/rxjava3/observers/TestObserver.java
index eae4fe113e..44b129fdab 100644
--- a/src/main/java/io/reactivex/rxjava3/observers/TestObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/observers/TestObserver.java
@@ -14,19 +14,24 @@
import java.util.concurrent.atomic.AtomicReference;
+import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
/**
- * An Observer that records events and allows making assertions about them.
+ * An {@link Observer}, {@link MaybeObserver}, {@link SingleObserver} and
+ * {@link CompletableObserver} composite that can record events from
+ * {@link Observable}s, {@link Maybe}s, {@link Single}s and {@link Completable}s
+ * and allows making assertions about them.
*
- *
You can override the onSubscribe, onNext, onError, onComplete, onSuccess and
- * cancel methods but not the others (this is by design).
+ *
You can override the {@link #onSubscribe(Disposable)}, {@link #onNext(Object)}, {@link #onError(Throwable)},
+ * {@link #onComplete()} and {@link #onSuccess(Object)} methods but not the others (this is by design).
*
- *
The TestObserver implements Disposable for convenience where dispose calls cancel.
+ *
The {@code TestObserver} implements {@link Disposable} for convenience where dispose calls cancel.
*
* @param the value type
+ * @see io.reactivex.rxjava3.subscribers.TestSubscriber
*/
public class TestObserver
extends BaseTestConsumer>
@@ -35,25 +40,27 @@ public class TestObserver
private final Observer super T> downstream;
/** Holds the current subscription if any. */
- private final AtomicReference upstream = new AtomicReference();
+ private final AtomicReference upstream = new AtomicReference<>();
/**
- * Constructs a non-forwarding TestObserver.
+ * Constructs a non-forwarding {@code TestObserver}.
* @param the value type received
- * @return the new TestObserver instance
+ * @return the new {@code TestObserver} instance
*/
+ @NonNull
public static TestObserver create() {
- return new TestObserver();
+ return new TestObserver<>();
}
/**
- * Constructs a forwarding TestObserver.
+ * Constructs a forwarding {@code TestObserver}.
* @param the value type received
- * @param delegate the actual Observer to forward events to
- * @return the new TestObserver instance
+ * @param delegate the actual {@link Observer} to forward events to
+ * @return the new {@code TestObserver} instance
*/
- public static TestObserver create(Observer super T> delegate) {
- return new TestObserver(delegate);
+ @NonNull
+ public static TestObserver create(@NonNull Observer super T> delegate) {
+ return new TestObserver<>(delegate);
}
/**
@@ -64,15 +71,15 @@ public TestObserver() {
}
/**
- * Constructs a forwarding TestObserver.
- * @param downstream the actual Observer to forward events to
+ * Constructs a forwarding {@code TestObserver}.
+ * @param downstream the actual {@link Observer} to forward events to
*/
- public TestObserver(Observer super T> downstream) {
+ public TestObserver(@NonNull Observer super T> downstream) {
this.downstream = downstream;
}
@Override
- public void onSubscribe(Disposable d) {
+ public void onSubscribe(@NonNull Disposable d) {
lastThread = Thread.currentThread();
if (d == null) {
@@ -91,7 +98,7 @@ public void onSubscribe(Disposable d) {
}
@Override
- public void onNext(T t) {
+ public void onNext(@NonNull T t) {
if (!checkSubscriptionOnce) {
checkSubscriptionOnce = true;
if (upstream.get() == null) {
@@ -111,7 +118,7 @@ public void onNext(T t) {
}
@Override
- public void onError(Throwable t) {
+ public void onError(@NonNull Throwable t) {
if (!checkSubscriptionOnce) {
checkSubscriptionOnce = true;
if (upstream.get() == null) {
@@ -164,18 +171,19 @@ public final boolean isDisposed() {
// state retrieval methods
/**
- * Returns true if this TestObserver received a subscription.
- * @return true if this TestObserver received a subscription
+ * Returns true if this {@code TestObserver} received a subscription.
+ * @return true if this {@code TestObserver} received a subscription
*/
public final boolean hasSubscription() {
return upstream.get() != null;
}
/**
- * Assert that the onSubscribe method was called exactly once.
- * @return this;
+ * Assert that the {@link #onSubscribe(Disposable)} method was called exactly once.
+ * @return this
*/
@Override
+ @NonNull
protected final TestObserver assertSubscribed() {
if (upstream.get() == null) {
throw fail("Not subscribed!");
@@ -184,7 +192,7 @@ protected final TestObserver assertSubscribed() {
}
@Override
- public void onSuccess(T value) {
+ public void onSuccess(@NonNull T value) {
onNext(value);
onComplete();
}
diff --git a/src/main/java/io/reactivex/rxjava3/processors/AsyncProcessor.java b/src/main/java/io/reactivex/rxjava3/processors/AsyncProcessor.java
index 9a75a1f4ca..9491773080 100644
--- a/src/main/java/io/reactivex/rxjava3/processors/AsyncProcessor.java
+++ b/src/main/java/io/reactivex/rxjava3/processors/AsyncProcessor.java
@@ -68,7 +68,7 @@
*
The {@code AsyncProcessor} honors the backpressure of the downstream {@code Subscriber}s and won't emit
* its single value to a particular {@code Subscriber} until that {@code Subscriber} has requested an item.
* When the {@code AsyncProcessor} is subscribed to a {@link io.reactivex.rxjava3.core.Flowable}, the processor consumes this
- * {@code Flowable} in an unbounded manner (requesting `Long.MAX_VALUE`) as only the very last upstream item is
+ * {@code Flowable} in an unbounded manner (requesting {@link Long#MAX_VALUE}) as only the very last upstream item is
* retained by it.
*
*
Scheduler:
diff --git a/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java
index eaf47ecae3..4b85ff5ab0 100644
--- a/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java
+++ b/src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java
@@ -108,7 +108,7 @@
* that returns true if any of the {@code Subscriber}s is not ready to receive {@code onNext} events. If
* there are no {@code Subscriber}s to the processor, {@code offer()} always succeeds.
* If the {@code BehaviorProcessor} is (optionally) subscribed to another {@code Publisher}, this upstream
- * {@code Publisher} is consumed in an unbounded fashion (requesting {@code Long.MAX_VALUE}).
+ * {@code Publisher} is consumed in an unbounded fashion (requesting {@link Long#MAX_VALUE}).
*
Scheduler:
*
{@code BehaviorProcessor} does not operate by default on a particular {@link io.reactivex.rxjava3.core.Scheduler} and
* the {@code Subscriber}s get notified on the thread the respective {@code onXXX} methods were invoked.
@@ -229,7 +229,7 @@ public static BehaviorProcessor createDefault(T defaultValue) {
/**
* Constructs a BehaviorProcessor with the given initial value.
* @param defaultValue the initial value, not null (verified)
- * @throws NullPointerException if {@code defaultValue} is null
+ * @throws NullPointerException if {@code defaultValue} is {@code null}
* @since 2.0
*/
BehaviorProcessor(T defaultValue) {
diff --git a/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java b/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java
index 2d44f60f05..4dc9bc9371 100644
--- a/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java
+++ b/src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java
@@ -71,7 +71,7 @@
*
*
Backpressure:
*
The processor does not coordinate backpressure for its subscribers and implements a weaker {@code onSubscribe} which
- * calls requests Long.MAX_VALUE from the incoming Subscriptions. This makes it possible to subscribe the {@code PublishProcessor}
+ * calls requests {@link Long#MAX_VALUE} from the incoming Subscriptions. This makes it possible to subscribe the {@code PublishProcessor}
* to multiple sources (note on serialization though) unlike the standard {@code Subscriber} contract. Child subscribers, however, are not overflown but receive an
* {@link IllegalStateException} in case their requested amount is zero.
* This {@code ReplayProcessor} respects the individual backpressure behavior of its {@code Subscriber}s but
* does not coordinate their request amounts towards the upstream (because there might not be any) and
- * consumes the upstream in an unbounded manner (requesting {@code Long.MAX_VALUE}).
+ * consumes the upstream in an unbounded manner (requesting {@link Long#MAX_VALUE}).
* Note that {@code Subscriber}s receive a continuous sequence of values after they subscribed even
* if an individual item gets delayed due to backpressure.
* Due to concurrency requirements, a size-bounded {@code ReplayProcessor} may hold strong references to more source
@@ -104,7 +104,7 @@
*
Backpressure:
*
This {@code ReplayProcessor} respects the individual backpressure behavior of its {@code Subscriber}s but
* does not coordinate their request amounts towards the upstream (because there might not be any) and
- * consumes the upstream in an unbounded manner (requesting {@code Long.MAX_VALUE}).
+ * consumes the upstream in an unbounded manner (requesting {@link Long#MAX_VALUE}).
* Note that {@code Subscriber}s receive a continuous sequence of values after they subscribed even
* if an individual item gets delayed due to backpressure.
*
Scheduler:
diff --git a/src/main/java/io/reactivex/rxjava3/processors/UnicastProcessor.java b/src/main/java/io/reactivex/rxjava3/processors/UnicastProcessor.java
index 4a1862a95f..ee93a5ac9c 100644
--- a/src/main/java/io/reactivex/rxjava3/processors/UnicastProcessor.java
+++ b/src/main/java/io/reactivex/rxjava3/processors/UnicastProcessor.java
@@ -67,7 +67,7 @@
* {@link NullPointerException} being thrown and the processor's state is not changed.
*
* Since a {@code UnicastProcessor} is a {@link io.reactivex.rxjava3.core.Flowable} as well as a {@link FlowableProcessor}, it
- * honors the downstream backpressure but consumes an upstream source in an unbounded manner (requesting {@code Long.MAX_VALUE}).
+ * honors the downstream backpressure but consumes an upstream source in an unbounded manner (requesting {@link Long#MAX_VALUE}).
*
* When this {@code UnicastProcessor} is terminated via {@link #onError(Throwable)} the current or late single {@code Subscriber}
* may receive the {@code Throwable} before any available items could be emitted. To make sure an {@code onError} event is delivered
@@ -91,7 +91,7 @@
*
*
Backpressure:
*
{@code UnicastProcessor} honors the downstream backpressure but consumes an upstream source
- * (if any) in an unbounded manner (requesting {@code Long.MAX_VALUE}).
+ * (if any) in an unbounded manner (requesting {@link Long#MAX_VALUE}).
*
Scheduler:
*
{@code UnicastProcessor} does not operate by default on a particular {@link io.reactivex.rxjava3.core.Scheduler} and
* the single {@code Subscriber} gets notified on the thread the respective {@code onXXX} methods were invoked.
diff --git a/src/main/java/io/reactivex/rxjava3/schedulers/Timed.java b/src/main/java/io/reactivex/rxjava3/schedulers/Timed.java
index 845273ff22..814b223dcf 100644
--- a/src/main/java/io/reactivex/rxjava3/schedulers/Timed.java
+++ b/src/main/java/io/reactivex/rxjava3/schedulers/Timed.java
@@ -29,11 +29,11 @@ public final class Timed {
final TimeUnit unit;
/**
- * Constructs a Timed instance with the given value and time information.
+ * Constructs a {@code Timed} instance with the given value and time information.
* @param value the value to hold
* @param time the time to hold
* @param unit the time unit, not null
- * @throws NullPointerException if unit is null
+ * @throws NullPointerException if unit is {@code null}
*/
public Timed(@NonNull T value, long time, @NonNull TimeUnit unit) {
this.value = value;
@@ -69,7 +69,7 @@ public long time() {
/**
* Returns the contained time value in the time unit specified.
- * @param unit the time unt
+ * @param unit the time unit
* @return the converted time
*/
public long time(@NonNull TimeUnit unit) {
diff --git a/src/main/java/io/reactivex/rxjava3/subjects/BehaviorSubject.java b/src/main/java/io/reactivex/rxjava3/subjects/BehaviorSubject.java
index 68efd5bf8f..b3b212eea4 100644
--- a/src/main/java/io/reactivex/rxjava3/subjects/BehaviorSubject.java
+++ b/src/main/java/io/reactivex/rxjava3/subjects/BehaviorSubject.java
@@ -213,7 +213,7 @@ public static BehaviorSubject createDefault(T defaultValue) {
/**
* Constructs a BehaviorSubject with the given initial value.
* @param defaultValue the initial value, not null (verified)
- * @throws NullPointerException if {@code defaultValue} is null
+ * @throws NullPointerException if {@code defaultValue} is {@code null}
* @since 2.0
*/
BehaviorSubject(T defaultValue) {
diff --git a/src/main/java/io/reactivex/rxjava3/subscribers/DefaultSubscriber.java b/src/main/java/io/reactivex/rxjava3/subscribers/DefaultSubscriber.java
index 3e674a16b0..6a936eb8d9 100644
--- a/src/main/java/io/reactivex/rxjava3/subscribers/DefaultSubscriber.java
+++ b/src/main/java/io/reactivex/rxjava3/subscribers/DefaultSubscriber.java
@@ -27,7 +27,7 @@
*
*
All pre-implemented final methods are thread-safe.
*
- *
The default {@link #onStart()} requests Long.MAX_VALUE by default. Override
+ *
The default {@link #onStart()} requests {@link Long#MAX_VALUE} by default. Override
* the method to request a custom positive amount.
*
*
Note that calling {@link #request(long)} from {@link #onStart()} may trigger
@@ -85,7 +85,7 @@ public final void onSubscribe(Subscription s) {
}
/**
- * Requests from the upstream Subscription.
+ * Requests from the upstream {@link Subscription}.
* @param n the request amount, positive
*/
protected final void request(long n) {
@@ -96,7 +96,7 @@ protected final void request(long n) {
}
/**
- * Cancels the upstream's Subscription.
+ * Cancels the upstream's {@link Subscription}.
*/
protected final void cancel() {
Subscription s = this.upstream;
diff --git a/src/main/java/io/reactivex/rxjava3/subscribers/DisposableSubscriber.java b/src/main/java/io/reactivex/rxjava3/subscribers/DisposableSubscriber.java
index 2c6c126f8d..566b0097f7 100644
--- a/src/main/java/io/reactivex/rxjava3/subscribers/DisposableSubscriber.java
+++ b/src/main/java/io/reactivex/rxjava3/subscribers/DisposableSubscriber.java
@@ -23,11 +23,11 @@
import io.reactivex.rxjava3.internal.util.EndConsumerHelper;
/**
- * An abstract Subscriber that allows asynchronous, external cancellation by implementing Disposable.
+ * An abstract Subscriber that allows asynchronous, external cancellation by implementing {@link Disposable}.
*
*
All pre-implemented final methods are thread-safe.
*
- *
The default {@link #onStart()} requests Long.MAX_VALUE by default. Override
+ *
The default {@link #onStart()} requests {@link Long#MAX_VALUE} by default. Override
* the method to request a custom positive amount. Use the protected {@link #request(long)}
* to request more items and {@link #cancel()} to cancel the sequence from within an
* {@code onNext} implementation.
@@ -74,7 +74,7 @@
* @param the received value type.
*/
public abstract class DisposableSubscriber implements FlowableSubscriber, Disposable {
- final AtomicReference upstream = new AtomicReference();
+ final AtomicReference upstream = new AtomicReference<>();
@Override
public final void onSubscribe(Subscription s) {
@@ -84,18 +84,18 @@ public final void onSubscribe(Subscription s) {
}
/**
- * Called once the single upstream Subscription is set via onSubscribe.
+ * Called once the single upstream {@link Subscription} is set via {@link #onSubscribe(Subscription)}.
*/
protected void onStart() {
upstream.get().request(Long.MAX_VALUE);
}
/**
- * Requests the specified amount from the upstream if its Subscription is set via
+ * Requests the specified amount from the upstream if its {@link Subscription} is set via
* onSubscribe already.
- *
Note that calling this method before a Subscription is set via onSubscribe
- * leads to NullPointerException and meant to be called from inside onStart or
- * onNext.
+ *
Note that calling this method before a {@link Subscription} is set via {@link #onSubscribe(Subscription)}
+ * leads to {@link NullPointerException} and meant to be called from inside {@link #onStart()} or
+ * {@link #onNext(Object)}.
* @param n the request amount, positive
*/
protected final void request(long n) {
@@ -103,8 +103,8 @@ protected final void request(long n) {
}
/**
- * Cancels the Subscription set via onSubscribe or makes sure a
- * Subscription set asynchronously (later) is cancelled immediately.
+ * Cancels the Subscription set via {@link #onSubscribe(Subscription)} or makes sure a
+ * {@link Subscription} set asynchronously (later) is cancelled immediately.
*
This method is thread-safe and can be exposed as a public API.
*/
protected final void cancel() {
diff --git a/src/main/java/io/reactivex/rxjava3/subscribers/ResourceSubscriber.java b/src/main/java/io/reactivex/rxjava3/subscribers/ResourceSubscriber.java
index 1990939311..7d9411048a 100644
--- a/src/main/java/io/reactivex/rxjava3/subscribers/ResourceSubscriber.java
+++ b/src/main/java/io/reactivex/rxjava3/subscribers/ResourceSubscriber.java
@@ -40,7 +40,7 @@
* {@code ResourceSubscriber} and then add/remove resources to/from the {@code CompositeDisposable}
* freely.
*
- *
The default {@link #onStart()} requests Long.MAX_VALUE by default. Override
+ *
The default {@link #onStart()} requests {@link Long#MAX_VALUE} by default. Override
* the method to request a custom positive amount. Use the protected {@link #request(long)}
* to request more items and {@link #dispose()} to cancel the sequence from within an
* {@code onNext} implementation.
@@ -94,7 +94,7 @@
*/
public abstract class ResourceSubscriber implements FlowableSubscriber, Disposable {
/** The active subscription. */
- private final AtomicReference upstream = new AtomicReference();
+ private final AtomicReference upstream = new AtomicReference<>();
/** The resource composite, can never be null. */
private final ListCompositeDisposable resources = new ListCompositeDisposable();
@@ -103,11 +103,11 @@ public abstract class ResourceSubscriber implements FlowableSubscriber, Di
private final AtomicLong missedRequested = new AtomicLong();
/**
- * Adds a resource to this AsyncObserver.
+ * Adds a resource to this {@code ResourceSubscriber}.
*
* @param resource the resource to add
*
- * @throws NullPointerException if resource is null
+ * @throws NullPointerException if {@code resource} is {@code null}
*/
public final void add(Disposable resource) {
Objects.requireNonNull(resource, "resource is null");
@@ -126,10 +126,10 @@ public final void onSubscribe(Subscription s) {
}
/**
- * Called once the upstream sets a Subscription on this AsyncObserver.
+ * Called once the upstream sets a {@link Subscription} on this {@code ResourceSubscriber}.
*
*
You can perform initialization at this moment. The default
- * implementation requests Long.MAX_VALUE from upstream.
+ * implementation requests {@link Long#MAX_VALUE} from upstream.
*/
protected void onStart() {
request(Long.MAX_VALUE);
@@ -138,7 +138,7 @@ protected void onStart() {
/**
* Request the specified amount of elements from upstream.
*
- *
This method can be called before the upstream calls onSubscribe().
+ *
This method can be called before the upstream calls {@link #onSubscribe(Subscription)}.
* When the subscription happens, all missed requests are requested.
*
* @param n the request amount, must be positive
@@ -149,10 +149,10 @@ protected final void request(long n) {
/**
* Cancels the subscription (if any) and disposes the resources associated with
- * this AsyncObserver (if any).
+ * this {@code ResourceSubscriber} (if any).
*
- *
This method can be called before the upstream calls onSubscribe at which
- * case the Subscription will be immediately cancelled.
+ *
This method can be called before the upstream calls {@link #onSubscribe(Subscription)} at which
+ * case the {@link Subscription} will be immediately cancelled.
*/
@Override
public final void dispose() {
@@ -162,8 +162,8 @@ public final void dispose() {
}
/**
- * Returns true if this AsyncObserver has been disposed/cancelled.
- * @return true if this AsyncObserver has been disposed/cancelled
+ * Returns true if this {@code ResourceSubscriber} has been disposed/cancelled.
+ * @return true if this {@code ResourceSubscriber} has been disposed/cancelled
*/
@Override
public final boolean isDisposed() {
diff --git a/src/main/java/io/reactivex/rxjava3/subscribers/SafeSubscriber.java b/src/main/java/io/reactivex/rxjava3/subscribers/SafeSubscriber.java
index 096a99e623..0db19796fa 100644
--- a/src/main/java/io/reactivex/rxjava3/subscribers/SafeSubscriber.java
+++ b/src/main/java/io/reactivex/rxjava3/subscribers/SafeSubscriber.java
@@ -14,6 +14,7 @@
import org.reactivestreams.*;
+import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.internal.subscriptions.*;
@@ -21,7 +22,7 @@
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
- * Wraps another Subscriber and ensures all onXXX methods conform the protocol
+ * Wraps another {@link Subscriber} and ensures all {@code onXXX} methods conform the protocol
* (except the requirement for serialized access).
*
* @param the value type
@@ -35,15 +36,15 @@ public final class SafeSubscriber implements FlowableSubscriber, Subscript
boolean done;
/**
- * Constructs a SafeSubscriber by wrapping the given actual Subscriber.
- * @param downstream the actual Subscriber to wrap, not null (not validated)
+ * Constructs a {@code SafeSubscriber} by wrapping the given actual {@link Subscriber}.
+ * @param downstream the actual {@code Subscriber} to wrap, not {@code null} (not validated)
*/
- public SafeSubscriber(Subscriber super T> downstream) {
+ public SafeSubscriber(@NonNull Subscriber super T> downstream) {
this.downstream = downstream;
}
@Override
- public void onSubscribe(Subscription s) {
+ public void onSubscribe(@NonNull Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
try {
@@ -65,7 +66,7 @@ public void onSubscribe(Subscription s) {
}
@Override
- public void onNext(T t) {
+ public void onNext(@NonNull T t) {
if (done) {
return;
}
@@ -124,7 +125,7 @@ void onNextNoSubscription() {
}
@Override
- public void onError(Throwable t) {
+ public void onError(@NonNull Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
diff --git a/src/main/java/io/reactivex/rxjava3/subscribers/SerializedSubscriber.java b/src/main/java/io/reactivex/rxjava3/subscribers/SerializedSubscriber.java
index 92ecbbe6d0..9698d97e21 100644
--- a/src/main/java/io/reactivex/rxjava3/subscribers/SerializedSubscriber.java
+++ b/src/main/java/io/reactivex/rxjava3/subscribers/SerializedSubscriber.java
@@ -14,19 +14,21 @@
import org.reactivestreams.*;
+import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
- * Serializes access to the onNext, onError and onComplete methods of another Subscriber.
+ * Serializes access to the {@link Subscriber#onNext(Object)}, {@link Subscriber#onError(Throwable)} and
+ * {@link Subscriber#onComplete()} methods of another {@link Subscriber}.
*
*
Note that {@link #onSubscribe(Subscription)} is not serialized in respect of the other methods so
- * make sure the {@code onSubscribe} is called with a non-null {@code Subscription}
+ * make sure the {@code onSubscribe} is called with a non-{@code null} {@link Subscription}
* before any of the other methods are called.
*
- *
The implementation assumes that the actual Subscriber's methods don't throw.
+ *
The implementation assumes that the actual {@code Subscriber}'s methods don't throw.
*
* @param the value type
*/
@@ -44,27 +46,27 @@ public final class SerializedSubscriber implements FlowableSubscriber, Sub
volatile boolean done;
/**
- * Construct a SerializedSubscriber by wrapping the given actual Subscriber.
- * @param downstream the actual Subscriber, not null (not verified)
+ * Construct a {@code SerializedSubscriber} by wrapping the given actual {@link Subscriber}.
+ * @param downstream the actual {@code Subscriber}, not null (not verified)
*/
public SerializedSubscriber(Subscriber super T> downstream) {
this(downstream, false);
}
/**
- * Construct a SerializedSubscriber by wrapping the given actual Observer and
+ * Construct a {@code SerializedSubscriber} by wrapping the given actual {@link Subscriber} and
* optionally delaying the errors till all regular values have been emitted
* from the internal buffer.
- * @param actual the actual Subscriber, not null (not verified)
- * @param delayError if true, errors are emitted after regular values have been emitted
+ * @param actual the actual {@code Subscriber}, not {@code null} (not verified)
+ * @param delayError if {@code true}, errors are emitted after regular values have been emitted
*/
- public SerializedSubscriber(Subscriber super T> actual, boolean delayError) {
+ public SerializedSubscriber(@NonNull Subscriber super T> actual, boolean delayError) {
this.downstream = actual;
this.delayError = delayError;
}
@Override
- public void onSubscribe(Subscription s) {
+ public void onSubscribe(@NonNull Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
downstream.onSubscribe(this);
@@ -72,7 +74,7 @@ public void onSubscribe(Subscription s) {
}
@Override
- public void onNext(T t) {
+ public void onNext(@NonNull T t) {
if (done) {
return;
}
@@ -88,7 +90,7 @@ public void onNext(T t) {
if (emitting) {
AppendOnlyLinkedArrayList q = queue;
if (q == null) {
- q = new AppendOnlyLinkedArrayList(QUEUE_LINK_SIZE);
+ q = new AppendOnlyLinkedArrayList<>(QUEUE_LINK_SIZE);
queue = q;
}
q.add(NotificationLite.next(t));
@@ -117,7 +119,7 @@ public void onError(Throwable t) {
done = true;
AppendOnlyLinkedArrayList q = queue;
if (q == null) {
- q = new AppendOnlyLinkedArrayList(QUEUE_LINK_SIZE);
+ q = new AppendOnlyLinkedArrayList<>(QUEUE_LINK_SIZE);
queue = q;
}
Object err = NotificationLite.error(t);
@@ -155,7 +157,7 @@ public void onComplete() {
if (emitting) {
AppendOnlyLinkedArrayList q = queue;
if (q == null) {
- q = new AppendOnlyLinkedArrayList(QUEUE_LINK_SIZE);
+ q = new AppendOnlyLinkedArrayList<>(QUEUE_LINK_SIZE);
queue = q;
}
q.add(NotificationLite.complete());
diff --git a/src/main/java/io/reactivex/rxjava3/subscribers/TestSubscriber.java b/src/main/java/io/reactivex/rxjava3/subscribers/TestSubscriber.java
index 3607337b36..cbc9d10320 100644
--- a/src/main/java/io/reactivex/rxjava3/subscribers/TestSubscriber.java
+++ b/src/main/java/io/reactivex/rxjava3/subscribers/TestSubscriber.java
@@ -16,20 +16,19 @@
import org.reactivestreams.*;
+import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.observers.BaseTestConsumer;
/**
- * A subscriber that records events and allows making assertions about them.
+ * A {@link Subscriber} implementation that records events and allows making assertions about them.
*
- *
You can override the onSubscribe, onNext, onError, onComplete, request and
- * cancel methods but not the others (this is by design).
- *
- *
The TestSubscriber implements Disposable for convenience where dispose calls cancel.
+ *
You can override the {@link #onSubscribe(Subscription)}, {@link #onNext(Object)}, {@link #onError(Throwable)} and
+ * {@link #onComplete()} methods but not the others (this is by design).
*
*
When calling the default request method, you are requesting on behalf of the
- * wrapped actual subscriber.
+ * wrapped actual {@link Subscriber} if any.
*
* @param the value type
*/
@@ -49,78 +48,82 @@ public class TestSubscriber
private final AtomicLong missedRequested;
/**
- * Creates a TestSubscriber with Long.MAX_VALUE initial request.
+ * Creates a {@code TestSubscriber} with {@link Long#MAX_VALUE} initial request amount.
* @param the value type
- * @return the new TestSubscriber instance.
+ * @return the new {@code TestSubscriber} instance.
+ * @see #create(long)
*/
+ @NonNull
public static TestSubscriber create() {
- return new TestSubscriber();
+ return new TestSubscriber<>();
}
/**
- * Creates a TestSubscriber with the given initial request.
+ * Creates a {@code TestSubscriber} with the given initial request amount.
* @param the value type
* @param initialRequested the initial requested amount
- * @return the new TestSubscriber instance.
+ * @return the new {@code TestSubscriber} instance.
*/
+ @NonNull
public static TestSubscriber create(long initialRequested) {
- return new TestSubscriber(initialRequested);
+ return new TestSubscriber<>(initialRequested);
}
/**
- * Constructs a forwarding TestSubscriber.
+ * Constructs a forwarding {@code TestSubscriber}.
* @param the value type received
- * @param delegate the actual Subscriber to forward events to
+ * @param delegate the actual {@link Subscriber} to forward events to
* @return the new TestObserver instance
*/
- public static TestSubscriber create(Subscriber super T> delegate) {
- return new TestSubscriber(delegate);
+ public static TestSubscriber create(@NonNull Subscriber super T> delegate) {
+ return new TestSubscriber<>(delegate);
}
/**
- * Constructs a non-forwarding TestSubscriber with an initial request value of Long.MAX_VALUE.
+ * Constructs a non-forwarding {@code TestSubscriber} with an initial request value of {@link Long#MAX_VALUE}.
*/
public TestSubscriber() {
this(EmptySubscriber.INSTANCE, Long.MAX_VALUE);
}
/**
- * Constructs a non-forwarding TestSubscriber with the specified initial request value.
- *
The TestSubscriber doesn't validate the initialRequest value so one can
+ * Constructs a non-forwarding {@code TestSubscriber} with the specified initial request value.
+ *
The {@code TestSubscriber} doesn't validate the {@code initialRequest} amount so one can
* test sources with invalid values as well.
- * @param initialRequest the initial request value
+ * @param initialRequest the initial request amount
*/
public TestSubscriber(long initialRequest) {
this(EmptySubscriber.INSTANCE, initialRequest);
}
/**
- * Constructs a forwarding TestSubscriber but leaves the requesting to the wrapped subscriber.
- * @param downstream the actual Subscriber to forward events to
+ * Constructs a forwarding {@code TestSubscriber} but leaves the requesting to the wrapped {@link Subscriber}.
+ * @param downstream the actual {@code Subscriber} to forward events to
*/
- public TestSubscriber(Subscriber super T> downstream) {
+ public TestSubscriber(@NonNull Subscriber super T> downstream) {
this(downstream, Long.MAX_VALUE);
}
/**
- * Constructs a forwarding TestSubscriber with the specified initial request value.
- *
The TestSubscriber doesn't validate the initialRequest value so one can
+ * Constructs a forwarding {@code TestSubscriber} with the specified initial request amount
+ * and an actual {@link Subscriber} to forward events to.
+ *
The {@code TestSubscriber} doesn't validate the initialRequest value so one can
* test sources with invalid values as well.
- * @param actual the actual Subscriber to forward events to
- * @param initialRequest the initial request value
+ * @param actual the actual {@code Subscriber} to forward events to
+ * @param initialRequest the initial request amount
*/
- public TestSubscriber(Subscriber super T> actual, long initialRequest) {
+ public TestSubscriber(@NonNull Subscriber super T> actual, long initialRequest) {
super();
if (initialRequest < 0) {
throw new IllegalArgumentException("Negative initial request not allowed");
}
this.downstream = actual;
- this.upstream = new AtomicReference();
+ this.upstream = new AtomicReference<>();
this.missedRequested = new AtomicLong(initialRequest);
}
@Override
- public void onSubscribe(Subscription s) {
+ public void onSubscribe(@NonNull Subscription s) {
lastThread = Thread.currentThread();
if (s == null) {
@@ -153,7 +156,7 @@ protected void onStart() {
}
@Override
- public void onNext(T t) {
+ public void onNext(@NonNull T t) {
if (!checkSubscriptionOnce) {
checkSubscriptionOnce = true;
if (upstream.get() == null) {
@@ -172,7 +175,7 @@ public void onNext(T t) {
}
@Override
- public void onError(Throwable t) {
+ public void onError(@NonNull Throwable t) {
if (!checkSubscriptionOnce) {
checkSubscriptionOnce = true;
if (upstream.get() == null) {
@@ -225,8 +228,8 @@ public final void cancel() {
}
/**
- * Returns true if this TestSubscriber has been cancelled.
- * @return true if this TestSubscriber has been cancelled
+ * Returns true if this {@code TestSubscriber} has been cancelled.
+ * @return true if this {@code TestSubscriber} has been cancelled
*/
public final boolean isCancelled() {
return cancelled;
@@ -245,8 +248,8 @@ protected final boolean isDisposed() {
// state retrieval methods
/**
- * Returns true if this TestSubscriber received a subscription.
- * @return true if this TestSubscriber received a subscription
+ * Returns true if this {@code TestSubscriber} received a {@link Subscription} via {@link #onSubscribe(Subscription)}.
+ * @return true if this {@code TestSubscriber} received a {@link Subscription} via {@link #onSubscribe(Subscription)}
*/
public final boolean hasSubscription() {
return upstream.get() != null;
@@ -255,7 +258,7 @@ public final boolean hasSubscription() {
// assertion methods
/**
- * Assert that the onSubscribe method was called exactly once.
+ * Assert that the {@link #onSubscribe(Subscription)} method was called exactly once.
* @return this
*/
@Override
diff --git a/src/test/java/io/reactivex/rxjava3/flowable/FlowableSubscriberTest.java b/src/test/java/io/reactivex/rxjava3/flowable/FlowableSubscriberTest.java
index 32c954f42b..2a94f73b8e 100644
--- a/src/test/java/io/reactivex/rxjava3/flowable/FlowableSubscriberTest.java
+++ b/src/test/java/io/reactivex/rxjava3/flowable/FlowableSubscriberTest.java
@@ -83,7 +83,7 @@ public void cancel() {
}
@Test
- public void requestFromChainedOperator() throws Exception {
+ public void requestFromChainedOperator() throws Throwable {
TestSubscriber s = new TestSubscriber(10L);
FlowableOperator o = new FlowableOperator() {
@Override
@@ -135,7 +135,7 @@ public void cancel() {
}
@Test
- public void requestFromDecoupledOperator() throws Exception {
+ public void requestFromDecoupledOperator() throws Throwable {
TestSubscriber s = new TestSubscriber(0L);
FlowableOperator o = new FlowableOperator() {
@Override
@@ -188,7 +188,7 @@ public void cancel() {
}
@Test
- public void requestFromDecoupledOperatorThatRequestsN() throws Exception {
+ public void requestFromDecoupledOperatorThatRequestsN() throws Throwable {
TestSubscriber s = new TestSubscriber(10L);
final AtomicLong innerR = new AtomicLong();
FlowableOperator o = new FlowableOperator() {
diff --git a/src/test/java/io/reactivex/rxjava3/tck/BaseTck.java b/src/test/java/io/reactivex/rxjava3/tck/BaseTck.java
index d14aac229f..82c25e9591 100644
--- a/src/test/java/io/reactivex/rxjava3/tck/BaseTck.java
+++ b/src/test/java/io/reactivex/rxjava3/tck/BaseTck.java
@@ -51,8 +51,8 @@ public long maxElementsFromPublisher() {
/**
* Creates an Iterable with the specified number of elements or an infinite one if
- * elements > Integer.MAX_VALUE.
- * @param elements the number of elements to return, Integer.MAX_VALUE means an infinite sequence
+ * elements > {@link Integer#MAX_VALUE}.
+ * @param elements the number of elements to return, {@link Integer#MAX_VALUE} means an infinite sequence
* @return the Iterable
*/
protected Iterable iterate(long elements) {
diff --git a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java
index 3faefd13a5..886728709d 100644
--- a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java
+++ b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java
@@ -69,7 +69,7 @@ public enum TestHelper {
public static final int RACE_LONG_LOOPS = 10000;
/**
- * Mocks a subscriber and prepares it to request Long.MAX_VALUE.
+ * Mocks a subscriber and prepares it to request {@link Long#MAX_VALUE}.
* @param the value type
* @return the mocked subscriber
*/
diff --git a/src/test/java/io/reactivex/rxjava3/testsupport/TestSubscriberEx.java b/src/test/java/io/reactivex/rxjava3/testsupport/TestSubscriberEx.java
index bdc1a6a1da..23297689b8 100644
--- a/src/test/java/io/reactivex/rxjava3/testsupport/TestSubscriberEx.java
+++ b/src/test/java/io/reactivex/rxjava3/testsupport/TestSubscriberEx.java
@@ -51,7 +51,7 @@ public class TestSubscriberEx
private QueueSubscription qs;
/**
- * Constructs a non-forwarding TestSubscriber with an initial request value of Long.MAX_VALUE.
+ * Constructs a non-forwarding TestSubscriber with an initial request value of {@link Long#MAX_VALUE}.
*/
public TestSubscriberEx() {
this(EmptySubscriber.INSTANCE, Long.MAX_VALUE);