Skip to content

Commit 456ca97

Browse files
authored
[Star Tree] [Search] Support for metric aggregations with/without term query (#15289)
--------- Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
1 parent 322bdc4 commit 456ca97

32 files changed

+1853
-23
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2727
- Latency and Memory allocation improvements to Multi Term Aggregation queries ([#14993](https://github.com/opensearch-project/OpenSearch/pull/14993))
2828
- Flat object field use IndexOrDocValuesQuery to optimize query ([#14383](https://github.com/opensearch-project/OpenSearch/issues/14383))
2929
- Add method to return dynamic SecureTransportParameters from SecureTransportSettingsProvider interface ([#16387](https://github.com/opensearch-project/OpenSearch/pull/16387)
30+
- [Star Tree - Search] Add support for metric aggregations with/without term query ([15289](https://github.com/opensearch-project/OpenSearch/pull/15289))
3031

3132
### Dependencies
3233
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))

server/src/main/java/org/opensearch/index/compositeindex/datacube/DateDimension.java

+5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.index.compositeindex.datacube;
1010

11+
import org.apache.lucene.index.DocValuesType;
1112
import org.opensearch.common.Rounding;
1213
import org.opensearch.common.annotation.ExperimentalApi;
1314
import org.opensearch.common.time.DateUtils;
@@ -169,4 +170,8 @@ public int compare(DateTimeUnitRounding unit1, DateTimeUnitRounding unit2) {
169170
public static List<DateTimeUnitRounding> getSortedDateTimeUnits(List<DateTimeUnitRounding> dateTimeUnits) {
170171
return dateTimeUnits.stream().sorted(new DateTimeUnitComparator()).collect(Collectors.toList());
171172
}
173+
174+
public DocValuesType getDocValuesType() {
175+
return DocValuesType.SORTED_NUMERIC;
176+
}
172177
}

server/src/main/java/org/opensearch/index/compositeindex/datacube/Dimension.java

+3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.index.compositeindex.datacube;
1010

11+
import org.apache.lucene.index.DocValuesType;
1112
import org.opensearch.common.annotation.ExperimentalApi;
1213
import org.opensearch.core.xcontent.ToXContent;
1314

@@ -42,4 +43,6 @@ public interface Dimension extends ToXContent {
4243
* Returns the list of dimension fields that represent the dimension
4344
*/
4445
List<String> getSubDimensionNames();
46+
47+
DocValuesType getDocValuesType();
4548
}

server/src/main/java/org/opensearch/index/compositeindex/datacube/NumericDimension.java

+6
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.index.compositeindex.datacube;
1010

11+
import org.apache.lucene.index.DocValuesType;
1112
import org.opensearch.common.annotation.ExperimentalApi;
1213
import org.opensearch.core.xcontent.XContentBuilder;
1314
import org.opensearch.index.mapper.CompositeDataCubeFieldType;
@@ -71,4 +72,9 @@ public boolean equals(Object o) {
7172
public int hashCode() {
7273
return Objects.hash(field);
7374
}
75+
76+
@Override
77+
public DocValuesType getDocValuesType() {
78+
return DocValuesType.SORTED_NUMERIC;
79+
}
7480
}

server/src/main/java/org/opensearch/index/compositeindex/datacube/ReadDimension.java

+6
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.index.compositeindex.datacube;
1010

11+
import org.apache.lucene.index.DocValuesType;
1112
import org.opensearch.core.xcontent.XContentBuilder;
1213
import org.opensearch.index.mapper.CompositeDataCubeFieldType;
1314

@@ -69,4 +70,9 @@ public boolean equals(Object o) {
6970
public int hashCode() {
7071
return Objects.hash(field);
7172
}
73+
74+
@Override
75+
public DocValuesType getDocValuesType() {
76+
return DocValuesType.SORTED_NUMERIC;
77+
}
7278
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.compositeindex.datacube.startree.utils;
10+
11+
import org.apache.lucene.index.DocValuesType;
12+
import org.apache.lucene.index.LeafReaderContext;
13+
import org.apache.lucene.index.SegmentReader;
14+
import org.apache.lucene.search.CollectionTerminatedException;
15+
import org.apache.lucene.search.DocIdSetIterator;
16+
import org.apache.lucene.util.FixedBitSet;
17+
import org.opensearch.common.lucene.Lucene;
18+
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
19+
import org.opensearch.index.codec.composite.CompositeIndexReader;
20+
import org.opensearch.index.compositeindex.datacube.Dimension;
21+
import org.opensearch.index.compositeindex.datacube.Metric;
22+
import org.opensearch.index.compositeindex.datacube.MetricStat;
23+
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
24+
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
25+
import org.opensearch.index.mapper.CompositeDataCubeFieldType;
26+
import org.opensearch.index.query.MatchAllQueryBuilder;
27+
import org.opensearch.index.query.QueryBuilder;
28+
import org.opensearch.index.query.TermQueryBuilder;
29+
import org.opensearch.search.aggregations.AggregatorFactory;
30+
import org.opensearch.search.aggregations.LeafBucketCollector;
31+
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
32+
import org.opensearch.search.aggregations.metrics.MetricAggregatorFactory;
33+
import org.opensearch.search.aggregations.support.ValuesSource;
34+
import org.opensearch.search.builder.SearchSourceBuilder;
35+
import org.opensearch.search.internal.SearchContext;
36+
import org.opensearch.search.startree.StarTreeFilter;
37+
import org.opensearch.search.startree.StarTreeQueryContext;
38+
39+
import java.io.IOException;
40+
import java.util.HashMap;
41+
import java.util.List;
42+
import java.util.Map;
43+
import java.util.function.Consumer;
44+
import java.util.stream.Collectors;
45+
46+
/**
47+
* Helper class for building star-tree query
48+
*
49+
* @opensearch.internal
50+
* @opensearch.experimental
51+
*/
52+
public class StarTreeQueryHelper {
53+
54+
/**
55+
* Checks if the search context can be supported by star-tree
56+
*/
57+
public static boolean isStarTreeSupported(SearchContext context) {
58+
return context.aggregations() != null && context.mapperService().isCompositeIndexPresent() && context.parsedPostFilter() == null;
59+
}
60+
61+
/**
62+
* Gets StarTreeQueryContext from the search context and source builder.
63+
* Returns null if the query and aggregation cannot be supported.
64+
*/
65+
public static StarTreeQueryContext getStarTreeQueryContext(SearchContext context, SearchSourceBuilder source) throws IOException {
66+
// Current implementation assumes only single star-tree is supported
67+
CompositeDataCubeFieldType compositeMappedFieldType = (CompositeDataCubeFieldType) context.mapperService()
68+
.getCompositeFieldTypes()
69+
.iterator()
70+
.next();
71+
CompositeIndexFieldInfo starTree = new CompositeIndexFieldInfo(
72+
compositeMappedFieldType.name(),
73+
compositeMappedFieldType.getCompositeIndexType()
74+
);
75+
76+
for (AggregatorFactory aggregatorFactory : context.aggregations().factories().getFactories()) {
77+
MetricStat metricStat = validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory);
78+
if (metricStat == null) {
79+
return null;
80+
}
81+
}
82+
83+
// need to cache star tree values only for multiple aggregations
84+
boolean cacheStarTreeValues = context.aggregations().factories().getFactories().length > 1;
85+
int cacheSize = cacheStarTreeValues ? context.indexShard().segments(false).size() : -1;
86+
87+
return StarTreeQueryHelper.tryCreateStarTreeQueryContext(starTree, compositeMappedFieldType, source.query(), cacheSize);
88+
}
89+
90+
/**
91+
* Uses query builder and composite index info to form star-tree query context
92+
*/
93+
private static StarTreeQueryContext tryCreateStarTreeQueryContext(
94+
CompositeIndexFieldInfo compositeIndexFieldInfo,
95+
CompositeDataCubeFieldType compositeFieldType,
96+
QueryBuilder queryBuilder,
97+
int cacheStarTreeValuesSize
98+
) {
99+
Map<String, Long> queryMap;
100+
if (queryBuilder == null || queryBuilder instanceof MatchAllQueryBuilder) {
101+
queryMap = null;
102+
} else if (queryBuilder instanceof TermQueryBuilder) {
103+
// TODO: Add support for keyword fields
104+
if (compositeFieldType.getDimensions().stream().anyMatch(d -> d.getDocValuesType() != DocValuesType.SORTED_NUMERIC)) {
105+
// return null for non-numeric fields
106+
return null;
107+
}
108+
109+
List<String> supportedDimensions = compositeFieldType.getDimensions()
110+
.stream()
111+
.map(Dimension::getField)
112+
.collect(Collectors.toList());
113+
queryMap = getStarTreePredicates(queryBuilder, supportedDimensions);
114+
if (queryMap == null) {
115+
return null;
116+
}
117+
} else {
118+
return null;
119+
}
120+
return new StarTreeQueryContext(compositeIndexFieldInfo, queryMap, cacheStarTreeValuesSize);
121+
}
122+
123+
/**
124+
* Parse query body to star-tree predicates
125+
* @param queryBuilder to match star-tree supported query shape
126+
* @return predicates to match
127+
*/
128+
private static Map<String, Long> getStarTreePredicates(QueryBuilder queryBuilder, List<String> supportedDimensions) {
129+
TermQueryBuilder tq = (TermQueryBuilder) queryBuilder;
130+
String field = tq.fieldName();
131+
if (!supportedDimensions.contains(field)) {
132+
return null;
133+
}
134+
long inputQueryVal = Long.parseLong(tq.value().toString());
135+
136+
// Create a map with the field and the value
137+
Map<String, Long> predicateMap = new HashMap<>();
138+
predicateMap.put(field, inputQueryVal);
139+
return predicateMap;
140+
}
141+
142+
private static MetricStat validateStarTreeMetricSupport(
143+
CompositeDataCubeFieldType compositeIndexFieldInfo,
144+
AggregatorFactory aggregatorFactory
145+
) {
146+
if (aggregatorFactory instanceof MetricAggregatorFactory && aggregatorFactory.getSubFactories().getFactories().length == 0) {
147+
String field;
148+
Map<String, List<MetricStat>> supportedMetrics = compositeIndexFieldInfo.getMetrics()
149+
.stream()
150+
.collect(Collectors.toMap(Metric::getField, Metric::getMetrics));
151+
152+
MetricStat metricStat = ((MetricAggregatorFactory) aggregatorFactory).getMetricStat();
153+
field = ((MetricAggregatorFactory) aggregatorFactory).getField();
154+
155+
if (supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat)) {
156+
return metricStat;
157+
}
158+
}
159+
return null;
160+
}
161+
162+
public static CompositeIndexFieldInfo getSupportedStarTree(SearchContext context) {
163+
StarTreeQueryContext starTreeQueryContext = context.getStarTreeQueryContext();
164+
return (starTreeQueryContext != null) ? starTreeQueryContext.getStarTree() : null;
165+
}
166+
167+
public static StarTreeValues getStarTreeValues(LeafReaderContext context, CompositeIndexFieldInfo starTree) throws IOException {
168+
SegmentReader reader = Lucene.segmentReader(context.reader());
169+
if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) {
170+
return null;
171+
}
172+
CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader();
173+
return (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree);
174+
}
175+
176+
/**
177+
* Get the star-tree leaf collector
178+
* This collector computes the aggregation prematurely and invokes an early termination collector
179+
*/
180+
public static LeafBucketCollector getStarTreeLeafCollector(
181+
SearchContext context,
182+
ValuesSource.Numeric valuesSource,
183+
LeafReaderContext ctx,
184+
LeafBucketCollector sub,
185+
CompositeIndexFieldInfo starTree,
186+
String metric,
187+
Consumer<Long> valueConsumer,
188+
Runnable finalConsumer
189+
) throws IOException {
190+
StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree);
191+
assert starTreeValues != null;
192+
String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName();
193+
String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(starTree.getField(), fieldName, metric);
194+
195+
assert starTreeValues != null;
196+
SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues.getMetricValuesIterator(
197+
metricName
198+
);
199+
// Obtain a FixedBitSet of matched star tree document IDs
200+
FixedBitSet filteredValues = getStarTreeFilteredValues(context, ctx, starTreeValues);
201+
assert filteredValues != null;
202+
203+
int numBits = filteredValues.length(); // Get the number of the filtered values (matching docs)
204+
if (numBits > 0) {
205+
// Iterate over the filtered values
206+
for (int bit = filteredValues.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits)
207+
? filteredValues.nextSetBit(bit + 1)
208+
: DocIdSetIterator.NO_MORE_DOCS) {
209+
// Advance to the entryId in the valuesIterator
210+
if (valuesIterator.advanceExact(bit) == false) {
211+
continue; // Skip if no more entries
212+
}
213+
214+
// Iterate over the values for the current entryId
215+
for (int i = 0, count = valuesIterator.entryValueCount(); i < count; i++) {
216+
long value = valuesIterator.nextValue();
217+
valueConsumer.accept(value); // Apply the consumer operation (e.g., max, sum)
218+
}
219+
}
220+
}
221+
222+
// Call the final consumer after processing all entries
223+
finalConsumer.run();
224+
225+
// Return a LeafBucketCollector that terminates collection
226+
return new LeafBucketCollectorBase(sub, valuesSource.doubleValues(ctx)) {
227+
@Override
228+
public void collect(int doc, long bucket) {
229+
throw new CollectionTerminatedException();
230+
}
231+
};
232+
}
233+
234+
/**
235+
* Get the filtered values for the star-tree query
236+
* Cache the results in case of multiple aggregations (if cache is initialized)
237+
* @return FixedBitSet of matched document IDs
238+
*/
239+
public static FixedBitSet getStarTreeFilteredValues(SearchContext context, LeafReaderContext ctx, StarTreeValues starTreeValues)
240+
throws IOException {
241+
FixedBitSet result = context.getStarTreeQueryContext().getStarTreeValues(ctx);
242+
if (result == null) {
243+
result = StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap());
244+
context.getStarTreeQueryContext().setStarTreeValues(ctx, result);
245+
}
246+
return result;
247+
}
248+
}

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/iterator/SortedNumericStarTreeValuesIterator.java

+8
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,12 @@ public SortedNumericStarTreeValuesIterator(DocIdSetIterator docIdSetIterator) {
2929
public long nextValue() throws IOException {
3030
return ((SortedNumericDocValues) docIdSetIterator).nextValue();
3131
}
32+
33+
public int entryValueCount() throws IOException {
34+
return ((SortedNumericDocValues) docIdSetIterator).docValueCount();
35+
}
36+
37+
public boolean advanceExact(int target) throws IOException {
38+
return ((SortedNumericDocValues) docIdSetIterator).advanceExact(target);
39+
}
3240
}

0 commit comments

Comments
 (0)