Skip to content

Commit f6371a2

Browse files
committed
Added aggregation precomputation for the rare string terms aggregator and the mapped string terms aggregator. Signed-off-by: Anthony Leong <[email protected]>
1 parent 8d3386c commit f6371a2

File tree

5 files changed

+211
-1
lines changed

5 files changed

+211
-1
lines changed

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@
3232
package org.opensearch.search.aggregations.bucket.terms;
3333

3434
import org.apache.lucene.index.LeafReaderContext;
35+
import org.apache.lucene.index.Terms;
36+
import org.apache.lucene.index.TermsEnum;
3537
import org.apache.lucene.search.ScoreMode;
38+
import org.apache.lucene.search.Weight;
3639
import org.apache.lucene.util.BytesRef;
3740
import org.apache.lucene.util.BytesRefBuilder;
3841
import org.apache.lucene.util.PriorityQueue;
@@ -75,8 +78,35 @@
7578
public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
7679
private final CollectorSource collectorSource;
7780
private final ResultStrategy<?, ?> resultStrategy;
81+
private Weight weight;
7882
private final BytesKeyedBucketOrds bucketOrds;
7983
private final IncludeExclude.StringFilter includeExclude;
84+
protected final String fieldName;
85+
86+
/**
 * Constructor variant that takes the index field name explicitly, allowing the
 * terms-dictionary precomputation path to be used even when the field name
 * cannot be derived from the collector source (e.g. significant-text).
 *
 * @param fieldName name of the indexed field backing this aggregation, or
 *                  {@code null} to disable leaf precomputation
 */
public MapStringTermsAggregator(
    String name,
    AggregatorFactories factories,
    CollectorSource collectorSource,
    Function<MapStringTermsAggregator, ResultStrategy<?, ?>> resultStrategy,
    BucketOrder order,
    DocValueFormat format,
    BucketCountThresholds bucketCountThresholds,
    IncludeExclude.StringFilter includeExclude,
    SearchContext context,
    Aggregator parent,
    SubAggCollectionMode collectionMode,
    boolean showTermDocCountError,
    CardinalityUpperBound cardinality,
    Map<String, Object> metadata,
    String fieldName
) throws IOException {
    super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
    this.fieldName = fieldName;
    this.includeExclude = includeExclude;
    this.collectorSource = collectorSource;
    this.bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), cardinality);
    // ResultStrategy needs a reference to the Aggregator to do its job.
    this.resultStrategy = resultStrategy.apply(this);
}
80110

81111
public MapStringTermsAggregator(
82112
String name,
@@ -99,6 +129,20 @@ public MapStringTermsAggregator(
99129
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
100130
this.includeExclude = includeExclude;
101131
bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), cardinality);
132+
if (collectorSource instanceof ValuesSourceCollectorSource) {
133+
ValuesSource valuesCollectorSource = ((ValuesSourceCollectorSource) collectorSource).getValuesSource();
134+
if (valuesCollectorSource instanceof ValuesSource.Bytes.FieldData) {
135+
this.fieldName = ((ValuesSource.Bytes.FieldData) valuesCollectorSource).getIndexFieldName();
136+
} else {
137+
this.fieldName = null;
138+
}
139+
} else {
140+
this.fieldName = null;
141+
}
142+
}
143+
144+
/**
 * Sets the {@link Weight} of the top-level query. The precomputation path uses
 * it to check whether the query matches every document in a segment
 * (via {@code weight.count(ctx)} against {@code maxDoc}).
 *
 * @param weight weight of the top-level query driving this aggregation
 */
public void setWeight(Weight weight) {
    this.weight = weight;
}
103147

104148
@Override
@@ -130,6 +174,51 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
130174
);
131175
}
132176

