-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Added aggregation precomputation for rare terms #18106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
11ecaf3
65e20b8
d51c2a0
ab13378
b5e08d8
ebca7e1
9d73b57
66171ca
b4a4128
b60c221
0375104
86a23cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,8 +31,13 @@ | |
|
||
package org.opensearch.search.aggregations.bucket.missing; | ||
|
||
import org.apache.lucene.index.DocValues; | ||
import org.apache.lucene.index.FieldInfos; | ||
import org.apache.lucene.index.LeafReaderContext; | ||
import org.apache.lucene.index.NumericDocValues; | ||
import org.apache.lucene.search.Weight; | ||
import org.opensearch.index.fielddata.DocValueBits; | ||
import org.opensearch.index.mapper.DocCountFieldMapper; | ||
import org.opensearch.search.aggregations.Aggregator; | ||
import org.opensearch.search.aggregations.AggregatorFactories; | ||
import org.opensearch.search.aggregations.CardinalityUpperBound; | ||
|
@@ -46,7 +51,11 @@ | |
import org.opensearch.search.internal.SearchContext; | ||
|
||
import java.io.IOException; | ||
import java.util.HashSet; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_DOCS; | ||
|
||
/** | ||
* Aggregate all docs that are missing a value. | ||
|
@@ -55,7 +64,10 @@ | |
*/ | ||
public class MissingAggregator extends BucketsAggregator implements SingleBucketAggregator { | ||
|
||
private Weight weight; | ||
private final ValuesSource valuesSource; | ||
protected final String fieldName; | ||
private final ValuesSourceConfig valuesSourceConfig; | ||
|
||
public MissingAggregator( | ||
String name, | ||
|
@@ -69,6 +81,16 @@ | |
super(name, factories, aggregationContext, parent, cardinality, metadata); | ||
// TODO: Stop using nulls here | ||
this.valuesSource = valuesSourceConfig.hasValues() ? valuesSourceConfig.getValuesSource() : null; | ||
if (this.valuesSource != null) { | ||
this.fieldName = valuesSource.getIndexFieldName(); | ||
} else { | ||
this.fieldName = null; | ||
} | ||
this.valuesSourceConfig = valuesSourceConfig; | ||
} | ||
|
||
/**
 * Stores the given {@link Weight} in this aggregator's {@code weight} field.
 * {@code tryPrecomputeAggregationForLeaf} reads that field and declines to
 * precompute while it is still {@code null}.
 *
 * @param weight the weight to store
 */
public void setWeight(Weight weight) { | ||
this.weight = weight; | ||
} | ||
|
||
@Override | ||
|
@@ -94,6 +116,66 @@ | |
}; | ||
} | ||
|
||
@Override | ||
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException { | ||
if (subAggregators.length > 0) { | ||
// The optimization does not work when there are subaggregations or if there is a filter. | ||
// The query has to be a match all, otherwise | ||
// | ||
return false; | ||
Check warning on line 125 in server/src/main/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregator.java
|
||
} | ||
|
||
if (valuesSourceConfig.missing() != null) { | ||
// we do not collect any documents through the missing aggregation when the missing parameter | ||
// is up. | ||
return true; | ||
} | ||
|
||
if (fieldName == null) { | ||
// The optimization does not work when there are subaggregations or if there is a filter. | ||
// The query has to be a match all, otherwise | ||
// | ||
return false; | ||
} | ||
|
||
// The optimization could only be used if there are no deleted documents and the top-level | ||
// query matches all documents in the segment. | ||
if (weight == null) { | ||
return false; | ||
Check warning on line 144 in server/src/main/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregator.java
|
||
} else { | ||
if (weight.count(ctx) == 0) { | ||
return true; | ||
Check warning on line 147 in server/src/main/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregator.java
|
||
} else if (weight.count(ctx) != ctx.reader().maxDoc()) { | ||
return false; | ||
Check warning on line 149 in server/src/main/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregator.java
|
||
} | ||
} | ||
|
||
Set<String> indexedFields = new HashSet<>(FieldInfos.getIndexedFields(ctx.reader())); | ||
|
||
// This will only work if the field name is indexed because otherwise, the reader would not | ||
// have kept track of the doc count of the fieldname. There is a case where a field might be nonexistent | ||
// but still can be calculated. | ||
if (indexedFields.contains(fieldName) == false && ctx.reader().getFieldInfos().fieldInfo(fieldName) != null) { | ||
return false; | ||
} | ||
|
||
NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME); | ||
if (docCountValues.nextDoc() != NO_MORE_DOCS) { | ||
// This segment has at least one document with the _doc_count field. | ||
return false; | ||
Check warning on line 165 in server/src/main/java/org/opensearch/search/aggregations/bucket/missing/MissingAggregator.java
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think if you separate out the test cases as I commented in test files - that can give you a good code coverage as well. |
||
} | ||
|
||
long docCountWithFieldName = ctx.reader().getDocCount(fieldName); | ||
int totalDocCount = ctx.reader().maxDoc(); | ||
|
||
// The missing aggregation bucket will count the number of documents where the field name is | ||
// either null or not present in that document. We are subtracting the documents where the field | ||
// value is valid. | ||
incrementBucketDocCount(0, totalDocCount - docCountWithFieldName); | ||
ajleong623 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
return true; | ||
} | ||
|
||
@Override | ||
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { | ||
return buildAggregationsForSingleBucket( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,15 +31,21 @@ | |
|
||
package org.opensearch.search.aggregations.bucket.terms; | ||
|
||
import org.apache.lucene.index.DocValues; | ||
import org.apache.lucene.index.LeafReaderContext; | ||
import org.apache.lucene.index.NumericDocValues; | ||
import org.apache.lucene.index.Terms; | ||
import org.apache.lucene.index.TermsEnum; | ||
import org.apache.lucene.search.ScoreMode; | ||
import org.apache.lucene.search.Weight; | ||
import org.apache.lucene.util.BytesRef; | ||
import org.apache.lucene.util.BytesRefBuilder; | ||
import org.apache.lucene.util.PriorityQueue; | ||
import org.opensearch.common.lease.Releasable; | ||
import org.opensearch.common.lease.Releasables; | ||
import org.opensearch.common.util.LongArray; | ||
import org.opensearch.index.fielddata.SortedBinaryDocValues; | ||
import org.opensearch.index.mapper.DocCountFieldMapper; | ||
import org.opensearch.search.DocValueFormat; | ||
import org.opensearch.search.aggregations.Aggregator; | ||
import org.opensearch.search.aggregations.AggregatorFactories; | ||
|
@@ -54,6 +60,7 @@ | |
import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes; | ||
import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; | ||
import org.opensearch.search.aggregations.support.ValuesSource; | ||
import org.opensearch.search.aggregations.support.ValuesSourceConfig; | ||
import org.opensearch.search.internal.SearchContext; | ||
|
||
import java.io.IOException; | ||
|
@@ -65,6 +72,8 @@ | |
import java.util.function.Supplier; | ||
|
||
import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder; | ||
import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_DOCS; | ||
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; | ||
|
||
/** | ||
* An aggregator of string values that hashes the strings on the fly rather | ||
|
@@ -75,8 +84,11 @@ | |
public class MapStringTermsAggregator extends AbstractStringTermsAggregator { | ||
private final CollectorSource collectorSource; | ||
private final ResultStrategy<?, ?> resultStrategy; | ||
private Weight weight; | ||
private final BytesKeyedBucketOrds bucketOrds; | ||
private final IncludeExclude.StringFilter includeExclude; | ||
protected final String fieldName; | ||
private final ValuesSourceConfig config; | ||
|
||
public MapStringTermsAggregator( | ||
String name, | ||
|
@@ -92,13 +104,52 @@ | |
SubAggCollectionMode collectionMode, | ||
boolean showTermDocCountError, | ||
CardinalityUpperBound cardinality, | ||
Map<String, Object> metadata | ||
Map<String, Object> metadata, | ||
String fieldName, | ||
ValuesSourceConfig config | ||
) throws IOException { | ||
super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata); | ||
this.collectorSource = collectorSource; | ||
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job. | ||
this.includeExclude = includeExclude; | ||
bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), cardinality); | ||
this.fieldName = fieldName; | ||
this.config = config; | ||
} | ||
|
||
/**
 * Creates the aggregator without an explicit field name. The field name is taken from the
 * collector source's {@link ValuesSource} when the collector source is a
 * {@code ValuesSourceCollectorSource}; otherwise it is left {@code null}.
 */
public MapStringTermsAggregator(
    String name,
    AggregatorFactories factories,
    CollectorSource collectorSource,
    Function<MapStringTermsAggregator, ResultStrategy<?, ?>> resultStrategy,
    BucketOrder order,
    DocValueFormat format,
    BucketCountThresholds bucketCountThresholds,
    IncludeExclude.StringFilter includeExclude,
    SearchContext context,
    Aggregator parent,
    SubAggCollectionMode collectionMode,
    boolean showTermDocCountError,
    CardinalityUpperBound cardinality,
    Map<String, Object> metadata,
    ValuesSourceConfig config
) throws IOException {
    super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
    this.collectorSource = collectorSource;
    // ResultStrategy needs a reference to the Aggregator to do its job.
    this.resultStrategy = resultStrategy.apply(this);
    this.includeExclude = includeExclude;
    bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), cardinality);
    // Only a values-source backed collector source can tell us which index field it reads.
    this.fieldName = collectorSource instanceof ValuesSourceCollectorSource
        ? ((ValuesSourceCollectorSource) collectorSource).getValuesSource().getIndexFieldName()
        : null;
    this.config = config;
}
|
||
/**
 * Stores the given {@link Weight} in this aggregator's {@code weight} field.
 * {@code tryPrecomputeAggregationForLeaf} calls {@code count} on that field to decide
 * whether a segment can be precomputed.
 *
 * @param weight the weight to store
 */
