Skip to content

Commit d9cf88b

Browse files
committed
Support RxJava 3
Motivation: Recently RxJava 3 has been released. https://github.com/ReactiveX/RxJava/releases/tag/v3.0.0 Armeria supports RxJava2 integration with `armeria-rxjava`. This PR migrates `RequestContextAssembly` from RxJava2 to RxJava3. RxJava 3 being based on Java 8 also supports seamless conversions between CompletionStage and Single, Maybe, Observable. Modifications: * Rename original `armeria-rxjava` to `armeria-rxjava2` * Make armeria-rxjava support RxJava 3 * Use built-in converter methods for ObservableResponseConverterFunction * Change `*Callable` to `*Supplier` ReactiveX/RxJava#6511 * Don't migrate `RequestContextScalarCallableCompletable` and `RequestContextCallableCompletable` There is no subclasses of `Completable` that implement `Supplier` * Delegate `reset()` in `ConnectableFlowable` https://github.com/ReactiveX/RxJava/wiki/What's-different-in-3.0#connectable-source-reset Result: You can now use RxJava 3 with Armeria. Fixes: line#2378
1 parent 96c5a99 commit d9cf88b

File tree

71 files changed

+2512
-132
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+2512
-132
lines changed

dependencies.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,12 @@ io.reactivex.rxjava2:
241241
javadocs:
242242
- http://reactivex.io/RxJava/2.x/javadoc/
243243

244+
io.reactivex.rxjava3:
245+
rxjava:
246+
version: '3.0.0'
247+
javadocs:
248+
- http://reactivex.io/RxJava/3.x/javadoc/
249+
244250
io.zipkin.brave:
245251
brave:
246252
# ':site:javadoc' fails when we use a newer version of Javadoc.

