Commit 9d0ca4d

bowenlan-amzn authored and rgsriram committed
Support sub agg in filter rewrite optimization (opensearch-project#17447)
* Support sub agg in filter rewrite optimization
* Clean unused code
* remove singleton DV related change
* let aggregator decide whether to support sub agg
* refactor range collector
* prevent NPE
* handle tryPrecomputeAggregationForLeaf interface
* clean up for review
* add changelog
* add segment level check: this is for the regression we see in pmc, docs per segment doesn't work well. we should also consider number of ranges. From experiments I choose 1000.
* improvements: throw exception for unreachable path; several places to only run when hasSub; range agg only works for match all
* experiment annotation
* Update server/src/main/java/org/opensearch/search/SearchService.java
* Collect sub agg after each bucket
* try fixed bit set
* address comments

---------
Signed-off-by: bowenlan-amzn <[email protected]>
Signed-off-by: Sriram Ganesh <[email protected]>
1 parent bb12e5c commit 9d0ca4d

21 files changed: +1062 −150 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - [Star Tree] [Search] Add query changes to support unsigned-long in star tree ([#17275](https://github.com/opensearch-project/OpenSearch/pull/17275))
 - Add `ApproximateMatchAllQuery` that targets match_all queries and approximates sorts ([#17772](https://github.com/opensearch-project/OpenSearch/pull/17772))
 - Add TermsQuery support to Search GRPC endpoint ([#17888](https://github.com/opensearch-project/OpenSearch/pull/17888))
+- Support sub agg in filter rewrite optimization ([#17447](https://github.com/opensearch-project/OpenSearch/pull/17447)

 ### Changed
 - Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
@@ -552,6 +552,7 @@ public void apply(Settings value, Settings current, Settings previous) {
             SearchService.MAX_OPEN_PIT_CONTEXT,
             SearchService.MAX_PIT_KEEPALIVE_SETTING,
             SearchService.MAX_AGGREGATION_REWRITE_FILTERS,
+            SearchService.AGGREGATION_REWRITE_FILTER_SEGMENT_THRESHOLD,
             SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING,
             SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD,
             SearchService.KEYWORD_INDEX_OR_DOC_VALUES_ENABLED,

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

Lines changed: 15 additions & 0 deletions
@@ -115,6 +115,7 @@
 import java.util.function.Function;
 import java.util.function.LongSupplier;

+import static org.opensearch.search.SearchService.AGGREGATION_REWRITE_FILTER_SEGMENT_THRESHOLD;
 import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD;
 import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
 import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
@@ -207,6 +208,7 @@ final class DefaultSearchContext extends SearchContext {
     private final String concurrentSearchMode;
     private final SetOnce<Boolean> requestShouldUseConcurrentSearch = new SetOnce<>();
     private final int maxAggRewriteFilters;
+    private final int filterRewriteSegmentThreshold;
     private final int cardinalityAggregationPruningThreshold;
     private final boolean keywordIndexOrDocValuesEnabled;

@@ -267,6 +269,7 @@ final class DefaultSearchContext extends SearchContext {
         this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;

         this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
+        this.filterRewriteSegmentThreshold = evaluateAggRewriteFilterSegThreshold();
         this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold();
         this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
         this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled();
@@ -1124,6 +1127,18 @@ private int evaluateFilterRewriteSetting() {
         return 0;
     }

+    @Override
+    public int filterRewriteSegmentThreshold() {
+        return filterRewriteSegmentThreshold;
+    }
+
+    private int evaluateAggRewriteFilterSegThreshold() {
+        if (clusterService != null) {
+            return clusterService.getClusterSettings().get(AGGREGATION_REWRITE_FILTER_SEGMENT_THRESHOLD);
+        }
+        return 0;
+    }
+
     @Override
     public int cardinalityAggregationPruningThreshold() {
         return cardinalityAggregationPruningThreshold;

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 12 additions & 0 deletions
@@ -53,6 +53,7 @@
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.CheckedSupplier;
 import org.opensearch.common.UUIDs;
+import org.opensearch.common.annotation.ExperimentalApi;
 import org.opensearch.common.lease.Releasable;
 import org.opensearch.common.lease.Releasables;
 import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
@@ -309,6 +310,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     );

     // value 0 means rewrite filters optimization in aggregations will be disabled
+    @ExperimentalApi
     public static final Setting<Integer> MAX_AGGREGATION_REWRITE_FILTERS = Setting.intSetting(
         "search.max_aggregation_rewrite_filters",
         3000,
@@ -317,6 +319,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         Property.NodeScope
     );

+    // only do optimization when there's enough docs per range at segment level and sub agg exists
+    @ExperimentalApi
+    public static final Setting<Integer> AGGREGATION_REWRITE_FILTER_SEGMENT_THRESHOLD = Setting.intSetting(
+        "search.aggregation_rewrite_filters.segment_threshold.docs_per_bucket",
+        1000,
+        0,
+        Property.Dynamic,
+        Property.NodeScope
+    );
+
     public static final Setting<Integer> INDICES_MAX_CLAUSE_COUNT_SETTING = Setting.intSetting(
         "indices.query.bool.max_clause_count",
         1024,
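
For context, the new threshold sits next to the existing rewrite-filter cap. Below is a minimal, illustrative sketch (not part of this diff) that builds the two node-level settings shown above into a Settings object, using the keys and defaults from the diff:

import org.opensearch.common.settings.Settings;

public class FilterRewriteSettingsSketch {
    public static void main(String[] args) {
        Settings settings = Settings.builder()
            // existing cap; 0 disables the filter rewrite optimization in aggregations
            .put("search.max_aggregation_rewrite_filters", 3000)
            // new in this commit: minimum estimated docs per bucket a segment must have
            // before the optimized path is used when sub-aggregations are present
            .put("search.aggregation_rewrite_filters.segment_threshold.docs_per_bucket", 1000)
            .build();

        // Reads back the default used above.
        System.out.println(settings.getAsInt("search.aggregation_rewrite_filters.segment_threshold.docs_per_bucket", 0));
    }
}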

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

Lines changed: 10 additions & 2 deletions
@@ -94,7 +94,7 @@
 import java.util.stream.Collectors;

 import static org.opensearch.search.aggregations.MultiBucketConsumerService.MAX_BUCKET_SETTING;
-import static org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge.segmentMatchAll;
+import static org.opensearch.search.aggregations.bucket.filterrewrite.AggregatorBridge.segmentMatchAll;

 /**
  * Main aggregator that aggregates docs from multiple aggregations
@@ -173,6 +173,9 @@ public final class CompositeAggregator extends BucketsAggregator {

     @Override
     protected boolean canOptimize() {
+        if (subAggregators.length > 0) {
+            return false;
+        }
         if (canOptimize(sourceConfigs)) {
             this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
             if (rawAfterKey != null) {
@@ -566,7 +569,12 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
     @Override
     protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
         finishLeaf(); // May need to wrap up previous leaf if it could not be precomputed
-        return filterRewriteOptimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, segmentMatchAll(context, ctx));
+        return filterRewriteOptimizationContext.tryOptimize(
+            ctx,
+            this::incrementBucketDocCount,
+            segmentMatchAll(context, ctx),
+            collectableSubAggregators
+        );
     }

     @Override

server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/AggregatorBridge.java

Lines changed: 48 additions & 4 deletions
@@ -8,16 +8,23 @@

 package org.opensearch.search.aggregations.bucket.filterrewrite;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.PointValues;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Weight;
 import org.opensearch.index.mapper.MappedFieldType;
+import org.opensearch.search.aggregations.bucket.filterrewrite.rangecollector.RangeCollector;
 import org.opensearch.search.internal.SearchContext;

 import java.io.IOException;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.opensearch.search.aggregations.bucket.filterrewrite.PointTreeTraversal.createCollector;
+import static org.opensearch.search.aggregations.bucket.filterrewrite.PointTreeTraversal.multiRangesTraverse;

 /**
  * This interface provides a bridge between an aggregator and the optimization context, allowing
@@ -35,6 +42,8 @@
  */
 public abstract class AggregatorBridge {

+    static final Logger logger = LogManager.getLogger(Helper.loggerName);
+
     /**
      * The field type associated with this aggregator bridge.
      */
@@ -75,16 +84,51 @@ void setRangesConsumer(Consumer<Ranges> setRanges) {
     /**
      * Attempts to build aggregation results for a segment
      *
-     * @param values the point values (index structure for numeric values) for a segment
-     * @param incrementDocCount a consumer to increment the document count for a range bucket. The First parameter is document count, the second is the key of the bucket
+     * @param values               the point values (index structure for numeric values) for a segment
+     * @param incrementDocCount    a consumer to increment the document count for a range bucket. The First parameter is document count, the second is the key of the bucket
      * @param ranges
+     * @param subAggCollectorParam
      */
-    abstract FilterRewriteOptimizationContext.DebugInfo tryOptimize(
+    abstract FilterRewriteOptimizationContext.OptimizeResult tryOptimize(
         PointValues values,
         BiConsumer<Long, Long> incrementDocCount,
-        Ranges ranges
+        Ranges ranges,
+        FilterRewriteOptimizationContext.SubAggCollectorParam subAggCollectorParam
     ) throws IOException;

+    static FilterRewriteOptimizationContext.OptimizeResult getResult(
+        PointValues values,
+        BiConsumer<Long, Long> incrementDocCount,
+        Ranges ranges,
+        Function<Integer, Long> getBucketOrd,
+        int size,
+        FilterRewriteOptimizationContext.SubAggCollectorParam subAggCollectorParam
+    ) throws IOException {
+        BiConsumer<Integer, Integer> incrementFunc = (activeIndex, docCount) -> {
+            long bucketOrd = getBucketOrd.apply(activeIndex);
+            incrementDocCount.accept(bucketOrd, (long) docCount);
+        };
+
+        PointValues.PointTree tree = values.getPointTree();
+        FilterRewriteOptimizationContext.OptimizeResult optimizeResult = new FilterRewriteOptimizationContext.OptimizeResult();
+        int activeIndex = ranges.firstRangeIndex(tree.getMinPackedValue(), tree.getMaxPackedValue());
+        if (activeIndex < 0) {
+            logger.debug("No ranges match the query, skip the fast filter optimization");
+            return optimizeResult;
+        }
+        RangeCollector collector = createCollector(
+            ranges,
+            incrementFunc,
+            size,
+            activeIndex,
+            getBucketOrd,
+            optimizeResult,
+            subAggCollectorParam
+        );
+
+        return multiRangesTraverse(tree, collector);
+    }
+
     /**
      * Checks whether the top level query matches all documents on the segment
      *
*

server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/DateHistogramAggregatorBridge.java

Lines changed: 10 additions & 8 deletions
@@ -23,8 +23,6 @@
 import java.util.function.BiConsumer;
 import java.util.function.Function;

-import static org.opensearch.search.aggregations.bucket.filterrewrite.PointTreeTraversal.multiRangesTraverse;
-
 /**
  * For date histogram aggregation
  */
@@ -127,27 +125,31 @@ private DateFieldMapper.DateFieldType getFieldType() {
         return (DateFieldMapper.DateFieldType) fieldType;
     }

+    /**
+     * Get the size of buckets to stop early
+     */
     protected int getSize() {
         return Integer.MAX_VALUE;
     }

     @Override
-    final FilterRewriteOptimizationContext.DebugInfo tryOptimize(
+    final FilterRewriteOptimizationContext.OptimizeResult tryOptimize(
         PointValues values,
         BiConsumer<Long, Long> incrementDocCount,
-        Ranges ranges
+        Ranges ranges,
+        FilterRewriteOptimizationContext.SubAggCollectorParam subAggCollectorParam
     ) throws IOException {
         int size = getSize();

         DateFieldMapper.DateFieldType fieldType = getFieldType();
-        BiConsumer<Integer, Integer> incrementFunc = (activeIndex, docCount) -> {
+
+        Function<Integer, Long> getBucketOrd = (activeIndex) -> {
             long rangeStart = LongPoint.decodeDimension(ranges.lowers[activeIndex], 0);
             rangeStart = fieldType.convertNanosToMillis(rangeStart);
-            long bucketOrd = getBucketOrd(bucketOrdProducer().apply(rangeStart));
-            incrementDocCount.accept(bucketOrd, (long) docCount);
+            return getBucketOrd(bucketOrdProducer().apply(rangeStart));
         };

-        return multiRangesTraverse(values.getPointTree(), ranges, incrementFunc, size);
+        return getResult(values, incrementDocCount, ranges, getBucketOrd, size, subAggCollectorParam);
     }

     private static long getBucketOrd(long bucketOrd) {

server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java

Lines changed: 53 additions & 9 deletions
@@ -14,7 +14,9 @@
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.PointValues;
+import org.apache.lucene.util.DocIdSetBuilder;
 import org.opensearch.index.mapper.DocCountFieldMapper;
+import org.opensearch.search.aggregations.BucketCollector;
 import org.opensearch.search.internal.SearchContext;

 import java.io.IOException;
@@ -42,12 +44,16 @@ public final class FilterRewriteOptimizationContext {

     private Ranges ranges; // built at shard level

+    private boolean hasSubAgg;
+
     // debug info related fields
     private final AtomicInteger leafNodeVisited = new AtomicInteger();
     private final AtomicInteger innerNodeVisited = new AtomicInteger();
     private final AtomicInteger segments = new AtomicInteger();
     private final AtomicInteger optimizedSegments = new AtomicInteger();

+    private int segmentThreshold = 0;
+
     public FilterRewriteOptimizationContext(
         AggregatorBridge aggregatorBridge,
         final Object parent,
@@ -65,7 +71,8 @@ public FilterRewriteOptimizationContext(
     private boolean canOptimize(final Object parent, final int subAggLength, SearchContext context) throws IOException {
         if (context.maxAggRewriteFilters() == 0) return false;

-        if (parent != null || subAggLength != 0) return false;
+        if (parent != null) return false;
+        this.hasSubAgg = subAggLength > 0;

         boolean canOptimize = aggregatorBridge.canOptimize();
         if (canOptimize) {
@@ -81,6 +88,7 @@ private boolean canOptimize(final Object parent, final int subAggLength, SearchC
         }
         logger.debug("Fast filter rewriteable: {} for shard {}", canOptimize, shardId);

+        segmentThreshold = context.filterRewriteSegmentThreshold();
         return canOptimize;
     }

@@ -94,10 +102,14 @@ void setRanges(Ranges ranges) {
      * Usage: invoked at segment level — in getLeafCollector of aggregator
      *
      * @param incrementDocCount consume the doc_count results for certain ordinal
-     * @param segmentMatchAll if your optimization can prepareFromSegment, you should pass in this flag to decide whether to prepareFromSegment
+     * @param segmentMatchAll we can always tryOptimize for match all scenario
      */
-    public boolean tryOptimize(final LeafReaderContext leafCtx, final BiConsumer<Long, Long> incrementDocCount, boolean segmentMatchAll)
-        throws IOException {
+    public boolean tryOptimize(
+        final LeafReaderContext leafCtx,
+        final BiConsumer<Long, Long> incrementDocCount,
+        boolean segmentMatchAll,
+        BucketCollector collectableSubAggregators
+    ) throws IOException {
         segments.incrementAndGet();
         if (!canOptimize) {
             return false;
@@ -123,7 +135,25 @@ public boolean tryOptimize(final LeafReaderContext leafCtx, final BiConsumer<Lon
         Ranges ranges = getRanges(leafCtx, segmentMatchAll);
         if (ranges == null) return false;

-        consumeDebugInfo(aggregatorBridge.tryOptimize(values, incrementDocCount, ranges));
+        if (hasSubAgg && this.segmentThreshold > leafCtx.reader().maxDoc() / ranges.getSize()) {
+            // comparing with a rough estimate of docs per range in this segment
+            return false;
+        }
+
+        OptimizeResult optimizeResult;
+        SubAggCollectorParam subAggCollectorParam;
+        if (hasSubAgg) {
+            subAggCollectorParam = new SubAggCollectorParam(collectableSubAggregators, leafCtx);
+        } else {
+            subAggCollectorParam = null;
+        }
+        try {
+            optimizeResult = aggregatorBridge.tryOptimize(values, incrementDocCount, ranges, subAggCollectorParam);
+            consumeDebugInfo(optimizeResult);
+        } catch (AbortFilterRewriteOptimizationException e) {
+            logger.error("Abort filter rewrite optimization, fall back to default path");
+            return false;
+        }

         optimizedSegments.incrementAndGet();
         logger.debug("Fast filter optimization applied to shard {} segment {}", shardId, leafCtx.ord);
@@ -132,6 +162,18 @@ public boolean tryOptimize(final LeafReaderContext leafCtx, final BiConsumer<Lon
         return true;
     }

+    /**
+     * Parameters for {@link org.opensearch.search.aggregations.bucket.filterrewrite.rangecollector.SubAggRangeCollector}
+     */
+    public record SubAggCollectorParam(BucketCollector collectableSubAggregators, LeafReaderContext leafCtx) {
+    }
+
+    static class AbortFilterRewriteOptimizationException extends RuntimeException {
+        AbortFilterRewriteOptimizationException(String message, Exception e) {
+            super(message, e);
+        }
+    }
+
     Ranges getRanges(LeafReaderContext leafCtx, boolean segmentMatchAll) {
         if (!preparedAtShardLevel) {
             try {
@@ -160,20 +202,22 @@ private Ranges getRangesFromSegment(LeafReaderContext leafCtx, boolean segmentMa
     /**
      * Contains debug info of BKD traversal to show in profile
      */
-    static class DebugInfo {
+    public static class OptimizeResult {
         private final AtomicInteger leafNodeVisited = new AtomicInteger(); // leaf node visited
         private final AtomicInteger innerNodeVisited = new AtomicInteger(); // inner node visited

-        void visitLeaf() {
+        public DocIdSetBuilder[] builders = new DocIdSetBuilder[0];
+
+        public void visitLeaf() {
             leafNodeVisited.incrementAndGet();
         }

-        void visitInner() {
+        public void visitInner() {
             innerNodeVisited.incrementAndGet();
         }
     }

-    void consumeDebugInfo(DebugInfo debug) {
+    void consumeDebugInfo(OptimizeResult debug) {
         leafNodeVisited.addAndGet(debug.leafNodeVisited.get());
         innerNodeVisited.addAndGet(debug.innerNodeVisited.get());
     }
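
The segment-level guard added above only matters when sub-aggregations are present: it skips the optimization if the configured docs-per-bucket threshold exceeds a rough estimate of docs per range in the segment (maxDoc divided by the number of ranges). A small worked example of that check, with illustrative numbers and the default threshold of 1000:

public class SegmentThresholdSketch {
    // Mirrors the condition added in tryOptimize above.
    static boolean skipOptimization(boolean hasSubAgg, int segmentThreshold, int maxDoc, int numRanges) {
        return hasSubAgg && segmentThreshold > maxDoc / numRanges;
    }

    public static void main(String[] args) {
        int segmentThreshold = 1000; // default of the new setting

        // 200,000 docs across 100 ranges -> ~2,000 docs per range: optimization proceeds.
        System.out.println(skipOptimization(true, segmentThreshold, 200_000, 100));   // false

        // 200,000 docs across 1,000 ranges -> ~200 docs per range: fall back to the default path.
        System.out.println(skipOptimization(true, segmentThreshold, 200_000, 1_000)); // true

        // Without sub-aggregations the guard does not apply.
        System.out.println(skipOptimization(false, segmentThreshold, 200_000, 1_000)); // false
    }
}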
