Skip to content

Commit 76b219b

Browse files
authored
Compute all groups for group by queries with only filtered aggregations (#14211)
1 parent fe47073 commit 76b219b

File tree

8 files changed

+133
-34
lines changed

8 files changed

+133
-34
lines changed

pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,10 @@ public static boolean isServerReturnFinalResultKeyUnpartitioned(Map<String, Stri
234234
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SERVER_RETURN_FINAL_RESULT_KEY_UNPARTITIONED));
235235
}
236236

237+
public static boolean isFilteredAggregationsSkipEmptyGroups(Map<String, String> queryOptions) {
238+
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS));
239+
}
240+
237241
@Nullable
238242
public static String getOrderByAlgorithm(Map<String, String> queryOptions) {
239243
return queryOptions.get(QueryOptionKey.ORDER_BY_ALGORITHM);

pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -124,33 +124,24 @@ protected GroupByResultsBlock getNextBlock() {
124124

125125
// Perform aggregation group-by on all the blocks
126126
DefaultGroupByExecutor groupByExecutor;
127-
if (groupKeyGenerator == null) {
128-
// The group key generator should be shared across all AggregationFunctions so that agg results can be
129-
// aligned. Given that filtered aggregations are stored as an iterable of iterables so that all filtered aggs
130-
// with the same filter can share transform blocks, rather than a singular flat iterable in the case where
131-
// aggs are all non-filtered, sharing a GroupKeyGenerator across all aggs cannot be accomplished by allowing
132-
// the GroupByExecutor to have sole ownership of the GroupKeyGenerator. Therefore, we allow constructing a
133-
// GroupByExecutor with a pre-existing GroupKeyGenerator so that the GroupKeyGenerator can be shared across
134-
// loop iterations i.e. across all aggs.
135-
if (aggregationInfo.isUseStarTree()) {
136-
groupByExecutor =
137-
new StarTreeGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator);
138-
} else {
139-
groupByExecutor =
140-
new DefaultGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator);
141-
}
142-
groupKeyGenerator = groupByExecutor.getGroupKeyGenerator();
127+
128+
if (aggregationInfo.isUseStarTree()) {
129+
groupByExecutor =
130+
new StarTreeGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator,
131+
groupKeyGenerator);
143132
} else {
144-
if (aggregationInfo.isUseStarTree()) {
145-
groupByExecutor =
146-
new StarTreeGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator,
147-
groupKeyGenerator);
148-
} else {
149-
groupByExecutor =
150-
new DefaultGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator,
151-
groupKeyGenerator);
152-
}
133+
groupByExecutor =
134+
new DefaultGroupByExecutor(_queryContext, aggregationFunctions, _groupByExpressions, projectOperator,
135+
groupKeyGenerator);
153136
}
137+
// The group key generator should be shared across all AggregationFunctions so that agg results can be
138+
// aligned. Given that filtered aggregations are stored as an iterable of iterables so that all filtered aggs
139+
// with the same filter can share transform blocks, rather than a singular flat iterable in the case where
140+
// aggs are all non-filtered, sharing a GroupKeyGenerator across all aggs cannot be accomplished by allowing
141+
// the GroupByExecutor to have sole ownership of the GroupKeyGenerator. Therefore, we allow constructing a
142+
// GroupByExecutor with a pre-existing GroupKeyGenerator so that the GroupKeyGenerator can be shared across
143+
// loop iterations i.e. across all aggs.
144+
groupKeyGenerator = groupByExecutor.getGroupKeyGenerator();
154145

155146
int numDocsScanned = 0;
156147
ValueBlock valueBlock;

pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.pinot.common.request.context.FilterContext;
4040
import org.apache.pinot.common.request.context.predicate.Predicate;
4141
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
42+
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
4243
import org.apache.pinot.core.common.BlockValSet;
4344
import org.apache.pinot.core.common.ObjectSerDeUtils;
4445
import org.apache.pinot.core.operator.BaseProjectOperator;
@@ -384,7 +385,14 @@ public static List<AggregationInfo> buildFilteredAggregationInfos(SegmentContext
384385
}
385386
}
386387