public void setWeight(Weight weight) { | ||
this.weight = weight; | ||
} | ||
|
||
@Override | ||
|
@@ -130,6 +181,68 @@ | |
); | ||
} | ||
|
||
@Override | ||
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException { | ||
// TODO: A note is that in scripted aggregations, the way of collecting from buckets is determined from | ||
// the script aggregator. For now, we will not be able to support the script aggregation. | ||
|
||
if (subAggregators.length > 0 || includeExclude != null || fieldName == null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can pull up null checks for Right now you are checking for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might be able to proceed if |
||
// The optimization does not work when there are subaggregations or if there is a filter. | ||
// The query has to be a match all, otherwise | ||
return false; | ||
} | ||
|
||
// If the missing property is specified in the builder, and there are documents with the | ||
// field missing, we might not be able to use the index unless there is some way we can | ||
// calculate which ordinal value that missing field is (something I am not sure how to | ||
// do yet). | ||
if (config != null && config.missing() != null && ((weight.count(ctx) == ctx.reader().getDocCount(fieldName)) == false)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right. I looked at the formatting guidelines again, and I only have to assert the equality as false for unary negations. |
||
return false; | ||
Check warning on line 200 in server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java
|
||
} | ||
|
||
// The optimization could only be used if there are no deleted documents and the top-level | ||
// query matches all documents in the segment. | ||
if (weight == null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Moving this null check towards the start of method can make this more readable. |
||
return false; | ||
} else { | ||
if (weight.count(ctx) == 0) { | ||
return true; | ||
} else if (weight.count(ctx) != ctx.reader().maxDoc()) { | ||
return false; | ||
} | ||
} | ||
|
||
Terms stringTerms = ctx.reader().terms(fieldName); | ||
if (stringTerms == null) { | ||
// Field is not indexed. | ||
return false; | ||
} | ||
|
||
NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME); | ||
if (docCountValues.nextDoc() != NO_MORE_DOCS) { | ||
// This segment has at least one document with the _doc_count field. | ||
return false; | ||
Check warning on line 224 in server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java
|
||
} | ||
|
||
TermsEnum stringTermsEnum = stringTerms.iterator(); | ||
BytesRef stringTerm = stringTermsEnum.next(); | ||
|
||
// Here, we will iterate over all the terms in the segment and add the counts into the bucket. | ||
while (stringTerm != null) { | ||
long bucketOrdinal = bucketOrds.add(0L, stringTerm); | ||
if (bucketOrdinal < 0) { // already seen | ||
bucketOrdinal = -1 - bucketOrdinal; | ||
Check warning on line 234 in server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java
|
||
} | ||
int amount = stringTermsEnum.docFreq(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: rename |
||
if (resultStrategy instanceof SignificantTermsResults) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit:
|
||
((SignificantTermsResults) resultStrategy).updateSubsetSizes(0L, amount); | ||
} | ||
incrementBucketDocCount(bucketOrdinal, amount); | ||
stringTerm = stringTermsEnum.next(); | ||
} | ||
return true; | ||
} | ||
|
||
@Override | ||
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { | ||
return resultStrategy.buildAggregations(owningBucketOrds); | ||
|
@@ -196,6 +309,10 @@ | |
return valuesSource.needsScores(); | ||
} | ||
|
||
/**
 * Returns the {@link ValuesSource} held in this object's {@code valuesSource} field.
 *
 * @return the {@code valuesSource} field, as is
 */
public ValuesSource getValuesSource() { | ||
return valuesSource; | ||
} | ||
|
||
@Override | ||
public LeafBucketCollector getLeafCollector( | ||
IncludeExclude.StringFilter includeExclude, | ||
|
@@ -501,6 +618,11 @@ | |
return "significant_terms"; | ||
} | ||
|
||
/**
 * Adds {@code amount} to the subset size recorded for {@code owningBucketOrd}.
 *
 * @param owningBucketOrd the ordinal whose subset size is updated
 * @param amount the value to add
 */
public void updateSubsetSizes(long owningBucketOrd, int amount) { | ||
// Grow first so that the slot for owningBucketOrd exists before it is incremented.
subsetSizes = context.bigArrays().grow(subsetSizes, owningBucketOrd + 1); | ||
subsetSizes.increment(owningBucketOrd, amount); | ||
} | ||
|
||
@Override | ||
LeafBucketCollector wrapCollector(LeafBucketCollector primary) { | ||
return new LeafBucketCollectorBase(primary, null) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the comment is misplaced here.
Can you please check the comments on the entire PR once. Also, please remove empty comment lines.