177+
@Override
178+
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
179+
if (subAggregators.length > 0 || includeExclude != null || fieldName == null) {
180+
// The optimization does not work when there are subaggregations or if there is a filter.
181+
// The query has to be a match all, otherwise
182+
return false;
183+
}
184+
185+
// The optimization could only be used if there are no deleted documents and the top-level
186+
// query matches all documents in the segment.
187+
if (weight == null) {
188+
return false;
189+
} else {
190+
if (weight.count(ctx) == 0) {
191+
return true;
192+
} else if (weight.count(ctx) != ctx.reader().maxDoc()) {
193+
return false;
194+
}
195+
}
196+
197+
Terms stringTerms = ctx.reader().terms(fieldName);
198+
if (stringTerms == null) {
199+
// Field is not indexed.
200+
return false;
201+
}
202+
203+
TermsEnum stringTermsEnum = stringTerms.iterator();
204+
BytesRef stringTerm = stringTermsEnum.next();
205+
206+
// Here, we will iterate over all the terms in the segment and add the counts into the bucket.
207+
while (stringTerm != null) {
208+
long bucketOrdinal = bucketOrds.add(0L, stringTerm);
209+
if (bucketOrdinal < 0) { // already seen
210+
bucketOrdinal = -1 - bucketOrdinal;
211+
}
212+
int amount = stringTermsEnum.docFreq();
213+
if (resultStrategy instanceof SignificantTermsResults) {
214+
((SignificantTermsResults)resultStrategy).updateSubsetSizes(0L, amount);
215+
}
216+
incrementBucketDocCount(bucketOrdinal, amount);
217+
stringTerm = stringTermsEnum.next();
218+
}
219+
return true;
220+
}
221+
133222
@Override
134223
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
135224
return resultStrategy.buildAggregations(owningBucketOrds);
@@ -196,6 +285,10 @@ public boolean needsScores() {
196285
return valuesSource.needsScores();
197286
}
198287

288+
/**
 * Exposes the backing {@link ValuesSource} so the aggregator can inspect it —
 * used to resolve the index field name when the source is
 * {@code ValuesSource.Bytes.FieldData}.
 */