387-
if (!nonFilteredFunctions.isEmpty()) {
388+
if (!nonFilteredFunctions.isEmpty() || ((queryContext.getGroupByExpressions() != null)
389+
&& !QueryOptionsUtils.isFilteredAggregationsSkipEmptyGroups(queryContext.getQueryOptions()))) {
390+
// If there are no non-filtered aggregation functions for a group by query, we still add a new AggregationInfo
391+
// with an empty AggregationFunction array and the main query filter so that the GroupByExecutor will compute all
392+
// the groups (from the result of applying the main query filter) but no unnecessary additional aggregation will
393+
// be done since the AggregationFunction array is empty. However, if the query option to skip empty groups is
394+
// enabled, we don't do this in order to avoid unnecessary computation of empty groups (which can be very
395+
// expensive if the main filter has high selectivity).
388396
AggregationFunction[] aggregationFunctions = nonFilteredFunctions.toArray(new AggregationFunction[0]);
389397
aggregationInfos.add(
390398
buildAggregationInfo(segmentContext, queryContext, aggregationFunctions, mainFilter, mainFilterOperator,

pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.pinot.spi.data.Schema;
4242
import org.apache.pinot.spi.data.readers.GenericRow;
4343
import org.apache.pinot.spi.data.readers.RecordReader;
44+
import org.apache.pinot.spi.utils.CommonConstants;
4445
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
4546
import org.testng.annotations.AfterClass;
4647
import org.testng.annotations.BeforeClass;
@@ -394,9 +395,9 @@ public void testGroupBy() {
394395

395396
@Test
396397
public void testGroupByMultipleColumns() {
397-
String filterQuery =
398-
"SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM MyTable GROUP BY BOOLEAN_COL, STRING_COL "
399-
+ "ORDER BY BOOLEAN_COL, STRING_COL";
398+
String filterQuery = "SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS
399+
+ "=true; SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM MyTable GROUP BY BOOLEAN_COL, "
400+
+ "STRING_COL ORDER BY BOOLEAN_COL, STRING_COL";
400401
String nonFilterQuery =
401402
"SELECT SUM(INT_COL) testSum FROM MyTable WHERE INT_COL > 25000 GROUP BY BOOLEAN_COL, STRING_COL "
402403
+ "ORDER BY BOOLEAN_COL, STRING_COL";

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,6 @@ protected void setupTenants()
140140
throws IOException {
141141
}
142142

143-
// @Override
144-
// protected boolean useMultiStageQueryEngine() {
145-
// return true;
146-
// }
147-
148143
@BeforeMethod
149144
@Override
150145
public void resetMultiStage() {
@@ -1043,6 +1038,46 @@ public void testMVNumericCastInFilter() throws Exception {
10431038
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asInt(), 15482);
10441039
}
10451040

1041+
@Test
1042+
public void testFilteredAggregationWithNoValueMatchingAggregationFilterDefault()
1043+
throws Exception {
1044+
// Use a hint to ensure that the aggregation will not be pushed to the leaf stage, so that we can test the
1045+
// MultistageGroupByExecutor
1046+
String sqlQuery = "SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */"
1047+
+ "AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 GROUP BY "
1048+
+ "AirlineID";
1049+
JsonNode result = postQuery(sqlQuery);
1050+
assertNoError(result);
1051+
// Ensure that result set is not empty
1052+
assertTrue(result.get("numRowsResultSet").asInt() > 0);
1053+
1054+
// Ensure that the count is 0 for all groups (because the aggregation filter does not match any rows)
1055+
JsonNode rows = result.get("resultTable").get("rows");
1056+
for (int i = 0; i < rows.size(); i++) {
1057+
assertEquals(rows.get(i).get(1).asInt(), 0);
1058+
// Ensure that the main filter was applied
1059+
assertTrue(rows.get(i).get(0).asInt() > 20000);
1060+
}
1061+
}
1062+
1063+
@Test
1064+
public void testFilteredAggregationWithNoValueMatchingAggregationFilterWithOption()
1065+
throws Exception {
1066+
// Use a hint to ensure that the aggregation will not be pushed to the leaf stage, so that we can test the
1067+
// MultistageGroupByExecutor
1068+
String sqlQuery = "SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS
1069+
+ "=true; SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */"
1070+
+ "AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 GROUP BY "
1071+
+ "AirlineID";
1072+
1073+
JsonNode result = postQuery(sqlQuery);
1074+
assertNoError(result);
1075+
1076+
// Result set will be empty since the aggregation filter does not match any rows, and we've set the query option to
1077+
// skip empty groups
1078+
assertEquals(result.get("numRowsResultSet").asInt(), 0);
1079+
}
1080+
10461081
@Override
10471082
protected String getTableName() {
10481083
return _tableName;

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3722,4 +3722,44 @@ public void testSkipIndexes(boolean useMultiStageQueryEngine)
37223722
updateTableConfig(tableConfig);
37233723
reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, true, numTotalDocs);
37243724
}
3725+
3726+
@Test(dataProvider = "useBothQueryEngines")
3727+
public void testFilteredAggregationWithNoValueMatchingAggregationFilterDefault(boolean useMultiStageQueryEngine)
3728+
throws Exception {
3729+
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
3730+
3731+
String sqlQuery =
3732+
"SELECT AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 GROUP BY "
3733+
+ "AirlineID";
3734+
3735+
JsonNode result = postQuery(sqlQuery);
3736+
assertNoError(result);
3737+
3738+
// Ensure that result set is not empty since all groups should be computed by default
3739+
assertTrue(result.get("numRowsResultSet").asInt() > 0);
3740+
3741+
// Ensure that the count is 0 for all groups (because the aggregation filter does not match any rows)
3742+
JsonNode rows = result.get("resultTable").get("rows");
3743+
for (int i = 0; i < rows.size(); i++) {
3744+
assertEquals(rows.get(i).get(1).asInt(), 0);
3745+
// Ensure that the main filter was applied
3746+
assertTrue(rows.get(i).get(0).asInt() > 20000);
3747+
}
3748+
}
3749+
3750+
@Test(dataProvider = "useBothQueryEngines")
3751+
public void testFilteredAggregationWithNoValueMatchingAggregationFilterWithOption(boolean useMultiStageQueryEngine)
3752+
throws Exception {
3753+
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
3754+
String sqlQuery =
3755+
"SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS + "=true; "
3756+
+ "SELECT AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 "
3757+
+ "GROUP BY AirlineID";
3758+
JsonNode result = postQuery(sqlQuery);
3759+
assertNoError(result);
3760+
3761+
// Result set will be empty since the aggregation filter does not match any rows, and we've set the option to skip
3762+
// empty groups
3763+
assertEquals(result.get("numRowsResultSet").asInt(), 0);
3764+
}
37253765
}

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class MultistageGroupByExecutor {
5858
private final AggType _aggType;
5959
private final DataSchema _resultSchema;
6060
private final int _numGroupsLimit;
61+
private final boolean _filteredAggregationsSkipEmptyGroups;
6162

6263
// Group By Result holders for each mode
6364
private final GroupByResultHolder[] _aggregateResultHolders;
@@ -79,6 +80,10 @@ public MultistageGroupByExecutor(int[] groupKeyIds, AggregationFunction[] aggFun
7980
int maxInitialResultHolderCapacity = getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
8081
_numGroupsLimit = getNumGroupsLimit(opChainMetadata, nodeHint);
8182

83+
// By default, we compute all groups for SQL compliant results. However, we allow overriding this behavior via
84+
// query option for improved performance.
85+
_filteredAggregationsSkipEmptyGroups = QueryOptionsUtils.isFilteredAggregationsSkipEmptyGroups(opChainMetadata);
86+
8287
int numFunctions = aggFunctions.length;
8388
if (!aggType.isInputIntermediateFormat()) {
8489
_aggregateResultHolders = new GroupByResultHolder[numFunctions];
@@ -241,6 +246,12 @@ private void processAggregate(TransferableBlock block) {
241246
aggFunction.aggregateGroupBySV(numMatchedRows, filteredIntKeys, groupByResultHolder, blockValSetMap);
242247
}
243248
}
249+
if (intKeys == null && !_filteredAggregationsSkipEmptyGroups) {
250+
// _groupIdGenerator should still have all the groups even if there are only filtered aggregates for SQL
251+
// compliant results. However, if the query option to skip empty groups is set, we avoid this step for
252+
// improved performance.
253+
generateGroupByKeys(block);
254+
}
244255
}
245256
}
246257

pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,15 @@ public static class QueryOptionKey {
463463
// executed in an Unbounded FCFS fashion. However, secondary workloads are executed in a constrainted FCFS
464464
// fashion with limited compute.
465465
public static final String IS_SECONDARY_WORKLOAD = "isSecondaryWorkload";
466+
467+
// For group by queries with only filtered aggregations (and no non-filtered aggregations), the default behavior
468+
// is to compute all groups over the rows matching the main query filter. This ensures SQL compliant results,
469+
// since empty groups are also expected to be returned in such queries. However, this could be quite inefficient
470+
// if the main query does not have a filter (since a scan would be required to compute all groups). In case
471+
// users are okay with skipping empty groups - i.e., only the groups matching at least one aggregation filter
472+
// will be returned - this query option can be set. This is useful for performance, since indexes can be used
473+
// for the aggregation filters and a full scan can be avoided.
474+
public static final String FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS = "filteredAggregationsSkipEmptyGroups";
466475
}
467476

468477
public static class QueryOptionValue {

0 commit comments

Comments
 (0)