Skip to content

Commit 2f12e28

Browse files
Authored by: opensearch-trigger-bot[bot], github-actions[bot], and Peter Alfonsi
Revert "Switch percentiles implementation to MergingDigest (#18124)" (#18497) (#18517)
---------
(cherry picked from commit afb08a0)
Signed-off-by: Peter Alfonsi <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Peter Alfonsi <[email protected]>
1 parent 69bf501 commit 2f12e28

File tree

6 files changed

+24
-69
lines changed

6 files changed

+24
-69
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ protobuf = "3.25.5"
2525
jakarta_annotation = "1.3.5"
2626
google_http_client = "1.44.1"
2727
google_auth = "1.29.0"
28-
tdigest = "3.3" # Warning: Before updating tdigest, ensure its serialization code for MergingDigest hasn't changed
28+
tdigest = "3.3"
2929
hdrhistogram = "2.2.2"
3030
grpc = "1.68.2"
3131
json_smart = "2.5.2"

server/src/main/java/org/opensearch/search/aggregations/metrics/InternalMedianAbsoluteDeviation.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@
4343
import java.util.Map;
4444
import java.util.Objects;
4545

46-
import com.tdunning.math.stats.Centroid;
47-
4846
/**
4947
* Implementation of median absolute deviation agg
5048
*
@@ -59,14 +57,11 @@ static double computeMedianAbsoluteDeviation(TDigestState valuesSketch) {
5957
} else {
6058
final double approximateMedian = valuesSketch.quantile(0.5);
6159
final TDigestState approximatedDeviationsSketch = new TDigestState(valuesSketch.compression());
62-
for (Centroid centroid : valuesSketch.centroids()) {
60+
valuesSketch.centroids().forEach(centroid -> {
6361
final double deviation = Math.abs(approximateMedian - centroid.mean());
64-
// Weighted add() isn't supported for faster MergingDigest implementation, so add iteratively instead. see
65-
// https://github.com/tdunning/t-digest/issues/167
66-
for (int i = 0; i < centroid.count(); i++) {
67-
approximatedDeviationsSketch.add(deviation);
68-
}
69-
}
62+
approximatedDeviationsSketch.add(deviation, centroid.count());
63+
});
64+
7065
return approximatedDeviationsSketch.quantile(0.5);
7166
}
7267
}

server/src/main/java/org/opensearch/search/aggregations/metrics/TDigestState.java

Lines changed: 12 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,21 @@
3131

3232
package org.opensearch.search.aggregations.metrics;
3333

34-
import org.opensearch.Version;
3534
import org.opensearch.core.common.io.stream.StreamInput;
3635
import org.opensearch.core.common.io.stream.StreamOutput;
3736

3837
import java.io.IOException;
39-
import java.nio.ByteBuffer;
4038
import java.util.Iterator;
41-
import java.util.List;
4239

4340
import com.tdunning.math.stats.AVLTreeDigest;
4441
import com.tdunning.math.stats.Centroid;
45-
import com.tdunning.math.stats.MergingDigest;
4642

4743
/**
4844
* Extension of {@link com.tdunning.math.stats.TDigest} with custom serialization.
4945
*
5046
* @opensearch.internal
5147
*/
52-
public class TDigestState extends MergingDigest {
48+
public class TDigestState extends AVLTreeDigest {
5349

5450
private final double compression;
5551

@@ -58,64 +54,28 @@ public TDigestState(double compression) {
5854
this.compression = compression;
5955
}
6056

61-
private TDigestState(double compression, MergingDigest in) {
62-
super(compression);
63-
this.compression = compression;
64-
this.add(List.of(in));
65-
}
66-
6757
@Override
6858
public double compression() {
6959
return compression;
7060
}
7161

7262
public static void write(TDigestState state, StreamOutput out) throws IOException {
73-
if (out.getVersion().before(Version.V_3_1_0)) {
74-
out.writeDouble(state.compression);
75-
out.writeVInt(state.centroidCount());
76-
for (Centroid centroid : state.centroids()) {
77-
out.writeDouble(centroid.mean());
78-
out.writeVLong(centroid.count());
79-
}
80-
} else {
81-
int byteSize = state.byteSize();
82-
out.writeVInt(byteSize);
83-
ByteBuffer buf = ByteBuffer.allocate(byteSize);
84-
state.asBytes(buf);
85-
out.writeBytes(buf.array());
63+
out.writeDouble(state.compression);
64+
out.writeVInt(state.centroidCount());
65+
for (Centroid centroid : state.centroids()) {
66+
out.writeDouble(centroid.mean());
67+
out.writeVLong(centroid.count());
8668
}
8769
}
8870

8971
public static TDigestState read(StreamInput in) throws IOException {
90-
if (in.getVersion().before(Version.V_3_1_0)) {
91-
// In older versions TDigestState was based on AVLTreeDigest. Load centroids into this class, then add it to MergingDigest.
92-
double compression = in.readDouble();
93-
AVLTreeDigest treeDigest = new AVLTreeDigest(compression);
94-
int n = in.readVInt();
95-
if (n > 0) {
96-
for (int i = 0; i < n; i++) {
97-
treeDigest.add(in.readDouble(), in.readVInt());
98-
}
99-
TDigestState state = new TDigestState(compression);
100-
state.add(List.of(treeDigest));
101-
return state;
102-
}
103-
return new TDigestState(compression);
104-
} else {
105-
// For MergingDigest, adding the original centroids in ascending order to a new, empty MergingDigest isn't guaranteed
106-
// to produce a MergingDigest whose centroids are exactly equal to the originals.
107-
// So, use the library's serialization code to ensure we get the exact same centroids, allowing us to compare with equals().
108-
// The AVLTreeDigest had the same limitation for equals() where it was only guaranteed to return true if the other object was
109-
// produced by de/serializing the object, so this should be fine.
110-
int byteSize = in.readVInt();
111-
byte[] bytes = new byte[byteSize];
112-
in.readBytes(bytes, 0, byteSize);
113-
MergingDigest mergingDigest = MergingDigest.fromBytes(ByteBuffer.wrap(bytes));
114-
if (mergingDigest.centroids().isEmpty()) {
115-
return new TDigestState(mergingDigest.compression());
116-
}
117-
return new TDigestState(mergingDigest.compression(), mergingDigest);
72+
double compression = in.readDouble();
73+
TDigestState state = new TDigestState(compression);
74+
int n = in.readVInt();
75+
for (int i = 0; i < n; i++) {
76+
state.add(in.readDouble(), in.readVInt());
11877
}
78+
return state;
11979
}
12080

12181
@Override

server/src/test/java/org/opensearch/search/aggregations/metrics/InternalTDigestPercentilesRanksTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ protected InternalTDigestPercentileRanks createTestInstance(
5454
Arrays.stream(values).forEach(state::add);
5555

5656
// the number of centroids is defined as <= the number of samples inserted
57-
assertTrue(state.centroids().size() <= values.length);
57+
assertTrue(state.centroidCount() <= values.length);
5858
return new InternalTDigestPercentileRanks(name, percents, state, keyed, format, metadata);
5959
}
6060

@@ -66,7 +66,7 @@ protected void assertReduced(InternalTDigestPercentileRanks reduced, List<Intern
6666
double max = Double.NEGATIVE_INFINITY;
6767
long totalCount = 0;
6868
for (InternalTDigestPercentileRanks ranks : inputs) {
69-
if (ranks.state.centroids().isEmpty()) {
69+
if (ranks.state.centroidCount() == 0) {
7070
// quantiles would return NaN
7171
continue;
7272
}

server/src/test/java/org/opensearch/search/aggregations/metrics/InternalTDigestPercentilesTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ protected InternalTDigestPercentiles createTestInstance(
5454
Arrays.stream(values).forEach(state::add);
5555

5656
// the number of centroids is defined as <= the number of samples inserted
57-
assertTrue(state.centroids().size() <= values.length);
57+
assertTrue(state.centroidCount() <= values.length);
5858
return new InternalTDigestPercentiles(name, percents, state, keyed, format, metadata);
5959
}
6060

server/src/test/java/org/opensearch/search/aggregations/metrics/TDigestPercentilesAggregatorTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void testSomeMatchesSortedNumericDocValues() throws IOException {
104104
iw.addDocument(singleton(new SortedNumericDocValuesField("number", 0)));
105105
}, tdigest -> {
106106
assertEquals(7L, tdigest.state.size());
107-
assertEquals(7L, tdigest.state.centroids().size());
107+
assertEquals(7L, tdigest.state.centroidCount());
108108
assertEquals(5.0d, tdigest.percentile(75), 0.0d);
109109
assertEquals("5.0", tdigest.percentileAsString(75));
110110
assertEquals(3.0d, tdigest.percentile(71), 0.0d);
@@ -128,7 +128,7 @@ public void testSomeMatchesNumericDocValues() throws IOException {
128128
iw.addDocument(singleton(new NumericDocValuesField("number", 0)));
129129
}, tdigest -> {
130130
assertEquals(tdigest.state.size(), 7L);
131-
assertTrue(tdigest.state.centroids().size() <= 7L);
131+
assertTrue(tdigest.state.centroidCount() <= 7L);
132132
assertEquals(8.0d, tdigest.percentile(100), 0.0d);
133133
assertEquals("8.0", tdigest.percentileAsString(100));
134134
assertEquals(8.0d, tdigest.percentile(88), 0.0d);
@@ -156,7 +156,7 @@ public void testQueryFiltering() throws IOException {
156156

157157
testCase(LongPoint.newRangeQuery("row", 1, 4), docs, tdigest -> {
158158
assertEquals(4L, tdigest.state.size());
159-
assertEquals(4L, tdigest.state.centroids().size());
159+
assertEquals(4L, tdigest.state.centroidCount());
160160
assertEquals(2.0d, tdigest.percentile(100), 0.0d);
161161
assertEquals(1.0d, tdigest.percentile(50), 0.0d);
162162
assertEquals(1.0d, tdigest.percentile(25), 0.0d);
@@ -165,7 +165,7 @@ public void testQueryFiltering() throws IOException {
165165

166166
testCase(LongPoint.newRangeQuery("row", 100, 110), docs, tdigest -> {
167167
assertEquals(0L, tdigest.state.size());
168-
assertEquals(0L, tdigest.state.centroids().size());
168+
assertEquals(0L, tdigest.state.centroidCount());
169169
assertFalse(AggregationInspectionHelper.hasValue(tdigest));
170170
});
171171
}

0 commit comments

Comments (0)