Skip to content

Commit af3dc6a

Browse files
authored
Modify segment metadata call (#14250)
* Modify segment metadata call
* fix tests
* add per server metadata api
* remove v2 api
* Add v2 API and server test, separate functions
* fix test
* merge getSegmentsMetadataInternalV2 into getSegmentsMetadataInternal
* Address comment and add test
* final fixes
* address comments
* address comments
* update log
1 parent f8d2c69 commit af3dc6a

File tree

8 files changed

+333
-62
lines changed

8 files changed

+333
-62
lines changed

pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -926,17 +926,20 @@ private void deleteSegmentsInternal(String tableNameWithType, List<String> segme
926926
public String getServerMetadata(
927927
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
928928
@ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
929-
@ApiParam(value = "Columns name", allowMultiple = true) @QueryParam("columns") @DefaultValue("")
930-
List<String> columns, @Context HttpHeaders headers) {
929+
@Encoded @ApiParam(value = "Segments to include (all if not specified)", allowMultiple = true)
930+
@QueryParam("segments") @Nullable List<String> segments,
931+
@Encoded @ApiParam(value = "Columns name", allowMultiple = true) @QueryParam("columns")
932+
@Nullable List<String> columns, @Context HttpHeaders headers) {
931933
tableName = DatabaseUtils.translateTableName(tableName, headers);
932-
LOGGER.info("Received a request to fetch metadata for all segments for table {}", tableName);
934+
String segmentCount = (segments == null) ? "all" : String.valueOf(segments.size());
935+
LOGGER.info("Received a request to fetch metadata for {} segments for table {}", segmentCount, tableName);
933936
TableType tableType = Constants.validateTableType(tableTypeStr);
934937

935938
String tableNameWithType =
936939
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
937940
String segmentsMetadata;
938941
try {
939-
JsonNode segmentsMetadataJson = getSegmentsMetadataFromServer(tableNameWithType, columns);
942+
JsonNode segmentsMetadataJson = getSegmentsMetadataFromServer(tableNameWithType, columns, segments);
940943
segmentsMetadata = JsonUtils.objectToPrettyString(segmentsMetadataJson);
941944
} catch (InvalidConfigException e) {
942945
throw new ControllerApplicationException(LOGGER, e.getMessage(), Status.BAD_REQUEST);
@@ -1156,14 +1159,17 @@ public List<Map<TableType, List<String>>> getSelectedSegments(
11561159
* This is a helper method to get the metadata for the requested segments (all segments if none are specified) of a given table name.
11571160
* @param tableNameWithType name of the table along with its type
11581161
* @param columns name of the columns
1162+
* @param segments name of the segments to include in metadata
11591163
* @return JsonNode with the metadata of the table segments -> map of segment name to its metadata
11601164
*/
1161-
private JsonNode getSegmentsMetadataFromServer(String tableNameWithType, List<String> columns)
1165+
private JsonNode getSegmentsMetadataFromServer(String tableNameWithType, @Nullable List<String> columns,
1166+
@Nullable List<String> segments)
11621167
throws InvalidConfigException, IOException {
11631168
TableMetadataReader tableMetadataReader =
11641169
new TableMetadataReader(_executor, _connectionManager, _pinotHelixResourceManager);
11651170
return tableMetadataReader
1166-
.getSegmentsMetadata(tableNameWithType, columns, _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
1171+
.getSegmentsMetadata(tableNameWithType, columns, segments,
1172+
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
11671173
}
11681174

11691175
@POST

pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import javax.ws.rs.client.ClientBuilder;
4040
import javax.ws.rs.core.MediaType;
4141
import javax.ws.rs.core.Response;
42+
import org.apache.commons.collections.CollectionUtils;
4243
import org.apache.commons.lang3.tuple.Pair;
4344
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
4445
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
@@ -65,6 +66,8 @@
6566
*/
6667
public class ServerSegmentMetadataReader {
6768
private static final Logger LOGGER = LoggerFactory.getLogger(ServerSegmentMetadataReader.class);
69+
private static final String COLUMNS_KEY = "columns";
70+
private static final String SEGMENTS_KEY = "segments";
6871

6972
private final Executor _executor;
7073
private final HttpClientConnectionManager _connectionManager;
@@ -430,18 +433,25 @@ public Map<String, TableStaleSegmentResponse> getStaleSegmentsFromServer(
430433
private String generateAggregateSegmentMetadataServerURL(String tableNameWithType, List<String> columns,
431434
String endpoint) {
432435
tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
433-
String paramsStr = generateColumnsParam(columns);
436+
String paramsStr = generateParam(COLUMNS_KEY, columns);
434437
return String.format("%s/tables/%s/metadata?%s", endpoint, tableNameWithType, paramsStr);
435438
}
436439

437-
private String generateSegmentMetadataServerURL(String tableNameWithType, String segmentName, List<String> columns,
438-
String endpoint) {
440+
public String generateSegmentMetadataServerURL(String tableNameWithType, String segmentName,
441+
@Nullable List<String> columns, String endpoint) {
439442
tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
440443
segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8);
441-
String paramsStr = generateColumnsParam(columns);
444+
String paramsStr = generateParam(COLUMNS_KEY, columns);
442445
return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint, tableNameWithType, segmentName, paramsStr);
443446
}
444447

448+
public String generateTableMetadataServerURL(String tableNameWithType, @Nullable List<String> columns,
449+
@Nullable List<String> segmentsToInclude, String endpoint) {
450+
tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
451+
String paramsStr = generateParam(COLUMNS_KEY, columns) + "&" + generateParam(SEGMENTS_KEY, segmentsToInclude);
452+
return String.format("%s/tables/%s/segments/metadata?%s", endpoint, tableNameWithType, paramsStr);
453+
}
454+
445455
private String generateCheckReloadSegmentsServerURL(String tableNameWithType, String endpoint) {
446456
tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
447457
return String.format("%s/tables/%s/segments/needReload", endpoint, tableNameWithType);
@@ -488,24 +498,24 @@ private Pair<String, String> generateValidDocIdsMetadataURL(String tableNameWith
488498
return Pair.of(url, jsonTableSegments);
489499
}
490500

491-
private String generateColumnsParam(List<String> columns) {
501+
private String generateStaleSegmentsServerURL(String tableNameWithType, String endpoint) {
502+
tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
503+
return String.format("%s/tables/%s/segments/isStale", endpoint, tableNameWithType);
504+
}
505+
506+
private String generateParam(String key, List<String> values) {
492507
String paramsStr = "";
493-
if (columns == null || columns.isEmpty()) {
508+
if (CollectionUtils.isEmpty(values)) {
494509
return paramsStr;
495510
}
496-
List<String> params = new ArrayList<>(columns.size());
497-
for (String column : columns) {
498-
params.add(String.format("columns=%s", column));
511+
List<String> params = new ArrayList<>(values.size());
512+
for (String value : values) {
513+
params.add(key + "=" + value);
499514
}
500515
paramsStr = String.join("&", params);
501516
return paramsStr;
502517
}
503518

504-
private String generateStaleSegmentsServerURL(String tableNameWithType, String endpoint) {
505-
tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
506-
return String.format("%s/tables/%s/segments/isStale", endpoint, tableNameWithType);
507-
}
508-
509519
public class TableReloadResponse {
510520
private int _numFailedResponses;
511521
private List<String> _serverReloadResponses;

pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java

Lines changed: 83 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
package org.apache.pinot.controller.util;
2020

2121
import com.fasterxml.jackson.databind.JsonNode;
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import com.fasterxml.jackson.databind.node.ObjectNode;
2224
import com.google.common.collect.BiMap;
2325
import java.io.IOException;
26+
import java.util.ArrayList;
2427
import java.util.Collections;
2528
import java.util.HashMap;
2629
import java.util.HashSet;
@@ -29,6 +32,7 @@
2932
import java.util.Set;
3033
import java.util.concurrent.Executor;
3134
import java.util.stream.Collectors;
35+
import javax.annotation.Nullable;
3236
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
3337
import org.apache.helix.model.ExternalView;
3438
import org.apache.pinot.common.exception.InvalidConfigException;
@@ -40,6 +44,8 @@
4044
import org.apache.pinot.spi.utils.CommonConstants;
4145
import org.apache.pinot.spi.utils.JsonUtils;
4246
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
47+
import org.slf4j.Logger;
48+
import org.slf4j.LoggerFactory;
4349

4450

4551
/**
@@ -50,6 +56,7 @@
5056
* the column indexes available.
5157
*/
5258
public class TableMetadataReader {
59+
private static final Logger log = LoggerFactory.getLogger(TableMetadataReader.class);
5360
private final Executor _executor;
5461
private final HttpClientConnectionManager _connectionManager;
5562
private final PinotHelixResourceManager _pinotHelixResourceManager;
@@ -127,50 +134,95 @@ private TableReloadJsonResponse processSegmentMetadataReloadResponse(
127134

128135
/**
129136
* This API takes in the list of segments for which we need the metadata.
137+
* It first calls each server once to get the metadata for all of its segments instead of making a call per segment,
* and falls back to the legacy per-segment calls if the table-level call fails.
130138
*/
131-
public JsonNode getSegmentsMetadata(String tableNameWithType, List<String> columns, Set<String> segmentsToInclude,
132-
int timeoutMs)
139+
public JsonNode getSegmentsMetadata(String tableNameWithType, @Nullable List<String> columns,
140+
@Nullable List<String> segments, int timeoutMs)
133141
throws InvalidConfigException, IOException {
134-
return getSegmentsMetadataInternal(tableNameWithType, columns, segmentsToInclude, timeoutMs);
142+
return getSegmentsMetadataInternal(tableNameWithType, columns, segments, timeoutMs);
135143
}
136144

137-
private JsonNode getSegmentsMetadataInternal(String tableNameWithType, List<String> columns,
138-
Set<String> segmentsToInclude, int timeoutMs)
145+
/**
146+
* Common helper used by both the new (server-level) and legacy (segment-level) endpoints.
147+
*/
148+
private JsonNode fetchAndAggregateMetadata(List<String> urls, BiMap<String, String> endpoints, boolean perSegmentJson,
149+
String tableNameWithType, int timeoutMs)
139150
throws InvalidConfigException, IOException {
140-
final Map<String, List<String>> serverToSegmentsMap =
141-
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
142-
BiMap<String, String> endpoints =
143-
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegmentsMap.keySet());
144-
ServerSegmentMetadataReader serverSegmentMetadataReader =
145-
new ServerSegmentMetadataReader(_executor, _connectionManager);
151+
CompletionServiceHelper cs = new CompletionServiceHelper(_executor, _connectionManager, endpoints);
152+
CompletionServiceHelper.CompletionServiceResponse resp =
153+
cs.doMultiGetRequest(urls, tableNameWithType, perSegmentJson, timeoutMs);
154+
// all requests will fail if new server endpoint is not available
155+
if (resp._failedResponseCount > 0) {
156+
throw new RuntimeException(String.format("Got %d failed responses from total %d server instances. "
157+
+ "Falling back to legacy segment metadata api", resp._failedResponseCount, urls.size()));
158+
}
146159

147-
// Filter segments that we need
148-
for (Map.Entry<String, List<String>> serverToSegment : serverToSegmentsMap.entrySet()) {
149-
List<String> segments = serverToSegment.getValue();
150-
if (segmentsToInclude != null && !segmentsToInclude.isEmpty()) {
151-
segments.retainAll(segmentsToInclude);
160+
ObjectMapper mapper = new ObjectMapper();
161+
ObjectNode aggregatedNode = mapper.createObjectNode();
162+
for (String body : resp._httpResponses.values()) {
163+
JsonNode node = JsonUtils.stringToJsonNode(body);
164+
// legacy returns one JSON per segment; new returns one JSON with many fields
165+
if (perSegmentJson) {
166+
String segmentName = node.get("segmentName").asText();
167+
aggregatedNode.set(segmentName, node);
168+
} else {
169+
node.fields().forEachRemaining(entry -> aggregatedNode.set(entry.getKey(), entry.getValue()));
152170
}
153171
}
172+
return aggregatedNode;
173+
}
154174

155-
List<String> segmentsMetadata =
156-
serverSegmentMetadataReader.getSegmentMetadataFromServer(tableNameWithType, serverToSegmentsMap, endpoints,
157-
columns, timeoutMs);
158-
Map<String, JsonNode> response = new HashMap<>();
159-
for (String segmentMetadata : segmentsMetadata) {
160-
JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata);
161-
response.put(responseJson.get("segmentName").asText(), responseJson);
175+
private List<String> buildTableLevelUrls(Map<String, List<String>> serverToSegs, BiMap<String, String> endpoints,
176+
String tableNameWithType, List<String> columns, List<String> segmentsFilter, ServerSegmentMetadataReader reader) {
177+
List<String> urls = new ArrayList<>(serverToSegs.size());
178+
for (String server : serverToSegs.keySet()) {
179+
urls.add(reader.generateTableMetadataServerURL(
180+
tableNameWithType, columns, segmentsFilter, endpoints.get(server)));
162181
}
163-
return JsonUtils.objectToJsonNode(response);
182+
return urls;
164183
}
165184

166-
/**
167-
* This method retrieves the full segment metadata for a given table.
168-
* Currently supports only OFFLINE tables.
169-
* @return a map of segmentName to its metadata
170-
*/
171-
public JsonNode getSegmentsMetadata(String tableNameWithType, List<String> columns, int timeoutMs)
185+
private List<String> buildSegmentLevelUrls(Map<String, List<String>> serverToSegs, BiMap<String, String> endpoints,
186+
String tableNameWithType, List<String> columns, List<String> segmentsFilter, ServerSegmentMetadataReader reader) {
187+
List<String> urls = new ArrayList<>();
188+
for (Map.Entry<String, List<String>> e : serverToSegs.entrySet()) {
189+
for (String segment : e.getValue()) {
190+
if (segmentsFilter == null || segmentsFilter.isEmpty()
191+
|| segmentsFilter.contains(segment)) {
192+
urls.add(reader.generateSegmentMetadataServerURL(
193+
tableNameWithType, segment, columns, endpoints.get(e.getKey())));
194+
}
195+
}
196+
}
197+
return urls;
198+
}
199+
200+
private JsonNode getSegmentsMetadataInternal(String tableNameWithType, @Nullable List<String> columns,
201+
@Nullable List<String> segments, int timeoutMs)
172202
throws InvalidConfigException, IOException {
173-
return getSegmentsMetadataInternal(tableNameWithType, columns, null, timeoutMs);
203+
Map<String, List<String>> serverToSegs =
204+
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
205+
BiMap<String, String> endpoints =
206+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegs.keySet());
207+
ServerSegmentMetadataReader reader =
208+
new ServerSegmentMetadataReader(_executor, _connectionManager);
209+
210+
// try table level endpoint first
211+
try {
212+
List<String> tableUrls = buildTableLevelUrls(serverToSegs, endpoints,
213+
tableNameWithType, columns, segments, reader);
214+
return fetchAndAggregateMetadata(tableUrls, endpoints, /*perSegmentJson=*/false,
215+
tableNameWithType, timeoutMs);
216+
} catch (RuntimeException e) {
217+
log.warn("Failed to fetch table metadata for table {} using new server endpoint, falling back to legacy "
218+
+ "per-segment endpoint", tableNameWithType, e);
219+
}
220+
221+
// legacy per segment endpoint
222+
List<String> segmentUrls = buildSegmentLevelUrls(serverToSegs, endpoints,
223+
tableNameWithType, columns, segments, reader);
224+
return fetchAndAggregateMetadata(segmentUrls, endpoints.inverse(), /*perSegmentJson=*/true,
225+
tableNameWithType, timeoutMs);
174226
}
175227

176228
/**

pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,14 @@ protected JsonNode getColumnIndexSize(String column)
873873
.get("columnIndexSizeMap").get(column);
874874
}
875875

876+
/**
877+
* Get all segment names for a given tableName and tableType.
878+
*/
879+
protected List<String> getSegmentNames(String tableName, @Nullable String tableType)
880+
throws Exception {
881+
return getControllerRequestClient().listSegments(tableName, tableType, true);
882+
}
883+
876884
protected List<ValidDocIdsMetadataInfo> getValidDocIdsMetadata(String tableNameWithType,
877885
ValidDocIdsType validDocIdsType)
878886
throws Exception {

0 commit comments

Comments
 (0)