public ValuesSource getValuesSource() {
    return valuesSource;
}
291+
199292
@Override
200293
public LeafBucketCollector getLeafCollector(
201294
IncludeExclude.StringFilter includeExclude,
@@ -501,6 +594,11 @@ String describe() {
501594
return "significant_terms";
502595
}
503596

597+
/**
 * Adds {@code amount} to the subset size tracked for {@code owningBucketOrd},
 * growing the backing big array as needed. Called by the precomputation path,
 * which tallies term doc freqs without running the leaf collector.
 *
 * @param owningBucketOrd ordinal of the owning bucket
 * @param amount          number of documents to add to the subset size
 */
public void updateSubsetSizes(long owningBucketOrd, int amount) {
    subsetSizes = context.bigArrays().grow(subsetSizes, owningBucketOrd + 1);
    subsetSizes.increment(owningBucketOrd, amount);
}
601+
504602
@Override
505603
LeafBucketCollector wrapCollector(LeafBucketCollector primary) {
506604
return new LeafBucketCollectorBase(primary, null) {

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,8 @@ protected Aggregator createInternal(
168168
SubAggCollectionMode.BREADTH_FIRST,
169169
false,
170170
cardinality,
171-
metadata
171+
metadata,
172+
indexedFieldName
172173
);
173174
}
174175

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
package org.opensearch.search.aggregations.bucket.terms;
3333

3434
import org.apache.lucene.index.LeafReaderContext;
35+
import org.apache.lucene.index.Terms;
36+
import org.apache.lucene.index.TermsEnum;
37+
import org.apache.lucene.search.Weight;
3538
import org.apache.lucene.util.BytesRef;
3639
import org.apache.lucene.util.BytesRefBuilder;
3740
import org.opensearch.common.lease.Releasables;
@@ -64,7 +67,9 @@
6467
public class StringRareTermsAggregator extends AbstractRareTermsAggregator {
6568
private final ValuesSource.Bytes valuesSource;
6669
private final IncludeExclude.StringFilter filter;
70+
private Weight weight;
6771
private final BytesKeyedBucketOrds bucketOrds;
72+
protected final String fieldName;
6873

6974
StringRareTermsAggregator(
7075
String name,
@@ -83,6 +88,13 @@ public class StringRareTermsAggregator extends AbstractRareTermsAggregator {
8388
this.valuesSource = valuesSource;
8489
this.filter = filter;
8590
this.bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), cardinality);
91+
this.fieldName = (valuesSource instanceof ValuesSource.Bytes.WithOrdinals.FieldData)
92+
? ((ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource).getIndexFieldName()
93+
: null;
94+
}
95+
96+
/**
 * Sets the {@link Weight} of the top-level query. The precomputation path uses
 * it to check whether the query matches every document in a segment
 * (via {@code weight.count(ctx)} against {@code maxDoc}).
 *
 * @param weight weight of the top-level query driving this aggregation
 */
public void setWeight(Weight weight) {
    this.weight = weight;
}
8799

88100
@Override
@@ -122,6 +134,47 @@ public void collect(int docId, long owningBucketOrd) throws IOException {
122134
};
123135
}
124136

137+
@Override
138+
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
139+
if (subAggregators.length > 0 || filter != null) {
140+
// The optimization does not work when there are subaggregations or if there is a filter.
141+
// The query has to be a match all, otherwise
142+
return false;
143+
}
144+
145+
// The optimization could only be used if there are no deleted documents and the top-level
146+
// query matches all documents in the segment.
147+
if (weight == null) {
148+
return false;
149+
} else {
150+
if (weight.count(ctx) == 0) {
151+
return true;
152+
} else if (weight.count(ctx) != ctx.reader().maxDoc()) {
153+
return false;
154+
}
155+
}
156+
157+
Terms stringTerms = ctx.reader().terms(fieldName);
158+
if (stringTerms == null) {
159+
// Field is not indexed.
160+
return false;
161+
}
162+
163+
TermsEnum stringTermsEnum = stringTerms.iterator();
164+
BytesRef stringTerm = stringTermsEnum.next();
165+
166+
// Here, we will iterate over all the terms in the segment and add the counts into the bucket.
167+
while (stringTerm != null) {
168+
long bucketOrdinal = bucketOrds.add(0L, stringTerm);
169+
if (bucketOrdinal < 0) { // already seen
170+
bucketOrdinal = -1 - bucketOrdinal;
171+
}
172+
incrementBucketDocCount(bucketOrdinal, stringTermsEnum.docFreq());
173+
stringTerm = stringTermsEnum.next();
174+
}
175+
return true;
176+
}
177+
125178
@Override
126179
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
127180
/*

server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,10 @@ public SortedBinaryDocValues bytesValues(LeafReaderContext context) {
304304
return indexFieldData.load(context).getBytesValues();
305305
}
306306

307+
/**
 * Returns the name of the index field backing this values source, as reported
 * by the underlying {@code indexFieldData}. Used by terms aggregators to read
 * doc freqs directly from the terms dictionary when precomputing results.
 */
public String getIndexFieldName() {
    return this.indexFieldData.getFieldName();
}
310+
307311
}
308312

