Skip to content

Commit aa400d1

Browse files
committed
Merge pull request #3376 from akarnokd/RxVsStreamPerf2x
2.x: perf comparing Observable, NbpObservable, Stream and ParallelStream
2 parents 0c2ce61 + 6f12f6f commit aa400d1

File tree

1 file changed

+99
-0
lines changed

1 file changed

+99
-0
lines changed
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import java.util.*;
17+
import java.util.concurrent.TimeUnit;
18+
19+
import org.openjdk.jmh.annotations.*;
20+
import org.openjdk.jmh.infra.Blackhole;
21+
22+
@BenchmarkMode(Mode.Throughput)
23+
@Warmup(iterations = 5)
24+
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
25+
@OutputTimeUnit(TimeUnit.SECONDS)
26+
@Fork(value = 1)
27+
@State(Scope.Thread)
28+
public class RxVsStreamPerf {
29+
@Param({ "1", "1000", "1000000" })
30+
public int times;
31+
32+
Observable<Integer> range;
33+
34+
NbpObservable<Integer> rangeNbp;
35+
36+
Observable<Integer> rangeFlatMap;
37+
38+
NbpObservable<Integer> rangeNbpFlatMap;
39+
40+
List<Integer> values;
41+
42+
@Setup
43+
public void setup() {
44+
range = Observable.range(1, times);
45+
46+
rangeFlatMap = range.flatMap(v -> Observable.range(v, 2));
47+
48+
rangeNbp = NbpObservable.range(1, times);
49+
50+
rangeNbpFlatMap = rangeNbp.flatMap(v -> NbpObservable.range(v, 2));
51+
52+
values = range.toList().toBlocking().first();
53+
}
54+
55+
@Benchmark
56+
public void range(Blackhole bh) {
57+
range.subscribe(new LatchedObserver<>(bh));
58+
}
59+
60+
@Benchmark
61+
public void rangeNbp(Blackhole bh) {
62+
rangeNbp.subscribe(new LatchedNbpObserver<>(bh));
63+
}
64+
65+
@Benchmark
66+
public void rangeFlatMap(Blackhole bh) {
67+
rangeFlatMap.subscribe(new LatchedObserver<>(bh));
68+
}
69+
70+
@Benchmark
71+
public void rangeNbpFlatMap(Blackhole bh) {
72+
rangeNbpFlatMap.subscribe(new LatchedNbpObserver<>(bh));
73+
}
74+
75+
@Benchmark
76+
public void stream(Blackhole bh) {
77+
values.stream().forEach(bh::consume);
78+
}
79+
80+
@Benchmark
81+
public void streamFlatMap(Blackhole bh) {
82+
values.stream()
83+
.flatMap(v -> Arrays.asList(v, v + 1).stream())
84+
.forEach(bh::consume);
85+
}
86+
87+
@Benchmark
88+
public void streamParallel(Blackhole bh) {
89+
values.stream().parallel().forEach(bh::consume);
90+
}
91+
92+
@Benchmark
93+
public void streamParallelFlatMap(Blackhole bh) {
94+
values.stream()
95+
.flatMap(v -> Arrays.asList(v, v + 1).stream())
96+
.parallel()
97+
.forEach(bh::consume);
98+
}
99+
}

0 commit comments

Comments
 (0)