Skip to content

Commit eb934a6

Browse files
committed
Unify behavior of both query engines to follow standard SQL behavior; allow overriding behavior for performance
1 parent 54a9b92 commit eb934a6

File tree

8 files changed

+47
-53
lines changed

8 files changed

+47
-53
lines changed

pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -234,12 +234,8 @@ public static boolean isServerReturnFinalResultKeyUnpartitioned(Map<String, Stri
234234
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SERVER_RETURN_FINAL_RESULT_KEY_UNPARTITIONED));
235235
}
236236

237-
public static Optional<Boolean> isFilteredAggregationsComputeAllGroups(Map<String, String> queryOptions) {
238-
String value = queryOptions.get(QueryOptionKey.FILTERED_AGGREGATIONS_COMPUTE_ALL_GROUPS);
239-
if (value == null) {
240-
return Optional.empty();
241-
}
242-
return Optional.of(Boolean.parseBoolean(value));
237+
public static boolean isFilteredAggregationsSkipEmptyGroups(Map<String, String> queryOptions) {
238+
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS));
243239
}
244240

245241
@Nullable

pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -385,12 +385,14 @@ public static List<AggregationInfo> buildFilteredAggregationInfos(SegmentContext
385385
}
386386
}
387387

388-
if (!nonFilteredFunctions.isEmpty() || (QueryOptionsUtils.isFilteredAggregationsComputeAllGroups(
389-
queryContext.getQueryOptions()).orElse(false))) {
390-
// If there are no non-filtered aggregation functions, but the query option to compute all groups is set, we
391-
// add a new AggregationInfo with an empty AggregationFunction array and the main query filter so that the
392-
// GroupByExecutor will compute all the groups (from the result of applying the main query filter) but no
393-
// unnecessary additional aggregation will be done since the AggregationFunction array is empty.
388+
if (!nonFilteredFunctions.isEmpty() || ((queryContext.getGroupByExpressions() != null)
389+
&& !QueryOptionsUtils.isFilteredAggregationsSkipEmptyGroups(queryContext.getQueryOptions()))) {
390+
// If there are no non-filtered aggregation functions for a group by query, we still add a new AggregationInfo
391+
// with an empty AggregationFunction array and the main query filter so that the GroupByExecutor will compute all
392+
// the groups (from the result of applying the main query filter) but no unnecessary additional aggregation will
393+
// be done since the AggregationFunction array is empty. However, if the query option to skip empty groups is
394+
// enabled, we don't do this in order to avoid unnecessary computation of empty groups (which can be very
395+
// expensive if the main filter matches a large number of rows).
394396
AggregationFunction[] aggregationFunctions = nonFilteredFunctions.toArray(new AggregationFunction[0]);
395397
aggregationInfos.add(
396398
buildAggregationInfo(segmentContext, queryContext, aggregationFunctions, mainFilter, mainFilterOperator,

pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.pinot.spi.data.Schema;
4242
import org.apache.pinot.spi.data.readers.GenericRow;
4343
import org.apache.pinot.spi.data.readers.RecordReader;
44+
import org.apache.pinot.spi.utils.CommonConstants;
4445
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
4546
import org.testng.annotations.AfterClass;
4647
import org.testng.annotations.BeforeClass;
@@ -394,9 +395,9 @@ public void testGroupBy() {
394395

395396
@Test
396397
public void testGroupByMultipleColumns() {
397-
String filterQuery =
398-
"SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM MyTable GROUP BY BOOLEAN_COL, STRING_COL "
399-
+ "ORDER BY BOOLEAN_COL, STRING_COL";
398+
String filterQuery = "SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS
399+
+ "=true; SELECT SUM(INT_COL) FILTER(WHERE INT_COL > 25000) testSum FROM MyTable GROUP BY BOOLEAN_COL, "
400+
+ "STRING_COL ORDER BY BOOLEAN_COL, STRING_COL";
400401
String nonFilterQuery =
401402
"SELECT SUM(INT_COL) testSum FROM MyTable WHERE INT_COL > 25000 GROUP BY BOOLEAN_COL, STRING_COL "
402403
+ "ORDER BY BOOLEAN_COL, STRING_COL";

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,16 +1040,16 @@ public void testFilteredAggregationWithNoValueMatchingAggregationFilterWithOptio
10401040
throws Exception {
10411041
// Use a hint to ensure that the aggregation will not be pushed to the leaf stage, so that we can test the
10421042
// MultistageGroupByExecutor
1043-
String sqlQuery = "SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_COMPUTE_ALL_GROUPS
1044-
+ "=false; SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */"
1043+
String sqlQuery = "SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS
1044+
+ "=true; SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */"
10451045
+ "AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 GROUP BY "
10461046
+ "AirlineID";
10471047

10481048
JsonNode result = postQuery(sqlQuery);
10491049
assertNoError(result);
10501050

10511051
// Result set will be empty since the aggregation filter does not match any rows, and we've set the query option to
1052-
// not compute all groups
1052+
// skip empty groups
10531053
assertEquals(result.get("numRowsResultSet").asInt(), 0);
10541054
}
10551055

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3730,12 +3730,10 @@ public void testSkipIndexes(boolean useMultiStageQueryEngine)
37303730
reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, true, numTotalDocs);
37313731
}
37323732

3733-
@Test
3734-
public void testFilteredAggregationWithNoValueMatchingAggregationFilterDefault()
3733+
@Test(dataProvider = "useBothQueryEngines")
3734+
public void testFilteredAggregationWithNoValueMatchingAggregationFilterDefault(boolean useMultiStageQueryEngine)
37353735
throws Exception {
3736-
// The multi-stage query engine computes all groups by default (even without the query option) for group by queries
3737-
// with only filtered aggregates since that is the SQL standard behavior.
3738-
setUseMultiStageQueryEngine(false);
3736+
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
37393737

37403738
String sqlQuery =
37413739
"SELECT AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 GROUP BY "
@@ -3744,30 +3742,31 @@ public void testFilteredAggregationWithNoValueMatchingAggregationFilterDefault()
37443742
JsonNode result = postQuery(sqlQuery);
37453743
assertNoError(result);
37463744

3747-
// Result set will be empty by default since the aggregation filter does not match any rows
3748-
assertEquals(result.get("numRowsResultSet").asInt(), 0);
3745+
// Ensure that result set is not empty since all groups should be computed by default
3746+
assertTrue(result.get("numRowsResultSet").asInt() > 0);
3747+
3748+
// Ensure that the count is 0 for all groups (because the aggregation filter does not match any rows)
3749+
JsonNode rows = result.get("resultTable").get("rows");
3750+
for (int i = 0; i < rows.size(); i++) {
3751+
assertEquals(rows.get(i).get(1).asInt(), 0);
3752+
// Ensure that the main filter was applied
3753+
assertTrue(rows.get(i).get(0).asInt() > 20000);
3754+
}
37493755
}
37503756

37513757
@Test(dataProvider = "useBothQueryEngines")
37523758
public void testFilteredAggregationWithNoValueMatchingAggregationFilterWithOption(boolean useMultiStageQueryEngine)
37533759
throws Exception {
37543760
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
37553761
String sqlQuery =
3756-
"SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_COMPUTE_ALL_GROUPS + "=true; "
3762+
"SET " + CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS + "=true; "
37573763
+ "SELECT AirlineID, COUNT(*) FILTER (WHERE Origin = 'garbage') FROM mytable WHERE AirlineID > 20000 "
37583764
+ "GROUP BY AirlineID";
37593765
JsonNode result = postQuery(sqlQuery);
37603766
assertNoError(result);
37613767

3762-
// Ensure that result set is not empty since all groups should be computed now
3763-
assertTrue(result.get("numRowsResultSet").asInt() > 0);
3764-
3765-
// Ensure that the count is 0 for all groups (because the aggregation filter does not match any rows)
3766-
JsonNode rows = result.get("resultTable").get("rows");
3767-
for (int i = 0; i < rows.size(); i++) {
3768-
assertEquals(rows.get(i).get(1).asInt(), 0);
3769-
// Ensure that the main filter was applied
3770-
assertTrue(rows.get(i).get(0).asInt() > 20000);
3771-
}
3768+
// Result set will be empty since the aggregation filter does not match any rows, and we've set the option to skip
3769+
// empty groups
3770+
assertEquals(result.get("numRowsResultSet").asInt(), 0);
37723771
}
37733772
}

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class MultistageGroupByExecutor {
5858
private final AggType _aggType;
5959
private final DataSchema _resultSchema;
6060
private final int _numGroupsLimit;
61-
private final boolean _filteredAggregationsComputeAllGroups;
61+
private final boolean _filteredAggregationsSkipEmptyGroups;
6262

6363
// Group By Result holders for each mode
6464
private final GroupByResultHolder[] _aggregateResultHolders;
@@ -82,8 +82,7 @@ public MultistageGroupByExecutor(int[] groupKeyIds, AggregationFunction[] aggFun
8282

8383
// By default, we compute all groups for SQL compliant results. However, we allow overriding this behavior via
8484
// query option for improved performance.
85-
_filteredAggregationsComputeAllGroups =
86-
QueryOptionsUtils.isFilteredAggregationsComputeAllGroups(opChainMetadata).orElse(true);
85+
_filteredAggregationsSkipEmptyGroups = QueryOptionsUtils.isFilteredAggregationsSkipEmptyGroups(opChainMetadata);
8786

8887
int numFunctions = aggFunctions.length;
8988
if (!aggType.isInputIntermediateFormat()) {
@@ -247,9 +246,10 @@ private void processAggregate(TransferableBlock block) {
247246
aggFunction.aggregateGroupBySV(numMatchedRows, filteredIntKeys, groupByResultHolder, blockValSetMap);
248247
}
249248
}
250-
if (intKeys == null && _filteredAggregationsComputeAllGroups) {
249+
if (intKeys == null && !_filteredAggregationsSkipEmptyGroups) {
251250
// _groupIdGenerator should still have all the groups even if there are only filtered aggregates for SQL
252-
// compliant results.
251+
// compliant results. However, if the query option to skip empty groups is set, we avoid this step for
252+
// improved performance.
253253
generateGroupByKeys(block);
254254
}
255255
}

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -227,10 +227,6 @@ private static void updateQueryOptions(PinotQuery pinotQuery, OpChainExecutionCo
227227
Map<String, String> queryOptions = new HashMap<>(executionContext.getOpChainMetadata());
228228
queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
229229
Long.toString(executionContext.getDeadlineMs() - System.currentTimeMillis()));
230-
// All groups should be computed by default for group by queries with only filtered aggregations in the
231-
// multi-stage query engine since that is the standard SQL behavior.
232-
queryOptions.putIfAbsent(CommonConstants.Broker.Request.QueryOptionKey.FILTERED_AGGREGATIONS_COMPUTE_ALL_GROUPS,
233-
"true");
234230
pinotQuery.setQueryOptions(queryOptions);
235231
}
236232

pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -459,14 +459,14 @@ public static class QueryOptionKey {
459459
// fashion with limited compute.
460460
public static final String IS_SECONDARY_WORKLOAD = "isSecondaryWorkload";
461461

462-
// For group by queries with only filtered aggregations (and no non-filtered aggregations), the v1 query engine
463-
// does not compute all groups by default - instead, it will only compute the groups from the filtered result
464-
// set (i.e., union of the main query filter and all the individual aggregation filters). This is good for
465-
// performance, since indexes can be used, but the result won't match standard SQL behavior (where all groups
466-
// are expected to be returned). If this option is set to true, the v1 query engine will compute all groups for
467-
// group by queries with only filtered aggregations. This could require a full scan if the main query does not
468-
// have a filter and performance could be much worse, but the result will be SQL compliant.
469-
public static final String FILTERED_AGGREGATIONS_COMPUTE_ALL_GROUPS = "filteredAggregationsComputeAllGroups";
462+
// For group by queries with only filtered aggregations (and no non-filtered aggregations), the default behavior
463+
// is to compute all groups over the rows matching the main query filter. This ensures SQL compliant results,
464+
// since empty groups are also expected to be returned in such queries. However, this could be quite inefficient
465+
// if the main query does not have a filter (since a scan would be required to compute all groups). In case
466+
// users are okay with skipping empty groups - i.e., only the groups matching at least one aggregation filter
467+
// will be returned - this option can be set to true. This is useful for performance, since indexes can be used
468+
// for the aggregation filters and a full scan can be avoided.
469+
public static final String FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS = "filteredAggregationsSkipEmptyGroups";
470470
}
471471

472472
public static class QueryOptionValue {

0 commit comments

Comments
 (0)