Skip to content

Commit ff51791

Browse files
committed
Merge pull request #3352 from akarnokd/MergePerf2x
2.x: perf checks for flatMap and merge
2 parents db04cfa + 6998d04 commit ff51791

File tree

4 files changed

+302
-0
lines changed

4 files changed

+302
-0
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package io.reactivex;
2+
3+
import java.util.Iterator;
4+
5+
import org.openjdk.jmh.annotations.Setup;
6+
import org.openjdk.jmh.infra.Blackhole;
7+
import org.reactivestreams.*;
8+
import io.reactivex.internal.subscriptions.*;
9+
10+
/**
11+
* Exposes an Observable and Observer that increments n Integers and consumes them in a Blackhole.
12+
*/
13+
public abstract class InputWithIncrementingInteger {
14+
public Iterable<Integer> iterable;
15+
public Observable<Integer> observable;
16+
public Observable<Integer> firehose;
17+
public Blackhole bh;
18+
19+
public abstract int getSize();
20+
21+
@Setup
22+
public void setup(final Blackhole bh) {
23+
this.bh = bh;
24+
final int size = getSize();
25+
observable = Observable.range(0, size);
26+
27+
firehose = Observable.create(new Publisher<Integer>() {
28+
29+
@Override
30+
public void subscribe(Subscriber<? super Integer> s) {
31+
s.onSubscribe(EmptySubscription.INSTANCE);
32+
for (int i = 0; i < size; i++) {
33+
s.onNext(i);
34+
}
35+
s.onComplete();
36+
}
37+
38+
});
39+
iterable = new Iterable<Integer>() {
40+
@Override
41+
public Iterator<Integer> iterator() {
42+
return new Iterator<Integer>() {
43+
int i = 0;
44+
45+
@Override
46+
public boolean hasNext() {
47+
return i < size;
48+
}
49+
50+
@Override
51+
public Integer next() {
52+
Blackhole.consumeCPU(10);
53+
return i++;
54+
}
55+
56+
@Override
57+
public void remove() {
58+
59+
}
60+
};
61+
}
62+
};
63+
64+
}
65+
66+
public LatchedObserver<Integer> newLatchedObserver() {
67+
return new LatchedObserver<>(bh);
68+
}
69+
70+
public Subscriber<Integer> newSubscriber() {
71+
return new Observer<Integer>() {
72+
73+
@Override
74+
public void onComplete() {
75+
76+
}
77+
78+
@Override
79+
public void onError(Throwable e) {
80+
81+
}
82+
83+
@Override
84+
public void onNext(Integer t) {
85+
bh.consume(t);
86+
}
87+
88+
};
89+
}
90+
91+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.reactivex;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
5+
import org.openjdk.jmh.infra.Blackhole;
6+
7+
public class LatchedObserver<T> extends Observer<T> {
8+
9+
public CountDownLatch latch = new CountDownLatch(1);
10+
private final Blackhole bh;
11+
12+
public LatchedObserver(Blackhole bh) {
13+
this.bh = bh;
14+
}
15+
16+
@Override
17+
public void onComplete() {
18+
latch.countDown();
19+
}
20+
21+
@Override
22+
public void onError(Throwable e) {
23+
latch.countDown();
24+
}
25+
26+
@Override
27+
public void onNext(T t) {
28+
bh.consume(t);
29+
}
30+
31+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package io.reactivex;
2+
3+
import java.util.concurrent.TimeUnit;
4+
5+
import org.openjdk.jmh.annotations.*;
6+
import io.reactivex.schedulers.*;
7+
8+
@BenchmarkMode(Mode.Throughput)
9+
@OutputTimeUnit(TimeUnit.SECONDS)
10+
@State(Scope.Thread)
11+
public class OperatorFlatMapPerf {
12+
13+
@State(Scope.Thread)
14+
public static class Input extends InputWithIncrementingInteger {
15+
16+
@Param({ "1", "1000", "1000000" })
17+
public int size;
18+
19+
@Override
20+
public int getSize() {
21+
return size;
22+
}
23+
24+
}
25+
26+
@Benchmark
27+
public void flatMapIntPassthruSync(Input input) throws InterruptedException {
28+
input.observable.flatMap(Observable::just).subscribe(input.newSubscriber());
29+
}
30+
31+
@Benchmark
32+
public void flatMapIntPassthruAsync(Input input) throws InterruptedException {
33+
LatchedObserver<Integer> latchedObserver = input.newLatchedObserver();
34+
input.observable.flatMap(i -> {
35+
return Observable.just(i).subscribeOn(Schedulers.computation());
36+
}).subscribe(latchedObserver);
37+
latchedObserver.latch.await();
38+
}
39+
40+
@Benchmark
41+
public void flatMapTwoNestedSync(final Input input) throws InterruptedException {
42+
Observable.range(1, 2).flatMap(i -> {
43+
return input.observable;
44+
}).subscribe(input.newSubscriber());
45+
}
46+
47+
// this runs out of memory currently
48+
// @Benchmark
49+
// public void flatMapTwoNestedAsync(final Input input) throws InterruptedException {
50+
// Observable.range(1, 2).flatMap(new Func1<Integer, Observable<Integer>>() {
51+
//
52+
// @Override
53+
// public Observable<Integer> call(Integer i) {
54+
// return input.observable.subscribeOn(Schedulers.computation());
55+
// }
56+
//
57+
// }).subscribe(input.observer);
58+
// }
59+
60+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package io.reactivex;
2+
3+
import java.util.*;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.openjdk.jmh.annotations.*;
7+
import org.openjdk.jmh.infra.Blackhole;
8+
9+
import io.reactivex.schedulers.*;
10+
11+
@BenchmarkMode(Mode.Throughput)
12+
@OutputTimeUnit(TimeUnit.SECONDS)
13+
public class OperatorMergePerf {
14+
15+
// flatMap
16+
@Benchmark
17+
public void oneStreamOfNthatMergesIn1(final InputMillion input) throws InterruptedException {
18+
Observable<Observable<Integer>> os = Observable.range(1, input.size).map(Observable::just);
19+
LatchedObserver<Integer> o = input.newLatchedObserver();
20+
Observable.merge(os).subscribe(o);
21+
o.latch.await();
22+
}
23+
24+
// flatMap
25+
@Benchmark
26+
public void merge1SyncStreamOfN(final InputMillion input) throws InterruptedException {
27+
Observable<Observable<Integer>> os = Observable.just(1).map(i -> {
28+
return Observable.range(0, input.size);
29+
});
30+
LatchedObserver<Integer> o = input.newLatchedObserver();
31+
Observable.merge(os).subscribe(o);
32+
o.latch.await();
33+
}
34+
35+
@Benchmark
36+
public void mergeNSyncStreamsOfN(final InputThousand input) throws InterruptedException {
37+
Observable<Observable<Integer>> os = input.observable.map(i -> {
38+
return Observable.range(0, input.size);
39+
});
40+
LatchedObserver<Integer> o = input.newLatchedObserver();
41+
Observable.merge(os).subscribe(o);
42+
o.latch.await();
43+
}
44+
45+
@Benchmark
46+
public void mergeNAsyncStreamsOfN(final InputThousand input) throws InterruptedException {
47+
Observable<Observable<Integer>> os = input.observable.map(i -> {
48+
return Observable.range(0, input.size).subscribeOn(Schedulers.computation());
49+
});
50+
LatchedObserver<Integer> o = input.newLatchedObserver();
51+
Observable.merge(os).subscribe(o);
52+
o.latch.await();
53+
}
54+
55+
@Benchmark
56+
public void mergeTwoAsyncStreamsOfN(final InputThousand input) throws InterruptedException {
57+
LatchedObserver<Integer> o = input.newLatchedObserver();
58+
Observable<Integer> ob = Observable.range(0, input.size).subscribeOn(Schedulers.computation());
59+
Observable.merge(ob, ob).subscribe(o);
60+
o.latch.await();
61+
}
62+
63+
@Benchmark
64+
public void mergeNSyncStreamsOf1(final InputForMergeN input) throws InterruptedException {
65+
LatchedObserver<Integer> o = input.newLatchedObserver();
66+
Observable.merge(input.observables).subscribe(o);
67+
o.latch.await();
68+
}
69+
70+
@State(Scope.Thread)
71+
public static class InputForMergeN {
72+
@Param({ "1", "100", "1000" })
73+
// @Param({ "1000" })
74+
public int size;
75+
76+
private Blackhole bh;
77+
List<Observable<Integer>> observables;
78+
79+
@Setup
80+
public void setup(final Blackhole bh) {
81+
this.bh = bh;
82+
observables = new ArrayList<>();
83+
for (int i = 0; i < size; i++) {
84+
observables.add(Observable.just(i));
85+
}
86+
}
87+
88+
public LatchedObserver<Integer> newLatchedObserver() {
89+
return new LatchedObserver<>(bh);
90+
}
91+
}
92+
93+
@State(Scope.Thread)
94+
public static class InputMillion extends InputWithIncrementingInteger {
95+
96+
@Param({ "1", "1000", "1000000" })
97+
// @Param({ "1000" })
98+
public int size;
99+
100+
@Override
101+
public int getSize() {
102+
return size;
103+
}
104+
105+
}
106+
107+
@State(Scope.Thread)
108+
public static class InputThousand extends InputWithIncrementingInteger {
109+
110+
@Param({ "1", "1000" })
111+
// @Param({ "1000" })
112+
public int size;
113+
114+
@Override
115+
public int getSize() {
116+
return size;
117+
}
118+
119+
}
120+
}

0 commit comments

Comments
 (0)