examples/context-propagation/rxjava/build.gradle

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ plugins {
55
dependencies {
66
implementation project(':core')
77
implementation project(':rxjava')
8-
9-
implementation 'net.javacrumbs.future-converter:future-converter-rxjava2-java8'
108
}
119

1210
application {

examples/context-propagation/rxjava/src/main/java/example/armeria/contextpropagation/rxjava/Main.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ public class Main {
88

99
public static void main(String[] args) {
1010
final Server backend = Server.builder()
11-
.service("/square/{num}", ((ctx, req) -> {
11+
.service("/square/{num}", (ctx, req) -> {
1212
final long num = Long.parseLong(ctx.pathParam("num"));
1313
return HttpResponse.of(Long.toString(num * num));
14-
}))
14+
})
1515
.http(8081)
1616
.build();
1717

examples/context-propagation/rxjava/src/main/java/example/armeria/contextpropagation/rxjava/MainService.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import java.util.ArrayList;
88
import java.util.List;
99

10-
import net.javacrumbs.futureconverter.java8rx2.FutureConverter;
11-
1210
import com.google.common.base.Splitter;
1311
import com.google.common.collect.ImmutableList;
1412
import com.google.common.collect.Iterables;
@@ -21,10 +19,10 @@
2119
import com.linecorp.armeria.server.HttpService;
2220
import com.linecorp.armeria.server.ServiceRequestContext;
2321

24-
import io.reactivex.Flowable;
25-
import io.reactivex.Scheduler;
26-
import io.reactivex.Single;
27-
import io.reactivex.schedulers.Schedulers;
22+
import io.reactivex.rxjava3.core.Flowable;
23+
import io.reactivex.rxjava3.core.Scheduler;
24+
import io.reactivex.rxjava3.core.Single;
25+
import io.reactivex.rxjava3.schedulers.Schedulers;
2826

2927
public class MainService implements HttpService {
3028

@@ -60,7 +58,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) {
6058
.flattenAsFlowable(l -> l);
6159

6260
final Flowable<Long> extractNumsFromRequest =
63-
FutureConverter.toSingle(req.aggregate())
61+
Single.fromCompletionStage(req.aggregate())
6462
// Unless you know what you're doing, always use subscribeOn with the context
6563
// executor to have the context mounted and stay on a single thread to reduce
6664
// concurrency issues.
@@ -96,13 +94,13 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) {
9694
checkState(ServiceRequestContext.current() == ctx);
9795
checkState(ctx.eventLoop().inEventLoop());
9896

99-
return FutureConverter.toSingle(backendClient.get("/square/" + num).aggregate());
97+
return Single.fromCompletionStage(backendClient.get("/square/" + num).aggregate());
10098
})
10199
.map(AggregatedHttpResponse::contentUtf8)
102100
.collectInto(new StringBuilder(), (current, item) -> current.append(item).append('\n'))
103101
.map(content -> HttpResponse.of(content.toString()))
104102
.onErrorReturn(HttpResponse::ofFailure);
105103

106-
return HttpResponse.from(FutureConverter.toCompletableFuture(response));
104+
return HttpResponse.from(response.toCompletionStage());
107105
}
108106
}

rxjava/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
dependencies {
2-
api 'io.reactivex.rxjava2:rxjava'
2+
api 'io.reactivex.rxjava3:rxjava'
33
}

rxjava/src/main/java/com/linecorp/armeria/internal/server/rxjava/ObservableResponseConverterFunctionProvider.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
import com.linecorp.armeria.server.annotation.ResponseConverterFunctionProvider;
2626
import com.linecorp.armeria.server.rxjava.ObservableResponseConverterFunction;
2727

28-
import io.reactivex.Completable;
29-
import io.reactivex.Maybe;
30-
import io.reactivex.Observable;
31-
import io.reactivex.Single;
28+
import io.reactivex.rxjava3.core.Completable;
29+
import io.reactivex.rxjava3.core.Maybe;
30+
import io.reactivex.rxjava3.core.Observable;
31+
import io.reactivex.rxjava3.core.Single;
3232

3333
/**
3434
* Provides an {@link ObservableResponseConverterFunction} to annotated services.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2018 LINE Corporation
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
/**
18+
* Provide a default {@link com.linecorp.armeria.server.annotation.ResponseConverterFunction}
19+
* which automatically converts an {@link io.reactivex.rxjava3.core.ObservableSource} into an
20+
* {@link com.linecorp.armeria.common.HttpResponse} when the {@link io.reactivex.rxjava3.core.ObservableSource}
21+
* is returned by an annotated HTTP service.
22+
*/
23+
@NonNullByDefault
24+
package com.linecorp.armeria.internal.server.rxjava;
25+
26+
import com.linecorp.armeria.common.util.NonNullByDefault;
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/*
2+
* Copyright 2018 LINE Corporation
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.linecorp.armeria.rxjava;
18+
19+
import javax.annotation.Nullable;
20+
import javax.annotation.concurrent.GuardedBy;
21+
22+
import com.linecorp.armeria.common.RequestContext;
23+
24+
import io.reactivex.rxjava3.core.Completable;
25+
import io.reactivex.rxjava3.core.Flowable;
26+
import io.reactivex.rxjava3.core.Maybe;
27+
import io.reactivex.rxjava3.core.Observable;
28+
import io.reactivex.rxjava3.core.Single;
29+
import io.reactivex.rxjava3.flowables.ConnectableFlowable;
30+
import io.reactivex.rxjava3.functions.Function;
31+
import io.reactivex.rxjava3.functions.Supplier;
32+
import io.reactivex.rxjava3.internal.fuseable.ScalarSupplier;
33+
import io.reactivex.rxjava3.observables.ConnectableObservable;
34+
import io.reactivex.rxjava3.parallel.ParallelFlowable;
35+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
36+
37+
/**
38+
* Utility class to keep {@link RequestContext} during RxJava operations.
39+
*/
40+
public final class RequestContextAssembly {
41+
42+
@SuppressWarnings("rawtypes")
43+
@Nullable
44+
@GuardedBy("RequestContextAssembly.class")
45+
private static Function<? super Observable, ? extends Observable> oldOnObservableAssembly;
46+
@SuppressWarnings("rawtypes")
47+
@Nullable
48+
@GuardedBy("RequestContextAssembly.class")
49+
private static Function<? super ConnectableObservable, ? extends ConnectableObservable>
50+
oldOnConnectableObservableAssembly;
51+
@SuppressWarnings("rawtypes")
52+
@Nullable
53+
@GuardedBy("RequestContextAssembly.class")
54+
private static Function<? super Completable, ? extends Completable> oldOnCompletableAssembly;
55+
@SuppressWarnings("rawtypes")
56+
@Nullable
57+
@GuardedBy("RequestContextAssembly.class")
58+
private static Function<? super Single, ? extends Single> oldOnSingleAssembly;
59+
@SuppressWarnings("rawtypes")
60+
@Nullable
61+
@GuardedBy("RequestContextAssembly.class")
62+
private static Function<? super Maybe, ? extends Maybe> oldOnMaybeAssembly;
63+
@SuppressWarnings("rawtypes")
64+
@Nullable
65+
@GuardedBy("RequestContextAssembly.class")
66+
private static Function<? super Flowable, ? extends Flowable> oldOnFlowableAssembly;
67+
@SuppressWarnings("rawtypes")
68+
@Nullable
69+
@GuardedBy("RequestContextAssembly.class")
70+
private static Function<? super ConnectableFlowable, ? extends ConnectableFlowable>
71+
oldOnConnectableFlowableAssembly;
72+
@SuppressWarnings("rawtypes")
73+
@Nullable
74+
@GuardedBy("RequestContextAssembly.class")
75+
private static Function<? super ParallelFlowable, ? extends ParallelFlowable> oldOnParallelAssembly;
76+
77+
@GuardedBy("RequestContextAssembly.class")
78+
private static boolean enabled;
79+
80+
private RequestContextAssembly() {
81+
}
82+
83+
/**
84+
* Enable {@link RequestContext} during operators.
85+
*/
86+
@SuppressWarnings({ "rawtypes", "unchecked" })
87+
public static synchronized void enable() {
88+
if (enabled) {
89+
return;
90+
}
91+
92+
oldOnObservableAssembly = RxJavaPlugins.getOnObservableAssembly();
93+
RxJavaPlugins.setOnObservableAssembly(compose(
94+
oldOnObservableAssembly,
95+
new ConditionalOnCurrentRequestContextFunction<Observable>() {
96+
@Override
97+
Observable applyActual(Observable o, RequestContext ctx) {
98+
if (!(o instanceof Supplier)) {
99+
return new RequestContextObservable(o, ctx);
100+
}
101+
if (o instanceof ScalarSupplier) {
102+
return new RequestContextScalarSupplierObservable(o, ctx);
103+
}
104+
return new RequestContextSupplierObservable(o, ctx);
105+
}
106+
}));
107+
108+
oldOnConnectableObservableAssembly = RxJavaPlugins.getOnConnectableObservableAssembly();
109+
RxJavaPlugins.setOnConnectableObservableAssembly(compose(
110+
oldOnConnectableObservableAssembly,
111+
new ConditionalOnCurrentRequestContextFunction<ConnectableObservable>() {
112+
@Override
113+
ConnectableObservable applyActual(ConnectableObservable co, RequestContext ctx) {
114+
return new RequestContextConnectableObservable(co, ctx);
115+
}
116+
}));
117+
118+
oldOnCompletableAssembly = RxJavaPlugins.getOnCompletableAssembly();
119+
RxJavaPlugins.setOnCompletableAssembly(
120+
compose(oldOnCompletableAssembly,
121+
new ConditionalOnCurrentRequestContextFunction<Completable>() {
122+
@Override
123+
Completable applyActual(Completable c, RequestContext ctx) {
124+
return new RequestContextCompletable(c, ctx);
125+
}
126+
}));
127+
128+
oldOnSingleAssembly = RxJavaPlugins.getOnSingleAssembly();
129+
RxJavaPlugins.setOnSingleAssembly(
130+
compose(oldOnSingleAssembly, new ConditionalOnCurrentRequestContextFunction<Single>() {
131+
@Override
132+
Single applyActual(Single s, RequestContext ctx) {
133+
if (!(s instanceof Supplier)) {
134+
return new RequestContextSingle(s, ctx);
135+
}
136+
if (s instanceof ScalarSupplier) {
137+
return new RequestContextScalarSupplierSingle(s, ctx);
138+
}
139+
return new RequestContextSupplierSingle(s, ctx);
140+
}
141+
}));
142+
143+
oldOnMaybeAssembly = RxJavaPlugins.getOnMaybeAssembly();
144+
RxJavaPlugins.setOnMaybeAssembly(
145+
compose(oldOnMaybeAssembly, new ConditionalOnCurrentRequestContextFunction<Maybe>() {
146+
@Override
147+
Maybe applyActual(Maybe m, RequestContext ctx) {
148+
if (!(m instanceof Supplier)) {
149+
return new RequestContextMaybe(m, ctx);
150+
}
151+
if (m instanceof ScalarSupplier) {
152+
return new RequestContextScalarSupplierMaybe(m, ctx);
153+
}
154+
return new RequestContextSupplierMaybe(m, ctx);
155+
}
156+
}));
157+
158+
oldOnFlowableAssembly = RxJavaPlugins.getOnFlowableAssembly();
159+
RxJavaPlugins.setOnFlowableAssembly(
160+
compose(oldOnFlowableAssembly, new ConditionalOnCurrentRequestContextFunction<Flowable>() {
161+
@Override
162+
Flowable applyActual(Flowable f, RequestContext ctx) {
163+
if (!(f instanceof Supplier)) {
164+
return new RequestContextFlowable(f, ctx);
165+
}
166+
if (f instanceof ScalarSupplier) {
167+
return new RequestContextScalarSupplierFlowable(f, ctx);
168+
}
169+
return new RequestContextSupplierFlowable(f, ctx);
170+
}
171+
}));
172+
173+
oldOnConnectableFlowableAssembly = RxJavaPlugins.getOnConnectableFlowableAssembly();
174+
RxJavaPlugins.setOnConnectableFlowableAssembly(
175+
compose(oldOnConnectableFlowableAssembly,
176+
new ConditionalOnCurrentRequestContextFunction<ConnectableFlowable>() {
177+
@Override
178+
ConnectableFlowable applyActual(
179+
ConnectableFlowable cf,
180+
RequestContext ctx) {
181+
return new RequestContextConnectableFlowable(
182+
cf, ctx);
183+
}
184+
}
185+
));
186+
187+
oldOnParallelAssembly = RxJavaPlugins.getOnParallelAssembly();
188+
RxJavaPlugins.setOnParallelAssembly(
189+
compose(oldOnParallelAssembly,
190+
new ConditionalOnCurrentRequestContextFunction<ParallelFlowable>() {
191+
@Override
192+
ParallelFlowable applyActual(ParallelFlowable pf, RequestContext ctx) {
193+
return new RequestContextParallelFlowable(pf, ctx);
194+
}
195+
}
196+
));
197+
enabled = true;
198+
}
199+
200+
/**
201+
* Disable {@link RequestContext} during operators.
202+
*/
203+
public static synchronized void disable() {
204+
if (!enabled) {
205+
return;
206+
}
207+
RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly);
208+
oldOnObservableAssembly = null;
209+
RxJavaPlugins.setOnConnectableObservableAssembly(oldOnConnectableObservableAssembly);
210+
oldOnConnectableObservableAssembly = null;
211+
RxJavaPlugins.setOnCompletableAssembly(oldOnCompletableAssembly);
212+
oldOnCompletableAssembly = null;
213+
RxJavaPlugins.setOnSingleAssembly(oldOnSingleAssembly);
214+
oldOnSingleAssembly = null;
215+
RxJavaPlugins.setOnMaybeAssembly(oldOnMaybeAssembly);
216+
oldOnMaybeAssembly = null;
217+
RxJavaPlugins.setOnFlowableAssembly(oldOnFlowableAssembly);
218+
oldOnFlowableAssembly = null;
219+
RxJavaPlugins.setOnConnectableFlowableAssembly(oldOnConnectableFlowableAssembly);
220+
oldOnConnectableFlowableAssembly = null;
221+
RxJavaPlugins.setOnParallelAssembly(oldOnParallelAssembly);
222+
oldOnParallelAssembly = null;
223+
enabled = false;
224+
}
225+
226+
private abstract static class ConditionalOnCurrentRequestContextFunction<T> implements Function<T, T> {
227+
@Override
228+
public final T apply(T t) {
229+
return RequestContext.mapCurrent(requestContext -> applyActual(t, requestContext), () -> t);
230+
}
231+
232+
abstract T applyActual(T t, RequestContext ctx);
233+
}
234+
235+
private static <T> Function<? super T, ? extends T> compose(
236+
@Nullable Function<? super T, ? extends T> before,
237+
Function<? super T, ? extends T> after) {
238+
if (before == null) {
239+
return after;
240+
}
241+
return (T v) -> after.apply(before.apply(v));
242+
}
243+
}

0 commit comments

Comments
 (0)