From e1a480c6eb5a39d7c3848720e35f573794112b4a Mon Sep 17 00:00:00 2001
From: akarnokd <akarnokd@gmail.com>
Date: Thu, 23 Jan 2020 14:05:17 +0100
Subject: [PATCH 1/2] 3.x: Add Maybe/Single/Completable blockingSubscribe

---
 .../reactivex/rxjava3/core/Completable.java   | 100 ++++
 .../java/io/reactivex/rxjava3/core/Maybe.java | 131 ++++-
 .../io/reactivex/rxjava3/core/Single.java     | 100 ++++
 .../BlockingDisposableMultiObserver.java      | 154 ++++++
 .../observers/BlockingMultiObserver.java      |  38 ++
 .../CompletableBlockingSubscribeTest.java     | 331 ++++++++++++
 .../maybe/MaybeBlockingSubscribeTest.java     | 502 ++++++++++++++++++
 .../single/SingleBlockingSubscribeTest.java   | 343 ++++++++++++
 8 files changed, 1698 insertions(+), 1 deletion(-)
 create mode 100644 src/main/java/io/reactivex/rxjava3/internal/observers/BlockingDisposableMultiObserver.java
 create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableBlockingSubscribeTest.java
 create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeBlockingSubscribeTest.java
 create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleBlockingSubscribeTest.java

diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java
index 1266255315..4e4ecbe102 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Completable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java
@@ -1310,6 +1310,106 @@ public final boolean blockingAwait(long timeout, @NonNull TimeUnit unit) {
         return observer.blockingAwait(timeout, unit);
     }
 