309313
/**

server/src/test/java/org/opensearch/search/aggregations/bucket/terms/RareTermsAggregatorTests.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.lucene.document.SortedDocValuesField;
4040
import org.apache.lucene.document.SortedNumericDocValuesField;
4141
import org.apache.lucene.document.SortedSetDocValuesField;
42+
import org.apache.lucene.document.StringField;
4243
import org.apache.lucene.index.DirectoryReader;
4344
import org.apache.lucene.index.IndexReader;
4445
import org.apache.lucene.search.FieldExistsQuery;
@@ -156,6 +157,12 @@ public void testMatchAllDocs() throws IOException {
156157
assertThat(bucket.getKeyAsString(), equalTo("1"));
157158
assertThat(bucket.getDocCount(), equalTo(1L));
158159
});
160+
testSearchCaseIndexString(query, dataset, aggregation -> aggregation.field(KEYWORD_FIELD).maxDocCount(1), agg -> {
161+
assertEquals(1, agg.getBuckets().size());
162+
StringRareTerms.Bucket bucket = (StringRareTerms.Bucket) agg.getBuckets().get(0);
163+
assertThat(bucket.getKeyAsString(), equalTo("1"));
164+
assertThat(bucket.getDocCount(), equalTo(1L));
165+
}, true);
159166
}
160167

161168
public void testManyDocsOneRare() throws IOException {
@@ -581,6 +588,21 @@ private void testSearchCase(
581588

582589
}
583590

591+
private void testSearchCaseIndexString(
592+
Query query,
593+
List<Long> dataset,
594+
Consumer<RareTermsAggregationBuilder> configure,
595+
Consumer<InternalMappedRareTerms<?, ?>> verify,
596+
boolean shouldIndex
597+
) throws IOException {
598+
RareTermsAggregationBuilder aggregationBuilder = new RareTermsAggregationBuilder("_name");
599+
if (configure != null) {
600+
configure.accept(aggregationBuilder);
601+
}
602+
verify.accept(executeTestCaseIndexString(query, dataset, aggregationBuilder, shouldIndex));
603+
604+
}
605+
584606
private <A extends InternalAggregation> A executeTestCase(Query query, List<Long> dataset, AggregationBuilder aggregationBuilder)
585607
throws IOException {
586608
try (Directory directory = newDirectory()) {
@@ -610,6 +632,38 @@ private <A extends InternalAggregation> A executeTestCase(Query query, List<Long
610632
}
611633
}
612634

635+
/**
 * Indexes {@code dataset} (shuffled), optionally adding the keyword field as an
 * indexed {@link StringField} so the terms-dictionary precomputation path can be
 * exercised, then runs the aggregation and returns the reduced result.
 *
 * @param query              top-level query to run
 * @param dataset            values to index (one document per value)
 * @param aggregationBuilder aggregation to execute
 * @param shouldIndex        whether to index KEYWORD_FIELD in addition to doc values
 * @return the reduced aggregation result
 * @throws IOException on index read/write failure
 */
private <A extends InternalAggregation> A executeTestCaseIndexString(Query query, List<Long> dataset, AggregationBuilder aggregationBuilder, boolean shouldIndex)
    throws IOException {
    try (Directory directory = newDirectory()) {
        try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
            Document document = new Document();
            // Shuffle so results do not depend on insertion order.
            List<Long> shuffledDataset = new ArrayList<>(dataset);
            Collections.shuffle(shuffledDataset, random());
            for (Long value : shuffledDataset) {
                document.add(new SortedNumericDocValuesField(LONG_FIELD, value));
                document.add(new LongPoint(LONG_FIELD, value));
                document.add(new SortedSetDocValuesField(KEYWORD_FIELD, new BytesRef(Long.toString(value))));
                if (shouldIndex) {
                    // Index the keyword so the aggregator can read term doc freqs
                    // straight from the terms dictionary.
                    document.add(new StringField(KEYWORD_FIELD, Long.toString(value), Field.Store.NO));
                }
                document.add(new SortedSetDocValuesField("even_odd", new BytesRef(value % 2 == 0 ? "even" : "odd")));
                indexWriter.addDocument(document);
                document.clear(); // reuse the Document instance for the next row
            }
        }

        try (IndexReader indexReader = DirectoryReader.open(directory)) {
            IndexSearcher indexSearcher = newIndexSearcher(indexReader);

            MappedFieldType[] types = new MappedFieldType[] {
                keywordField(KEYWORD_FIELD),
                longField(LONG_FIELD),
                keywordField("even_odd") };
            return searchAndReduce(indexSearcher, query, aggregationBuilder, types);
        }
    }
}
666+
613667
@Override
614668
public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
615669
/*

0 commit comments

Comments
 (0)