Commit 22a6194

Authored by Peter Alfonsi (peteralfonsi)
Switch percentiles implementation to MergingDigest (opensearch-project#18124)
Signed-off-by: Peter Alfonsi <[email protected]>
Co-authored-by: Peter Alfonsi <[email protected]>
1 parent a29e6e8 commit 22a6194

7 files changed, +70 -24 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 ### Changed
 - Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269))
+- Change implementation for `percentiles` aggregation for latency improvement [#18124](https://github.com/opensearch-project/OpenSearch/pull/18124)
 
 ### Dependencies
 - Update Apache Lucene from 10.1.0 to 10.2.1 ([#17961](https://github.com/opensearch-project/OpenSearch/pull/17961))

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ protobuf = "3.25.5"
 jakarta_annotation = "1.3.5"
 google_http_client = "1.44.1"
 google_auth = "1.29.0"
-tdigest = "3.3"
+tdigest = "3.3" # Warning: Before updating tdigest, ensure its serialization code for MergingDigest hasn't changed
 hdrhistogram = "2.2.2"
 grpc = "1.68.2"
 json_smart = "2.5.2"
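The pin matters because TDigestState (below) now persists digests using MergingDigest's own byte layout via asBytes()/fromBytes(). As a rough illustration of what a tdigest upgrade must not break, here is a minimal round-trip sketch against the t-digest 3.3 API; the class is illustrative and not part of this commit.

import java.nio.ByteBuffer;

import com.tdunning.math.stats.MergingDigest;

public class MergingDigestRoundTripSketch {
    public static void main(String[] args) {
        MergingDigest original = new MergingDigest(100.0); // compression = 100
        for (int i = 0; i < 10_000; i++) {
            original.add(Math.random());
        }

        // Serialize with the library's own format, the same calls TDigestState.write() now uses.
        ByteBuffer buf = ByteBuffer.allocate(original.byteSize());
        original.asBytes(buf);
        MergingDigest restored = MergingDigest.fromBytes(ByteBuffer.wrap(buf.array()));

        // A tdigest upgrade that changes this byte layout would break the
        // cross-node deserialization done in TDigestState.read().
        assert original.centroids().size() == restored.centroids().size();
        assert Math.abs(original.quantile(0.5) - restored.quantile(0.5)) < 1e-9;
    }
}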

server/src/main/java/org/opensearch/search/aggregations/metrics/InternalMedianAbsoluteDeviation.java

Lines changed: 9 additions & 4 deletions
@@ -43,6 +43,8 @@
 import java.util.Map;
 import java.util.Objects;
 
+import com.tdunning.math.stats.Centroid;
+
 /**
  * Implementation of median absolute deviation agg
  *
@@ -57,11 +59,14 @@ static double computeMedianAbsoluteDeviation(TDigestState valuesSketch) {
         } else {
             final double approximateMedian = valuesSketch.quantile(0.5);
             final TDigestState approximatedDeviationsSketch = new TDigestState(valuesSketch.compression());
-            valuesSketch.centroids().forEach(centroid -> {
+            for (Centroid centroid : valuesSketch.centroids()) {
                 final double deviation = Math.abs(approximateMedian - centroid.mean());
-                approximatedDeviationsSketch.add(deviation, centroid.count());
-            });
-
+                // Weighted add() isn't supported for faster MergingDigest implementation, so add iteratively instead. see
+                // https://github.com/tdunning/t-digest/issues/167
+                for (int i = 0; i < centroid.count(); i++) {
+                    approximatedDeviationsSketch.add(deviation);
+                }
+            }
             return approximatedDeviationsSketch.quantile(0.5);
         }
     }
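For context on the new loop: the weighted add(deviation, centroid.count()) call is replaced with repeated single-value adds because, per the in-code comment and the linked t-digest issue, the weighted overload isn't supported by the MergingDigest-backed TDigestState. A hedged sketch of the same workaround in isolation (the helper name is hypothetical, not from the commit):

import com.tdunning.math.stats.MergingDigest;
import com.tdunning.math.stats.TDigest;

final class WeightedAddWorkaround {
    // Hypothetical helper mirroring the loop in computeMedianAbsoluteDeviation():
    // emulate add(value, count) by adding the value `count` times.
    static void addWeighted(TDigest sketch, double value, int count) {
        for (int i = 0; i < count; i++) {
            sketch.add(value);
        }
    }

    public static void main(String[] args) {
        TDigest deviations = new MergingDigest(100.0);
        addWeighted(deviations, 2.5, 10); // stands in for add(2.5, 10)
        addWeighted(deviations, 4.0, 3);
        System.out.println(deviations.quantile(0.5)); // median of the weighted samples
    }
}

Note the trade-off: this turns one weighted insert per centroid into centroid.count() single inserts, which the change appears to accept for the median-absolute-deviation path.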

server/src/main/java/org/opensearch/search/aggregations/metrics/TDigestState.java

Lines changed: 52 additions & 12 deletions
@@ -31,21 +31,25 @@
 
 package org.opensearch.search.aggregations.metrics;
 
+import org.opensearch.Version;
 import org.opensearch.core.common.io.stream.StreamInput;
 import org.opensearch.core.common.io.stream.StreamOutput;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.List;
 
 import com.tdunning.math.stats.AVLTreeDigest;
 import com.tdunning.math.stats.Centroid;
+import com.tdunning.math.stats.MergingDigest;
 
 /**
  * Extension of {@link com.tdunning.math.stats.TDigest} with custom serialization.
  *
  * @opensearch.internal
  */
-public class TDigestState extends AVLTreeDigest {
+public class TDigestState extends MergingDigest {
 
     private final double compression;
 
@@ -54,28 +58,64 @@ public TDigestState(double compression) {
         this.compression = compression;
     }
 
+    private TDigestState(double compression, MergingDigest in) {
+        super(compression);
+        this.compression = compression;
+        this.add(List.of(in));
+    }
+
     @Override
     public double compression() {
         return compression;
     }
 
     public static void write(TDigestState state, StreamOutput out) throws IOException {
-        out.writeDouble(state.compression);
-        out.writeVInt(state.centroidCount());
-        for (Centroid centroid : state.centroids()) {
-            out.writeDouble(centroid.mean());
-            out.writeVLong(centroid.count());
+        if (out.getVersion().before(Version.V_3_1_0)) {
+            out.writeDouble(state.compression);
+            out.writeVInt(state.centroidCount());
+            for (Centroid centroid : state.centroids()) {
+                out.writeDouble(centroid.mean());
+                out.writeVLong(centroid.count());
+            }
+        } else {
+            int byteSize = state.byteSize();
+            out.writeVInt(byteSize);
+            ByteBuffer buf = ByteBuffer.allocate(byteSize);
+            state.asBytes(buf);
+            out.writeBytes(buf.array());
         }
     }
 
     public static TDigestState read(StreamInput in) throws IOException {
-        double compression = in.readDouble();
-        TDigestState state = new TDigestState(compression);
-        int n = in.readVInt();
-        for (int i = 0; i < n; i++) {
-            state.add(in.readDouble(), in.readVInt());
+        if (in.getVersion().before(Version.V_3_1_0)) {
+            // In older versions TDigestState was based on AVLTreeDigest. Load centroids into this class, then add it to MergingDigest.
+            double compression = in.readDouble();
+            AVLTreeDigest treeDigest = new AVLTreeDigest(compression);
+            int n = in.readVInt();
+            if (n > 0) {
+                for (int i = 0; i < n; i++) {
+                    treeDigest.add(in.readDouble(), in.readVInt());
+                }
+                TDigestState state = new TDigestState(compression);
+                state.add(List.of(treeDigest));
+                return state;
+            }
+            return new TDigestState(compression);
+        } else {
+            // For MergingDigest, adding the original centroids in ascending order to a new, empty MergingDigest isn't guaranteed
+            // to produce a MergingDigest whose centroids are exactly equal to the originals.
+            // So, use the library's serialization code to ensure we get the exact same centroids, allowing us to compare with equals().
+            // The AVLTreeDigest had the same limitation for equals() where it was only guaranteed to return true if the other object was
+            // produced by de/serializing the object, so this should be fine.
+            int byteSize = in.readVInt();
+            byte[] bytes = new byte[byteSize];
+            in.readBytes(bytes, 0, byteSize);
+            MergingDigest mergingDigest = MergingDigest.fromBytes(ByteBuffer.wrap(bytes));
+            if (mergingDigest.centroids().isEmpty()) {
+                return new TDigestState(mergingDigest.compression());
+            }
+            return new TDigestState(mergingDigest.compression(), mergingDigest);
         }
-        return state;
     }
 
     @Override
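To make the version gating above concrete, a minimal round-trip sketch follows. It assumes OpenSearch's BytesStreamOutput helper and the setVersion() methods on the stream classes behave as in the server codebase; the class and method names here are illustrative, not part of the commit.

import java.io.IOException;

import org.opensearch.Version;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.search.aggregations.metrics.TDigestState;

public class TDigestStateRoundTripSketch {
    static void roundTrip() throws IOException {
        TDigestState original = new TDigestState(100.0);
        for (int i = 0; i < 1_000; i++) {
            original.add(i);
        }

        try (BytesStreamOutput out = new BytesStreamOutput()) {
            // Streams at V_3_1_0 or later carry the length-prefixed MergingDigest byte blob;
            // older streams fall back to the per-centroid encoding for wire compatibility.
            out.setVersion(Version.V_3_1_0);
            TDigestState.write(original, out);

            StreamInput in = out.bytes().streamInput();
            in.setVersion(Version.V_3_1_0);
            TDigestState restored = TDigestState.read(in);

            // Reusing the library's serialization keeps the centroids identical, which is
            // what the comment in read() relies on for equals() comparisons.
            assert original.equals(restored);
        }
    }
}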

server/src/test/java/org/opensearch/search/aggregations/metrics/InternalTDigestPercentilesRanksTests.java

Lines changed: 2 additions & 2 deletions
@@ -54,7 +54,7 @@ protected InternalTDigestPercentileRanks createTestInstance(
         Arrays.stream(values).forEach(state::add);
 
         // the number of centroids is defined as <= the number of samples inserted
-        assertTrue(state.centroidCount() <= values.length);
+        assertTrue(state.centroids().size() <= values.length);
         return new InternalTDigestPercentileRanks(name, percents, state, keyed, format, metadata);
     }
 
@@ -66,7 +66,7 @@ protected void assertReduced(InternalTDigestPercentileRanks reduced, List<Intern
         double max = Double.NEGATIVE_INFINITY;
         long totalCount = 0;
         for (InternalTDigestPercentileRanks ranks : inputs) {
-            if (ranks.state.centroidCount() == 0) {
+            if (ranks.state.centroids().isEmpty()) {
                 // quantiles would return NaN
                 continue;
             }

server/src/test/java/org/opensearch/search/aggregations/metrics/InternalTDigestPercentilesTests.java

Lines changed: 1 addition & 1 deletion
@@ -54,7 +54,7 @@ protected InternalTDigestPercentiles createTestInstance(
         Arrays.stream(values).forEach(state::add);
 
         // the number of centroids is defined as <= the number of samples inserted
-        assertTrue(state.centroidCount() <= values.length);
+        assertTrue(state.centroids().size() <= values.length);
         return new InternalTDigestPercentiles(name, percents, state, keyed, format, metadata);
     }
 

server/src/test/java/org/opensearch/search/aggregations/metrics/TDigestPercentilesAggregatorTests.java

Lines changed: 4 additions & 4 deletions
@@ -104,7 +104,7 @@ public void testSomeMatchesSortedNumericDocValues() throws IOException {
             iw.addDocument(singleton(new SortedNumericDocValuesField("number", 0)));
         }, tdigest -> {
             assertEquals(7L, tdigest.state.size());
-            assertEquals(7L, tdigest.state.centroidCount());
+            assertEquals(7L, tdigest.state.centroids().size());
             assertEquals(5.0d, tdigest.percentile(75), 0.0d);
             assertEquals("5.0", tdigest.percentileAsString(75));
             assertEquals(3.0d, tdigest.percentile(71), 0.0d);
@@ -128,7 +128,7 @@ public void testSomeMatchesNumericDocValues() throws IOException {
             iw.addDocument(singleton(new NumericDocValuesField("number", 0)));
         }, tdigest -> {
             assertEquals(tdigest.state.size(), 7L);
-            assertTrue(tdigest.state.centroidCount() <= 7L);
+            assertTrue(tdigest.state.centroids().size() <= 7L);
             assertEquals(8.0d, tdigest.percentile(100), 0.0d);
             assertEquals("8.0", tdigest.percentileAsString(100));
             assertEquals(8.0d, tdigest.percentile(88), 0.0d);
@@ -156,7 +156,7 @@ public void testQueryFiltering() throws IOException {
 
         testCase(LongPoint.newRangeQuery("row", 1, 4), docs, tdigest -> {
             assertEquals(4L, tdigest.state.size());
-            assertEquals(4L, tdigest.state.centroidCount());
+            assertEquals(4L, tdigest.state.centroids().size());
             assertEquals(2.0d, tdigest.percentile(100), 0.0d);
             assertEquals(1.0d, tdigest.percentile(50), 0.0d);
             assertEquals(1.0d, tdigest.percentile(25), 0.0d);
@@ -165,7 +165,7 @@ public void testQueryFiltering() throws IOException {
 
         testCase(LongPoint.newRangeQuery("row", 100, 110), docs, tdigest -> {
             assertEquals(0L, tdigest.state.size());
-            assertEquals(0L, tdigest.state.centroidCount());
+            assertEquals(0L, tdigest.state.centroids().size());
             assertFalse(AggregationInspectionHelper.hasValue(tdigest));
         });
     }