+    /**
+     * Subscribes to the current {@code Completable} and <em>blocks the current thread</em> until it terminates.
+     * <p>
+     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
+     *  <dt><b>Error handling:</b></dt>
+     *  <dd>If the current {@code Completable} signals an error,
+     *  the {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+     *  If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
+     *  </dd>
+     * </dl>
+     * @since 3.0.0
+     * @see #blockingSubscribe(Action)
+     * @see #blockingSubscribe(Action, Consumer)
+     */
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final void blockingSubscribe() {
+        blockingSubscribe(Functions.EMPTY_ACTION, Functions.ERROR_CONSUMER);
+    }
+
+    /**
+     * Subscribes to the current {@code Completable} and calls given {@code onComplete} callback on the <em>current thread</em>
+     * when it completes normally.
+     * <p>
+     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.a.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
+     *  <dt><b>Error handling:</b></dt>
+     *  <dd>If either the current {@code Completable} signals an error or {@code onComplete} throws,
+     *  the respective {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+     *  If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
+     *  </dd>
+     * </dl>
+     * @param onComplete the {@link Action} to call if the current {@code Completable} completes normally
+     * @throws NullPointerException if {@code onComplete} is {@code null}
+     * @since 3.0.0
+     * @see #blockingSubscribe(Action, Consumer)
+     */
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final void blockingSubscribe(@NonNull Action onComplete) {
+        blockingSubscribe(onComplete, Functions.ERROR_CONSUMER);
+    }
+
+    /**
+     * Subscribes to the current {@code Completable} and calls the appropriate callback on the <em>current thread</em>
+     * when it terminates.
+     * <p>
+     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.ac.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
+     *  <dt><b>Error handling:</b></dt>
+     *  <dd>If either {@code onComplete} or {@code onError} throw, the {@link Throwable} is routed to the
+     *  global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+     *  If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
+     *  </dd>
+     * </dl>
+     * @param onComplete the {@link Action} to call if the current {@code Completable} completes normally
+     * @param onError the {@link Consumer} to call if the current {@code Completable} signals an error
+     * @throws NullPointerException if {@code onComplete} or {@code onError} is {@code null}
+     * @since 3.0.0
+     */
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final void blockingSubscribe(@NonNull Action onComplete, @NonNull Consumer<? super Throwable> onError) {
+        Objects.requireNonNull(onComplete, "onComplete is null");
+        Objects.requireNonNull(onError, "onError is null");
+        BlockingMultiObserver<Void> observer = new BlockingMultiObserver<>();
+        subscribe(observer);
+        observer.blockingConsume(Functions.emptyConsumer(), onError, onComplete);
+    }
+
+    /**
+     * Subscribes to the current {@code Completable} and calls the appropriate {@link CompletableObserver} method on the <em>current thread</em>.
+     * <p>
+     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.o.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
+     *  <dt><b>Error handling:</b></dt>
+     *  <dd>An {@code onError} signal is delivered to the {@link CompletableObserver#onError(Throwable)} method.
+     *  If any of the {@code CompletableObserver}'s methods throw, the {@link RuntimeException} is propagated to the caller of this method.
+     *  If the current thread is interrupted, an {@link InterruptedException} is delivered to {@code observer.onError}.
+     *  </dd>
+     * </dl>
+     * @param observer the {@code CompletableObserver} to call methods on the current thread
+     * @throws NullPointerException if {@code observer} is {@code null}
+     * @since 3.0.0
+     */
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final void blockingSubscribe(@NonNull CompletableObserver observer) {
+        Objects.requireNonNull(observer, "observer is null");
+        BlockingDisposableMultiObserver<Void> blockingObserver = new BlockingDisposableMultiObserver<>();
+        observer.onSubscribe(blockingObserver);
+        subscribe(blockingObserver);
+        blockingObserver.blockingConsume(observer);
+    }
+
     /**
      * Subscribes to this {@code Completable} only once, when the first {@link CompletableObserver}
      * subscribes to the result {@code Completable}, caches its terminal event
diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
index 4322afaa93..627435e9e9 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
@@ -26,7 +26,7 @@
 import io.reactivex.rxjava3.internal.functions.*;
 import io.reactivex.rxjava3.internal.fuseable.*;
 import io.reactivex.rxjava3.internal.jdk8.*;
-import io.reactivex.rxjava3.internal.observers.BlockingMultiObserver;
+import io.reactivex.rxjava3.internal.observers.*;
 import io.reactivex.rxjava3.internal.operators.flowable.*;
 import io.reactivex.rxjava3.internal.operators.maybe.*;
 import io.reactivex.rxjava3.internal.operators.mixed.*;
@@ -2475,6 +2475,135 @@ public final T blockingGet(@NonNull T defaultValue) {
         return observer.blockingGet(defaultValue);
     }
 
+    /**
+     * Subscribes to the current {@code Maybe} and <em>blocks the current thread</em> until it terminates.
+     * <p>
+     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
+     *  <dt><b>Error handling:</b></dt>
+     *  <dd>If the current {@code Maybe} signals an error,
+     *  the {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+     *  If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
+     *  </dd>
+     * </dl>
+     * @since 3.0.0
+     * @see #blockingSubscribe(Consumer)
+     * @see #blockingSubscribe(Consumer, Consumer)
+     * @see #blockingSubscribe(Consumer, Consumer, Action)
+     */
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final void blockingSubscribe() {
+        blockingSubscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
+    }
+
+    /**
+     * Subscribes to the current {@code Maybe} and calls given {@code onSuccess} callback on the <em>current thread</em>
+     * when it completes normally.
+     * <p>
+     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.c.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
+     *  <dt><b>Error handling:</b></dt>
+     *  <dd>If either the current {@code Maybe} signals an error or {@code onSuccess} throws,
+     *  the respective {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+     *  If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
+     *  </dd>
+     * </dl>
+     * @param onSuccess the {@link Consumer} to call if the current {@code Maybe} succeeds
+     * @throws NullPointerException if {@code onSuccess} is {@code null}
+     * @since 3.0.0
+     * @see #blockingSubscribe(Consumer, Consumer)
+     * @see #blockingSubscribe(Consumer, Consumer, Action)
+     */
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess) {
+        blockingSubscribe(onSuccess, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
+    }
+
+    /**
+     * Subscribes to the current {@code Maybe} and calls the appropriate callback on the <em>current thread</em>
+     * when it terminates.
+     * <p>
+     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.cc.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
+     *  <dt><b>Error handling:</b></dt>
+     *  <dd>If either {@code onSuccess} or {@code onError} throw, the {@link Throwable} is routed to the
+     *  global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+     *  If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
+     *  </dd>
+     * </dl>
+     * @param onSuccess the {@link Consumer} to call if the current {@code Maybe} succeeds
+     * @param onError the {@code Consumer} to call if the current {@code Maybe} signals an error
+     * @throws NullPointerException if {@code onSuccess} or {@code onError} is {@code null}
+     * @since 3.0.0
+     * @see #blockingSubscribe(Consumer, Consumer, Action)
+     */
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess, @NonNull Consumer<? super Throwable> onError) {
+        blockingSubscribe(onSuccess, onError, Functions.EMPTY_ACTION);
+    }
+
+    /**
+     * Subscribes to the current {@code Maybe} and calls the appropriate callback on the <em>current thread</em>
+     * when it terminates.
+     * <p>
+     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.cca.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
+     *  <dt><b>Error handling:</b></dt>
+     *  <dd>If either {@code onSuccess}, {@code onError} or {@code onComplete} throw, the {@link Throwable} is routed to the
+     *  global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+     *  If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
+     *  </dd>
+     * </dl>
+     * @param onSuccess the {@link Consumer} to call if the current {@code Maybe} succeeds
+     * @param onError the {@code Consumer} to call if the current {@code Maybe} signals an error
+     * @param onComplete the {@linnk Action} to call if the current {@code Maybe} completes without a value
+     * @throws NullPointerException if {@code onSuccess}, {@code onError} or {@code onComplete} is {@code null}
+     * @since 3.0.0
+     */
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess, @NonNull Consumer<? super Throwable> onError, @NonNull Action onComplete) {
+        Objects.requireNonNull(onSuccess, "onSuccess is null");
+        Objects.requireNonNull(onError, "onError is null");
+        Objects.requireNonNull(onComplete, "onComplete is null");
+        BlockingMultiObserver<T> observer = new BlockingMultiObserver<>();
+        subscribe(observer);
+        observer.blockingConsume(onSuccess, onError, onComplete);
+    }
+
+    /**
+     * Subscribes to the current {@code Maybe} and calls the appropriate {@link MaybeObserver} method on the <em>current thread</em>.
+     * <p>
+     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.o.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
+     *  <dt><b>Error handling:</b></dt>
+     *  <dd>An {@code onError} signal is delivered to the {@link MaybeObserver#onError(Throwable)} method.
+     *  If any of the {@code MaybeObserver}'s methods throw, the {@link RuntimeException} is propagated to the caller of this method.
+     *  If the current thread is interrupted, an {@link InterruptedException} is delivered to {@code observer.onError}.
+     *  </dd>
+     * </dl>
+     * @param observer the {@code MaybeObserver} to call methods on the current thread
+     * @throws NullPointerException if {@code observer} is {@code null}
+     * @since 3.0.0
+     */
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final void blockingSubscribe(@NonNull MaybeObserver<? super T> observer) {
+        Objects.requireNonNull(observer, "observer is null");
+        BlockingDisposableMultiObserver<T> blockingObserver = new BlockingDisposableMultiObserver<>();
+        observer.onSubscribe(blockingObserver);
+        subscribe(blockingObserver);
+        blockingObserver.blockingConsume(observer);
+    }
+
     /**
      * Returns a {@code Maybe} that subscribes to this {@code Maybe} lazily, caches its event
      * and replays it, to all the downstream subscribers.
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java
index c93bfa9282..2db4f16212 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Single.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Single.java
@@ -2947,6 +2947,106 @@ public final T blockingGet() {
         return observer.blockingGet();
     }
 
+    /**
+     * Subscribes to the current {@code Single} and <em>blocks the current thread</em> until it terminates.
+     * <p>
+     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
+     *  <dt><b>Error handling:</b></dt>
+     *  <dd>If the current {@code Single} signals an error,
+     *  the {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+     *  If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
+     *  </dd>
+     * </dl>
+     * @since 3.0.0
+     * @see #blockingSubscribe(Consumer)
+     * @see #blockingSubscribe(Consumer, Consumer)
+     */
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final void blockingSubscribe() {
+        blockingSubscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER);
+    }
+
+    /**
+     * Subscribes to the current {@code Single} and calls given {@code onSuccess} callback on the <em>current thread</em>
+     * when it completes normally.
+     * <p>
+     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.c.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
+     *  <dt><b>Error handling:</b></dt>
+     *  <dd>If either the current {@code Single} signals an error or {@code onSuccess} throws,
+     *  the respective {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+     *  If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
+     *  </dd>
+     * </dl>
+     * @param onSuccess the {@link Consumer} to call if the current {@code Single} succeeds
+     * @throws NullPointerException if {@code onSuccess} is {@code null}
+     * @since 3.0.0
+     * @see #blockingSubscribe(Consumer, Consumer)
+     */
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess) {
+        blockingSubscribe(onSuccess, Functions.ERROR_CONSUMER);
+    }
+
+    /**
+     * Subscribes to the current {@code Single} and calls the appropriate callback on the <em>current thread</em>
+     * when it terminates.
+     * <p>
+     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.cc.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
+     *  <dt><b>Error handling:</b></dt>
+     *  <dd>If either {@code onSuccess} or {@code onError} throw, the {@link Throwable} is routed to the
+     *  global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+     *  If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
+     *  </dd>
+     * </dl>
+     * @param onSuccess the {@link Consumer} to call if the current {@code Single} succeeds
+     * @param onError the {@code Consumer} to call if the current {@code Single} signals an error
+     * @throws NullPointerException if {@code onSuccess} or {@code onError} is {@code null}
+     * @since 3.0.0
+     */
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess, @NonNull Consumer<? super Throwable> onError) {
+        Objects.requireNonNull(onSuccess, "onSuccess is null");
+        Objects.requireNonNull(onError, "onError is null");
+        BlockingMultiObserver<T> observer = new BlockingMultiObserver<>();
+        subscribe(observer);
+        observer.blockingConsume(onSuccess, onError, Functions.EMPTY_ACTION);
+    }
+
+    /**
+     * Subscribes to the current {@code Single} and calls the appropriate {@link SingleObserver} method on the <em>current thread</em>.
+     * <p>
+     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.o.png" alt="">
+     * <dl>
+     *  <dt><b>Scheduler:</b></dt>
+     *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
+     *  <dt><b>Error handling:</b></dt>
+     *  <dd>An {@code onError} signal is delivered to the {@link SingleObserver#onError(Throwable)} method.
+     *  If any of the {@code SingleObserver}'s methods throw, the {@link RuntimeException} is propagated to the caller of this method.
+     *  If the current thread is interrupted, an {@link InterruptedException} is delivered to {@code observer.onError}.
+     *  </dd>
+     * </dl>
+     * @param observer the {@code SingleObserver} to call methods on the current thread
+     * @throws NullPointerException if {@code observer} is {@code null}
+     * @since 3.0.0
+     */
+    @SchedulerSupport(SchedulerSupport.NONE)
+    public final void blockingSubscribe(@NonNull SingleObserver<? super T> observer) {
+        Objects.requireNonNull(observer, "observer is null");
+        BlockingDisposableMultiObserver<T> blockingObserver = new BlockingDisposableMultiObserver<>();
+        observer.onSubscribe(blockingObserver);
+        subscribe(blockingObserver);
+        blockingObserver.blockingConsume(observer);
+    }
+
     /**
      * <strong>This method requires advanced knowledge about building operators, please consider
      * other standard composition methods first;</strong>
diff --git a/src/main/java/io/reactivex/rxjava3/internal/observers/BlockingDisposableMultiObserver.java b/src/main/java/io/reactivex/rxjava3/internal/observers/BlockingDisposableMultiObserver.java
new file mode 100644
index 0000000000..da7e289e27
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/observers/BlockingDisposableMultiObserver.java
@@ -0,0 +1,154 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.observers;
+
+import java.util.concurrent.CountDownLatch;
+
+import io.reactivex.rxjava3.annotations.NonNull;
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.internal.disposables.*;
+import io.reactivex.rxjava3.internal.util.BlockingHelper;
+
+/**
+ * Blocks until the upstream terminates and dispatches the outcome to
+ * the actual observer.
+ *
+ * @param <T> the element type of the source
+ * @since 3.0.0
+ */
+public final class BlockingDisposableMultiObserver<T>
+extends CountDownLatch
+implements MaybeObserver<T>, SingleObserver<T>, CompletableObserver, Disposable {
+
+    T value;
+    Throwable error;
+
+    final SequentialDisposable upstream;
+
+    public BlockingDisposableMultiObserver() {
+        super(1);
+        upstream = new SequentialDisposable();
+    }
+
+    @Override
+    public void dispose() {
+        upstream.dispose();
+        countDown();
+    }
+
+    @Override
+    public boolean isDisposed() {
+        return upstream.isDisposed();
+    }
+
+    @Override
+    public void onSubscribe(@NonNull Disposable d) {
+        DisposableHelper.setOnce(upstream, d);
+    }
+
+    @Override
+    public void onSuccess(@NonNull T t) {
+        this.value = t;
+        upstream.lazySet(Disposable.disposed());
+        countDown();
+    }
+
+    @Override
+    public void onError(@NonNull Throwable e) {
+        this.error = e;
+        upstream.lazySet(Disposable.disposed());
+        countDown();
+    }
+
+    @Override
+    public void onComplete() {
+        upstream.lazySet(Disposable.disposed());
+        countDown();
+    }
+
+    public void blockingConsume(CompletableObserver observer) {
+        if (getCount() != 0) {
+            try {
+                BlockingHelper.verifyNonBlocking();
+                await();
+            } catch (InterruptedException ex) {
+                dispose();
+                observer.onError(ex);
+                return;
+            }
+        }
+        if (isDisposed()) {
+            return;
+        }
+
+        Throwable ex = error;
+        if (ex != null) {
+            observer.onError(ex);
+        } else {
+            observer.onComplete();
+        }
+    }
+
+    public void blockingConsume(SingleObserver<? super T> observer) {
+        if (getCount() != 0) {
+            try {
+                BlockingHelper.verifyNonBlocking();
+                await();
+            } catch (InterruptedException ex) {
+                dispose();
+                observer.onError(ex);
+                return;
+            }
+        }
+        if (isDisposed()) {
+            return;
+        }
+
+        Throwable ex = error;
+        if (ex != null) {
+            observer.onError(ex);
+        } else {
+            observer.onSuccess(value);
+        }
+    }
+
+    public void blockingConsume(MaybeObserver<? super T> observer) {
+        if (getCount() != 0) {
+            try {
+                BlockingHelper.verifyNonBlocking();
+                await();
+            } catch (InterruptedException ex) {
+                dispose();
+                observer.onError(ex);
+                return;
+            }
+        }
+        if (isDisposed()) {
+            return;
+        }
+
+        Throwable ex = error;
+        if (ex != null) {
+            observer.onError(ex);
+        } else {
+            T v = value;
+            if (v == null) {
+                observer.onComplete();
+            } else {
+                observer.onSuccess(v);
+            }
+        }
+    }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/observers/BlockingMultiObserver.java b/src/main/java/io/reactivex/rxjava3/internal/observers/BlockingMultiObserver.java
index 1f4ee305bd..4945730ab5 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/observers/BlockingMultiObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/observers/BlockingMultiObserver.java
@@ -17,7 +17,10 @@
 
 import io.reactivex.rxjava3.core.*;
 import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.functions.*;
 import io.reactivex.rxjava3.internal.util.*;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
 
 /**
  * A combined Observer that awaits the success or error signal via a CountDownLatch.
@@ -143,4 +146,39 @@ public boolean blockingAwait(long timeout, TimeUnit unit) {
         }
         return true;
     }
+
+    /**
+     * Blocks until the source completes and calls the appropriate callback.
+     * @param onSuccess for a succeeding source
+     * @param onError for a failing source
+     * @param onComplete for an empty source
+     */
+    public void blockingConsume(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError, Action onComplete) {
+        try {
+            if (getCount() != 0) {
+                try {
+                    BlockingHelper.verifyNonBlocking();
+                    await();
+                } catch (InterruptedException ex) {
+                    dispose();
+                    onError.accept(ex);
+                    return;
+                }
+            }
+            Throwable ex = error;
+            if (ex != null) {
+                onError.accept(ex);
+                return;
+            }
+            T v = value;
+            if (v != null) {
+                onSuccess.accept(v);
+            } else {
+                onComplete.run();
+            }
+        } catch (Throwable t) {
+            Exceptions.throwIfFatal(t);
+            RxJavaPlugins.onError(t);
+        }
+    }
 }
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableBlockingSubscribeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableBlockingSubscribeTest.java
new file mode 100644
index 0000000000..e70aac7157
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableBlockingSubscribeTest.java
@@ -0,0 +1,331 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.completable;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.exceptions.TestException;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.observers.TestObserver;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+import io.reactivex.rxjava3.testsupport.TestHelper;
+
+public class CompletableBlockingSubscribeTest {
+
+    @Test
+    public void noArgComplete() {
+        Completable.complete()
+        .blockingSubscribe();
+    }
+
+    @Test
+    public void noArgCompleteAsync() {
+        Completable.complete()
+        .delay(100, TimeUnit.MILLISECONDS)
+        .blockingSubscribe();
+    }
+
+    @Test
+    public void noArgError() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Completable.error(new TestException())
+            .blockingSubscribe();
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+        });
+    }
+
+    @Test
+    public void noArgErrorAsync() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Completable.error(new TestException())
+            .delay(100, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+            .blockingSubscribe();
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+        });
+    }
+
+    @Test
+    public void oneArgComplete() throws Throwable {
+        Action action = mock(Action.class);
+
+        Completable.complete()
+        .blockingSubscribe(action);
+
+        verify(action).run();
+    }
+
+    @Test
+    public void oneArgCompleteAsync() throws Throwable {
+        Action action = mock(Action.class);
+
+        Completable.complete()
+        .delay(50, TimeUnit.MILLISECONDS)
+        .blockingSubscribe(action);
+
+        verify(action).run();
+    }
+
+    @Test
+    public void oneArgCompleteFails() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Action action = mock(Action.class);
+            doThrow(new TestException()).when(action).run();
+
+            Completable.complete()
+            .blockingSubscribe(action);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(action).run();
+        });
+    }
+
+    @Test
+    public void oneArgError() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Action action = mock(Action.class);
+
+            Completable.error(new TestException())
+            .blockingSubscribe(action);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(action, never()).run();
+        });
+    }
+
+    @Test
+    public void oneArgErrorAsync() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Action action = mock(Action.class);
+
+            Completable.error(new TestException())
+            .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+            .blockingSubscribe(action);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(action, never()).run();
+        });
+    }
+
+    @Test
+    public void twoArgComplete() throws Throwable {
+        Action action = mock(Action.class);
+        @SuppressWarnings("unchecked")
+        Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+        Completable.complete()
+        .blockingSubscribe(action, consumer);
+
+        verify(action).run();
+        verify(consumer, never()).accept(any());
+    }
+
+    @Test
+    public void twoArgCompleteAsync() throws Throwable {
+        Action action = mock(Action.class);
+        @SuppressWarnings("unchecked")
+        Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+        Completable.complete()
+        .delay(50, TimeUnit.MILLISECONDS)
+        .blockingSubscribe(action, consumer);
+
+        verify(action).run();
+        verify(consumer, never()).accept(any());
+    }
+
+    @Test
+    public void twoArgCompleteFails() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Action action = mock(Action.class);
+            doThrow(new TestException()).when(action).run();
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+            Completable.complete()
+            .blockingSubscribe(action, consumer);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(action).run();
+            verify(consumer, never()).accept(any());
+        });
+    }
+
+    @Test
+    public void twoArgError() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Action action = mock(Action.class);
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+            Completable.error(new TestException())
+            .blockingSubscribe(action, consumer);
+
+            assertTrue("" + errors, errors.isEmpty());
+
+            verify(action, never()).run();
+            verify(consumer).accept(any(TestException.class));
+        });
+    }
+
+    @Test
+    public void twoArgErrorAsync() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Action action = mock(Action.class);
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+            Completable.error(new TestException())
+            .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+            .blockingSubscribe(action, consumer);
+
+            assertTrue("" + errors, errors.isEmpty());
+
+            verify(action, never()).run();
+            verify(consumer).accept(any(TestException.class));
+        });
+    }
+
+    @Test
+    public void twoArgErrorFails() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Action action = mock(Action.class);
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+            doThrow(new TestException()).when(consumer).accept(any());
+
+            Completable.error(new TestException())
+            .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+            .blockingSubscribe(action, consumer);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(action, never()).run();
+            verify(consumer).accept(any(TestException.class));
+        });
+    }
+
+    @Test
+    public void twoArgInterrupted() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Action onDispose = mock(Action.class);
+
+            Action action = mock(Action.class);
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+            Thread.currentThread().interrupt();
+
+            Completable.never()
+            .doOnDispose(onDispose)
+            .blockingSubscribe(action, consumer);
+
+            assertTrue("" + errors, errors.isEmpty());
+
+            verify(onDispose).run();
+            verify(action, never()).run();
+            verify(consumer).accept(any(InterruptedException.class));
+        });
+    }
+
+    @Test
+    public void observerComplete() {
+        TestObserver<Void> to = new TestObserver<>();
+
+        Completable.complete()
+        .blockingSubscribe(to);
+
+        to.assertResult();
+    }
+
+    @Test
+    public void observerCompleteAsync() {
+        TestObserver<Void> to = new TestObserver<>();
+
+        Completable.complete()
+        .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+        .blockingSubscribe(to);
+
+        to.assertResult();
+    }
+
+    @Test
+    public void observerError() {
+        TestObserver<Void> to = new TestObserver<>();
+
+        Completable.error(new TestException())
+        .blockingSubscribe(to);
+
+        to.assertFailure(TestException.class);
+    }
+
+    @Test
+    public void observerErrorAsync() {
+        TestObserver<Void> to = new TestObserver<>();
+
+        Completable.error(new TestException())
+        .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+        .blockingSubscribe(to);
+
+        to.assertFailure(TestException.class);
+    }
+
+    @Test
+    public void observerDispose() throws Throwable {
+        Action onDispose = mock(Action.class);
+
+        TestObserver<Void> to = new TestObserver<>();
+        to.dispose();
+
+        Completable.never()
+        .doOnDispose(onDispose)
+        .blockingSubscribe(to);
+
+        to.assertEmpty();
+
+        verify(onDispose).run();
+    }
+
+    @Test
+    public void ovserverInterrupted() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Action onDispose = mock(Action.class);
+
+            TestObserver<Void> to = new TestObserver<>();
+
+            Thread.currentThread().interrupt();
+
+            Completable.never()
+            .doOnDispose(onDispose)
+            .blockingSubscribe(to);
+
+            assertTrue("" + errors, errors.isEmpty());
+
+            verify(onDispose).run();
+            to.assertFailure(InterruptedException.class);
+        });
+    }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeBlockingSubscribeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeBlockingSubscribeTest.java
new file mode 100644
index 0000000000..647d385eef
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeBlockingSubscribeTest.java
@@ -0,0 +1,502 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.maybe;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.exceptions.TestException;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.observers.TestObserver;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+import io.reactivex.rxjava3.testsupport.TestHelper;
+
+public class MaybeBlockingSubscribeTest {
+
+    @Test
+    public void noArgSuccess() {
+        Maybe.just(1)
+        .blockingSubscribe();
+    }
+
+    @Test
+    public void noArgSuccessAsync() {
+        Maybe.just(1)
+        .delay(100, TimeUnit.MILLISECONDS)
+        .blockingSubscribe();
+    }
+
+    @Test
+    public void noArgEmpty() {
+        Maybe.empty()
+        .blockingSubscribe();
+    }
+
+    @Test
+    public void noArgEmptyAsync() {
+        Maybe.empty()
+        .delay(100, TimeUnit.MILLISECONDS)
+        .blockingSubscribe();
+    }
+
+    @Test
+    public void noArgError() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Maybe.error(new TestException())
+            .blockingSubscribe();
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+        });
+    }
+
+    @Test
+    public void noArgErrorAsync() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Maybe.error(new TestException())
+            .delay(100, TimeUnit.MILLISECONDS, Schedulers.computation())
+            .blockingSubscribe();
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+        });
+    }
+
+    @Test
+    public void oneArgSuccess() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+
+        Maybe.just(1)
+        .blockingSubscribe(success);
+
+        verify(success).accept(1);
+    }
+
+    @Test
+    public void oneArgSuccessAsync() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+
+        Maybe.just(1)
+        .delay(50, TimeUnit.MILLISECONDS)
+        .blockingSubscribe(success);
+
+        verify(success).accept(1);
+    }
+
+    @Test
+    public void oneArgEmpty() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+
+        Maybe.<Integer>empty()
+        .blockingSubscribe(success);
+
+        verify(success, never()).accept(any());
+    }
+
+    @Test
+    public void oneArgEmptyAsync() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+
+        Maybe.<Integer>empty()
+        .delay(50, TimeUnit.MILLISECONDS)
+        .blockingSubscribe(success);
+
+        verify(success, never()).accept(any());
+    }
+
+    @Test
+    public void oneArgSuccessFails() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+            doThrow(new TestException()).when(success).accept(any());
+
+            Maybe.just(1)
+            .blockingSubscribe(success);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(success).accept(1);
+        });
+    }
+
+    @Test
+    public void oneArgError() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+
+            Maybe.<Integer>error(new TestException())
+            .blockingSubscribe(success);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(success, never()).accept(any());
+        });
+    }
+
+    @Test
+    public void oneArgErrorAsync() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+
+            Maybe.<Integer>error(new TestException())
+            .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation())
+            .blockingSubscribe(success);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(success, never()).accept(any());
+        });
+    }
+
+    @Test
+    public void twoArgSuccess() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+        @SuppressWarnings("unchecked")
+        Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+        Maybe.just(1)
+        .blockingSubscribe(success, consumer);
+
+        verify(success).accept(1);
+        verify(consumer, never()).accept(any());
+    }
+
+    @Test
+    public void twoArgSuccessAsync() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+        @SuppressWarnings("unchecked")
+        Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+        Maybe.just(1)
+        .delay(50, TimeUnit.MILLISECONDS)
+        .blockingSubscribe(success, consumer);
+
+        verify(success).accept(any());
+        verify(consumer, never()).accept(any());
+    }
+
+    @Test
+    public void twoArgEmpty() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+        @SuppressWarnings("unchecked")
+        Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+        Maybe.<Integer>empty()
+        .blockingSubscribe(success, consumer);
+
+        verify(success, never()).accept(any());
+        verify(consumer, never()).accept(any());
+    }
+
+    @Test
+    public void twoArgEmptyAsync() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+        @SuppressWarnings("unchecked")
+        Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+        Maybe.<Integer>empty()
+        .delay(50, TimeUnit.MILLISECONDS)
+        .blockingSubscribe(success, consumer);
+
+        verify(success, never()).accept(any());
+        verify(consumer, never()).accept(any());
+    }
+
+    @Test
+    public void twoArgSuccessFails() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+            doThrow(new TestException()).when(success).accept(any());
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+            Maybe.just(1)
+            .blockingSubscribe(success, consumer);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(success).accept(any());
+            verify(consumer, never()).accept(any());
+        });
+    }
+
+    @Test
+    public void twoArgError() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+            Maybe.<Integer>error(new TestException())
+            .blockingSubscribe(success, consumer);
+
+            assertTrue("" + errors, errors.isEmpty());
+
+            verify(success, never()).accept(any());
+            verify(consumer).accept(any(TestException.class));
+        });
+    }
+
+    @Test
+    public void twoArgErrorAsync() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+            Maybe.<Integer>error(new TestException())
+            .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation())
+            .blockingSubscribe(success, consumer);
+
+            assertTrue("" + errors, errors.isEmpty());
+
+            verify(success, never()).accept(any());
+            verify(consumer).accept(any(TestException.class));
+        });
+    }
+
+    @Test
+    public void twoArgErrorFails() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+            doThrow(new TestException()).when(consumer).accept(any());
+
+            Maybe.<Integer>error(new TestException())
+            .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation())
+            .blockingSubscribe(success, consumer);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(success, never()).accept(any());
+            verify(consumer).accept(any(TestException.class));
+        });
+    }
+
+    @Test
+    public void threeArgSuccess() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+        @SuppressWarnings("unchecked")
+        Consumer<? super Throwable> consumer = mock(Consumer.class);
+        Action action = mock(Action.class);
+
+        Maybe.just(1)
+        .blockingSubscribe(success, consumer, action);
+
+        verify(success).accept(any());
+        verify(consumer, never()).accept(any(Throwable.class));
+        verify(action, never()).run();
+    }
+
+    @Test
+    public void threeArgEmpty() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+        @SuppressWarnings("unchecked")
+        Consumer<? super Throwable> consumer = mock(Consumer.class);
+        Action action = mock(Action.class);
+
+        Maybe.<Integer>empty()
+        .blockingSubscribe(success, consumer, action);
+
+        verify(success, never()).accept(any());
+        verify(consumer, never()).accept(any(Throwable.class));
+        verify(action).run();
+    }
+
+    @Test
+    public void threeArgError() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+        @SuppressWarnings("unchecked")
+        Consumer<? super Throwable> consumer = mock(Consumer.class);
+        Action action = mock(Action.class);
+
+        Maybe.<Integer>error(new TestException())
+        .blockingSubscribe(success, consumer, action);
+
+        verify(success, never()).accept(any());
+        verify(consumer).accept(any(TestException.class));
+        verify(action, never()).run();
+    }
+
+    @Test
+    public void threeArgEmptyFails() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+            Action action = mock(Action.class);
+            doThrow(new TestException()).when(action).run();
+
+            Maybe.<Integer>empty()
+            .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation())
+            .blockingSubscribe(success, consumer, action);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(success, never()).accept(any());
+            verify(consumer, never()).accept(any());
+            verify(action).run();
+        });
+    }
+
+    @Test
+    public void threeArgInterrupted() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Action onDispose = mock(Action.class);
+
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+            Action action = mock(Action.class);
+
+            Thread.currentThread().interrupt();
+
+            Maybe.<Integer>never()
+            .doOnDispose(onDispose)
+            .blockingSubscribe(success, consumer, action);
+
+            assertTrue("" + errors, errors.isEmpty());
+
+            verify(onDispose).run();
+            verify(success, never()).accept(any());
+            verify(action, never()).run();
+            verify(consumer).accept(any(InterruptedException.class));
+        });
+    }
+
+    @Test
+    public void observerSuccess() {
+        TestObserver<Integer> to = new TestObserver<>();
+
+        Maybe.just(1)
+        .blockingSubscribe(to);
+
+        to.assertResult(1);
+    }
+
+    @Test
+    public void observerSuccessAsync() {
+        TestObserver<Integer> to = new TestObserver<>();
+
+        Maybe.just(1)
+        .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation())
+        .blockingSubscribe(to);
+
+        to.assertResult(1);
+    }
+
+    @Test
+    public void observerEmpty() {
+        TestObserver<Integer> to = new TestObserver<>();
+
+        Maybe.<Integer>empty()
+        .blockingSubscribe(to);
+
+        to.assertResult();
+    }
+
+    @Test
+    public void observerEmptyAsync() {
+        TestObserver<Integer> to = new TestObserver<>();
+
+        Maybe.<Integer>empty()
+        .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation())
+        .blockingSubscribe(to);
+
+        to.assertResult();
+    }
+
+    @Test
+    public void observerError() {
+        TestObserver<Object> to = new TestObserver<>();
+
+        Maybe.error(new TestException())
+        .blockingSubscribe(to);
+
+        to.assertFailure(TestException.class);
+    }
+
+    @Test
+    public void observerErrorAsync() {
+        TestObserver<Object> to = new TestObserver<>();
+
+        Maybe.error(new TestException())
+        .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation())
+        .blockingSubscribe(to);
+
+        to.assertFailure(TestException.class);
+    }
+
+    @Test
+    public void observerDispose() throws Throwable {
+        Action onDispose = mock(Action.class);
+
+        TestObserver<Object> to = new TestObserver<>();
+        to.dispose();
+
+        Maybe.never()
+        .doOnDispose(onDispose)
+        .blockingSubscribe(to);
+
+        to.assertEmpty();
+
+        verify(onDispose).run();
+    }
+
+    @Test
+    public void ovserverInterrupted() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Action onDispose = mock(Action.class);
+
+            TestObserver<Object> to = new TestObserver<>();
+
+            Thread.currentThread().interrupt();
+
+            Maybe.never()
+            .doOnDispose(onDispose)
+            .blockingSubscribe(to);
+
+            assertTrue("" + errors, errors.isEmpty());
+
+            verify(onDispose).run();
+            to.assertFailure(InterruptedException.class);
+        });
+    }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleBlockingSubscribeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleBlockingSubscribeTest.java
new file mode 100644
index 0000000000..50eceede5c
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleBlockingSubscribeTest.java
@@ -0,0 +1,343 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.single;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.exceptions.TestException;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.observers.TestObserver;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+import io.reactivex.rxjava3.testsupport.TestHelper;
+
+public class SingleBlockingSubscribeTest {
+
+    @Test
+    public void noArgSuccess() {
+        Single.just(1)
+        .blockingSubscribe();
+    }
+
+    @Test
+    public void noArgSuccessAsync() {
+        Single.just(1)
+        .delay(100, TimeUnit.MILLISECONDS)
+        .blockingSubscribe();
+    }
+
+    @Test
+    public void noArgError() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Single.error(new TestException())
+            .blockingSubscribe();
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+        });
+    }
+
+    @Test
+    public void noArgErrorAsync() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Single.error(new TestException())
+            .delay(100, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+            .blockingSubscribe();
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+        });
+    }
+
+    @Test
+    public void oneArgSuccess() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+
+        Single.just(1)
+        .blockingSubscribe(success);
+
+        verify(success).accept(1);
+    }
+
+    @Test
+    public void oneArgSuccessAsync() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+
+        Single.just(1)
+        .delay(50, TimeUnit.MILLISECONDS)
+        .blockingSubscribe(success);
+
+        verify(success).accept(1);
+    }
+
+    @Test
+    public void oneArgSuccessFails() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+            doThrow(new TestException()).when(success).accept(any());
+
+            Single.just(1)
+            .blockingSubscribe(success);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(success).accept(1);
+        });
+    }
+
+    @Test
+    public void oneArgError() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+
+            Single.<Integer>error(new TestException())
+            .blockingSubscribe(success);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(success, never()).accept(any());
+        });
+    }
+
+    @Test
+    public void oneArgErrorAsync() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+
+            Single.<Integer>error(new TestException())
+            .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+            .blockingSubscribe(success);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(success, never()).accept(any());
+        });
+    }
+
+    @Test
+    public void twoArgSuccess() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+        @SuppressWarnings("unchecked")
+        Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+        Single.just(1)
+        .blockingSubscribe(success, consumer);
+
+        verify(success).accept(1);
+        verify(consumer, never()).accept(any());
+    }
+
+    @Test
+    public void twoArgSuccessAsync() throws Throwable {
+        @SuppressWarnings("unchecked")
+        Consumer<Integer> success = mock(Consumer.class);
+        @SuppressWarnings("unchecked")
+        Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+        Single.just(1)
+        .delay(50, TimeUnit.MILLISECONDS)
+        .blockingSubscribe(success, consumer);
+
+        verify(success).accept(any());
+        verify(consumer, never()).accept(any());
+    }
+
+    @Test
+    public void twoArgSuccessFails() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+            doThrow(new TestException()).when(success).accept(any());
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+            Single.just(1)
+            .blockingSubscribe(success, consumer);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(success).accept(any());
+            verify(consumer, never()).accept(any());
+        });
+    }
+
+    @Test
+    public void twoArgError() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+            Single.<Integer>error(new TestException())
+            .blockingSubscribe(success, consumer);
+
+            assertTrue("" + errors, errors.isEmpty());
+
+            verify(success, never()).accept(any());
+            verify(consumer).accept(any(TestException.class));
+        });
+    }
+
+    @Test
+    public void twoArgErrorAsync() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+            Single.<Integer>error(new TestException())
+            .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+            .blockingSubscribe(success, consumer);
+
+            assertTrue("" + errors, errors.isEmpty());
+
+            verify(success, never()).accept(any());
+            verify(consumer).accept(any(TestException.class));
+        });
+    }
+
+    @Test
+    public void twoArgErrorFails() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+            doThrow(new TestException()).when(consumer).accept(any());
+
+            Single.<Integer>error(new TestException())
+            .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+            .blockingSubscribe(success, consumer);
+
+            TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+            verify(success, never()).accept(any());
+            verify(consumer).accept(any(TestException.class));
+        });
+    }
+
+    @Test
+    public void twoArgInterrupted() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Action onDispose = mock(Action.class);
+
+            @SuppressWarnings("unchecked")
+            Consumer<Integer> success = mock(Consumer.class);
+            @SuppressWarnings("unchecked")
+            Consumer<? super Throwable> consumer = mock(Consumer.class);
+
+            Thread.currentThread().interrupt();
+
+            Single.<Integer>never()
+            .doOnDispose(onDispose)
+            .blockingSubscribe(success, consumer);
+
+            assertTrue("" + errors, errors.isEmpty());
+
+            verify(onDispose).run();
+            verify(success, never()).accept(any());
+            verify(consumer).accept(any(InterruptedException.class));
+        });
+    }
+
+    @Test
+    public void observerSuccess() {
+        TestObserver<Integer> to = new TestObserver<>();
+
+        Single.just(1)
+        .blockingSubscribe(to);
+
+        to.assertResult(1);
+    }
+
+    @Test
+    public void observerSuccessAsync() {
+        TestObserver<Integer> to = new TestObserver<>();
+
+        Single.just(1)
+        .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+        .blockingSubscribe(to);
+
+        to.assertResult(1);
+    }
+
+    @Test
+    public void observerError() {
+        TestObserver<Object> to = new TestObserver<>();
+
+        Single.error(new TestException())
+        .blockingSubscribe(to);
+
+        to.assertFailure(TestException.class);
+    }
+
+    @Test
+    public void observerErrorAsync() {
+        TestObserver<Object> to = new TestObserver<>();
+
+        Single.error(new TestException())
+        .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+        .blockingSubscribe(to);
+
+        to.assertFailure(TestException.class);
+    }
+
+    @Test
+    public void observerDispose() throws Throwable {
+        Action onDispose = mock(Action.class);
+
+        TestObserver<Object> to = new TestObserver<>();
+        to.dispose();
+
+        Single.never()
+        .doOnDispose(onDispose)
+        .blockingSubscribe(to);
+
+        to.assertEmpty();
+
+        verify(onDispose).run();
+    }
+
+    @Test
+    public void ovserverInterrupted() throws Throwable {
+        TestHelper.withErrorTracking(errors -> {
+            Action onDispose = mock(Action.class);
+
+            TestObserver<Object> to = new TestObserver<>();
+
+            Thread.currentThread().interrupt();
+
+            Single.never()
+            .doOnDispose(onDispose)
+            .blockingSubscribe(to);
+
+            assertTrue("" + errors, errors.isEmpty());
+
+            verify(onDispose).run();
+            to.assertFailure(InterruptedException.class);
+        });
+    }
+}

