diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 6d64f5f8f3..032fa1608f 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -942,7 +942,12 @@ public final Throwable blockingGet(long timeout, TimeUnit unit) { */ @SchedulerSupport(SchedulerSupport.NONE) public final Completable compose(CompletableTransformer transformer) { - return wrap(to(transformer)); + try { + return wrap(transformer.apply(this)); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw ExceptionHelper.wrapOrThrow(ex); + } } /** diff --git a/src/main/java/io/reactivex/CompletableTransformer.java b/src/main/java/io/reactivex/CompletableTransformer.java index fd3204a704..432c699771 100644 --- a/src/main/java/io/reactivex/CompletableTransformer.java +++ b/src/main/java/io/reactivex/CompletableTransformer.java @@ -13,12 +13,10 @@ package io.reactivex; -import io.reactivex.functions.Function; - /** * Convenience interface and callback used by the compose operator to turn a Completable into another * Completable fluently. */ -public interface CompletableTransformer extends Function { - +public interface CompletableTransformer { + CompletableSource apply(Completable completable) throws Exception; } diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index a593c84c71..0031f53f4d 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -6278,7 +6278,12 @@ public final Single collectInto(final U initialItem, BiConsumer Flowable compose(FlowableTransformer composer) { - return fromPublisher(to(composer)); + try { + return fromPublisher(composer.apply(this)); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw ExceptionHelper.wrapOrThrow(ex); + } } /** diff --git a/src/main/java/io/reactivex/FlowableTransformer.java b/src/main/java/io/reactivex/FlowableTransformer.java index 825c0511f1..0f2017a02b 100644 --- a/src/main/java/io/reactivex/FlowableTransformer.java +++ b/src/main/java/io/reactivex/FlowableTransformer.java @@ -15,14 +15,12 @@ import org.reactivestreams.Publisher; -import io.reactivex.functions.Function; - /** * Interface to compose Flowables. * * @param the upstream value type * @param the downstream value type */ -public interface FlowableTransformer extends Function, Publisher> { - +public interface FlowableTransformer { + Publisher apply(Flowable flowable) throws Exception; } diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index b4b559851a..3c3386a6fd 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -1974,7 +1974,12 @@ public final Maybe cast(final Class clazz) { */ @SchedulerSupport(SchedulerSupport.NONE) public final Maybe compose(MaybeTransformer transformer) { - return wrap(to(transformer)); + try { + return wrap(transformer.apply(this)); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw ExceptionHelper.wrapOrThrow(ex); + } } /** diff --git a/src/main/java/io/reactivex/MaybeTransformer.java b/src/main/java/io/reactivex/MaybeTransformer.java index ecd14838e5..c169e2023f 100644 --- a/src/main/java/io/reactivex/MaybeTransformer.java +++ b/src/main/java/io/reactivex/MaybeTransformer.java @@ -13,14 +13,12 @@ package io.reactivex; -import io.reactivex.functions.Function; - /** * Interface to compose Maybes. * * @param the upstream value type * @param the downstream value type */ -public interface MaybeTransformer extends Function, MaybeSource> { - +public interface MaybeTransformer { + MaybeSource apply(Maybe maybe) throws Exception; } diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 910e14fdb1..2ed857205f 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -5489,7 +5489,12 @@ public final Single collectInto(final U initialValue, BiConsumer Observable compose(ObservableTransformer composer) { - return wrap(to(composer)); + try { + return wrap(composer.apply(this)); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw ExceptionHelper.wrapOrThrow(ex); + } } diff --git a/src/main/java/io/reactivex/ObservableTransformer.java b/src/main/java/io/reactivex/ObservableTransformer.java index 1a9c192eee..1dec08d247 100644 --- a/src/main/java/io/reactivex/ObservableTransformer.java +++ b/src/main/java/io/reactivex/ObservableTransformer.java @@ -13,14 +13,12 @@ package io.reactivex; -import io.reactivex.functions.Function; - /** * Interface to compose Observables. * * @param the upstream value type * @param the downstream value type */ -public interface ObservableTransformer extends Function, ObservableSource> { - +public interface ObservableTransformer { + ObservableSource apply(Observable upstream) throws Exception; } diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index c143e4f148..e367a4baf7 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1473,7 +1473,12 @@ public final Single hide() { */ @SchedulerSupport(SchedulerSupport.NONE) public final Single compose(SingleTransformer transformer) { - return wrap(to(transformer)); + try { + return wrap(transformer.apply(this)); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw ExceptionHelper.wrapOrThrow(ex); + } } /** diff --git a/src/main/java/io/reactivex/SingleTransformer.java b/src/main/java/io/reactivex/SingleTransformer.java index 5763d034a1..4ce319aec4 100644 --- a/src/main/java/io/reactivex/SingleTransformer.java +++ b/src/main/java/io/reactivex/SingleTransformer.java @@ -13,14 +13,12 @@ package io.reactivex; -import io.reactivex.functions.Function; - /** * Interface to compose Singles. * * @param the upstream value type * @param the downstream value type */ -public interface SingleTransformer extends Function, SingleSource> { - +public interface SingleTransformer { + SingleSource apply(Single upstream) throws Exception; }