From 4ae6b1b54693c55ef38a2929d14d7dfdf54c7efc Mon Sep 17 00:00:00 2001
From: akarnokd <akarnokd@gmail.com>
Date: Thu, 23 Jan 2020 15:26:24 +0100
Subject: [PATCH 2/2] Update marble dimensions

---
 .../java/io/reactivex/rxjava3/core/Completable.java    |  8 ++++----
 src/main/java/io/reactivex/rxjava3/core/Maybe.java     | 10 +++++-----
 src/main/java/io/reactivex/rxjava3/core/Single.java    |  6 +++---
 3 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java
index 4e4ecbe102..0913ac398d 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Completable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java
@@ -1313,7 +1313,7 @@ public final boolean blockingAwait(long timeout, @NonNull TimeUnit unit) {
     /**
      * Subscribes to the current {@code Completable} and <em>blocks the current thread</em> until it terminates.
      * <p>
-     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.png" alt="">
+     * <img width="640" height="346" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.png" alt="">
      * <dl>
      *  <dt><b>Scheduler:</b></dt>
      *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -1336,7 +1336,7 @@ public final void blockingSubscribe() {
      * Subscribes to the current {@code Completable} and calls given {@code onComplete} callback on the <em>current thread</em>
      * when it completes normally.
      * <p>
-     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.a.png" alt="">
+     * <img width="640" height="351" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.a.png" alt="">
      * <dl>
      *  <dt><b>Scheduler:</b></dt>
      *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -1360,7 +1360,7 @@ public final void blockingSubscribe(@NonNull Action onComplete) {
      * Subscribes to the current {@code Completable} and calls the appropriate callback on the <em>current thread</em>
      * when it terminates.
      * <p>
-     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.ac.png" alt="">
+     * <img width="640" height="352" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.ac.png" alt="">
      * <dl>
      *  <dt><b>Scheduler:</b></dt>
      *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -1387,7 +1387,7 @@ public final void blockingSubscribe(@NonNull Action onComplete, @NonNull Consume
     /**
      * Subscribes to the current {@code Completable} and calls the appropriate {@link CompletableObserver} method on the <em>current thread</em>.
      * <p>
-     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.o.png" alt="">
+     * <img width="640" height="468" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.o.png" alt="">
      * <dl>
      *  <dt><b>Scheduler:</b></dt>
      *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
index 627435e9e9..cc3bef199e 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
@@ -2478,7 +2478,7 @@ public final T blockingGet(@NonNull T defaultValue) {
     /**
      * Subscribes to the current {@code Maybe} and <em>blocks the current thread</em> until it terminates.
      * <p>
-     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.png" alt="">
+     * <img width="640" height="238" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.png" alt="">
      * <dl>
      *  <dt><b>Scheduler:</b></dt>
      *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -2502,7 +2502,7 @@ public final void blockingSubscribe() {
      * Subscribes to the current {@code Maybe} and calls given {@code onSuccess} callback on the <em>current thread</em>
      * when it completes normally.
      * <p>
-     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.c.png" alt="">
+     * <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.c.png" alt="">
      * <dl>
      *  <dt><b>Scheduler:</b></dt>
      *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -2527,7 +2527,7 @@ public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess) {
      * Subscribes to the current {@code Maybe} and calls the appropriate callback on the <em>current thread</em>
      * when it terminates.
      * <p>
-     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.cc.png" alt="">
+     * <img width="640" height="256" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.cc.png" alt="">
      * <dl>
      *  <dt><b>Scheduler:</b></dt>
      *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -2552,7 +2552,7 @@ public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess, @Non
      * Subscribes to the current {@code Maybe} and calls the appropriate callback on the <em>current thread</em>
      * when it terminates.
      * <p>
-     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.cca.png" alt="">
+     * <img width="640" height="251" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.cca.png" alt="">
      * <dl>
      *  <dt><b>Scheduler:</b></dt>
      *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -2581,7 +2581,7 @@ public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess, @Non
     /**
      * Subscribes to the current {@code Maybe} and calls the appropriate {@link MaybeObserver} method on the <em>current thread</em>.
      * <p>
-     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.o.png" alt="">
+     * <img width="640" height="398" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.o.png" alt="">
      * <dl>
      *  <dt><b>Scheduler:</b></dt>
      *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java
index 2db4f16212..216d3deb87 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Single.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Single.java
@@ -2950,7 +2950,7 @@ public final T blockingGet() {
     /**
      * Subscribes to the current {@code Single} and <em>blocks the current thread</em> until it terminates.
      * <p>
-     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.png" alt="">
+     * <img width="640" height="329" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.png" alt="">
      * <dl>
      *  <dt><b>Scheduler:</b></dt>
      *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -2973,7 +2973,7 @@ public final void blockingSubscribe() {
      * Subscribes to the current {@code Single} and calls given {@code onSuccess} callback on the <em>current thread</em>
      * when it completes normally.
      * <p>
-     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.c.png" alt="">
+     * <img width="640" height="351" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.c.png" alt="">
      * <dl>
      *  <dt><b>Scheduler:</b></dt>
      *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -3024,7 +3024,7 @@ public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess, @Non
     /**
      * Subscribes to the current {@code Single} and calls the appropriate {@link SingleObserver} method on the <em>current thread</em>.
      * <p>
-     * <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.o.png" alt="">
+     * <img width="640" height="479" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.o.png" alt="">
      * <dl>
      *  <dt><b>Scheduler:</b></dt>
      *  <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>