Skip to content

Commit ac032b4

Browse files
committed
Add comprehensive tests for partition ID remapping functionality with allowPartitionRemapping flag
- Add testPartitionIdRemappingLogic() to test modulo-based remapping when allowPartitionRemapping=true * Tests 8 Kafka partitions → 4 Pinot partitions remapping using _allowPartitionRemapping flag * Validates segments with IDs 0,4 map to partition 0; 1,5 map to partition 1; etc. * Verifies fully replicated server tracking with remapped partitions - Add testPartitionIdRemappingInvalidCases() to test invalid remapping scenarios * Tests non-divisible partition counts (8 partitions → 3 partitions) with _allowPartitionRemapping=true * Validates segments are correctly marked as invalid when 8 % 3 ≠ 0 - Add testPartitionIdRemappingDisabled() to test behavior when _allowPartitionRemapping=false * Tests that segments with mismatched partition counts are marked invalid when flag is disabled * Validates only segments with exact partition count matches are accepted * Ensures _allowPartitionRemapping=false enforces strict partition count matching These tests comprehensively validate the _allowPartitionRemapping flag behavior in SegmentPartitionMetadataManager, covering both enabled (with modulo remapping) and disabled (strict matching) scenarios. All tests pass and maintain compatibility with existing functionality.
1 parent af3dc6a commit ac032b4

File tree

4 files changed

+222
-12
lines changed

4 files changed

+222
-12
lines changed

pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,8 @@ public synchronized void buildRouting(String tableNameWithType) {
587587
LOGGER.info("Enabling SegmentPartitionMetadataManager for table: {} on partition column: {}",
588588
tableNameWithType, partitionConfig.getKey());
589589
partitionMetadataManager = new SegmentPartitionMetadataManager(tableNameWithType, partitionConfig.getKey(),
590-
partitionConfig.getValue().getFunctionName(), partitionConfig.getValue().getNumPartitions());
590+
partitionConfig.getValue().getFunctionName(), partitionConfig.getValue().getNumPartitions(),
591+
partitionConfig.getValue().isAllowPartitionRemapping());
591592
} else {
592593
LOGGER.warn("Cannot enable SegmentPartitionMetadataManager for table: {} with multiple partition columns: {}",
593594
tableNameWithType, columnPartitionMap.keySet());

pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi
6363
private final String _partitionColumn;
6464
private final String _partitionFunctionName;
6565
private final int _numPartitions;
66+
private final boolean _allowPartitionRemapping;
6667

6768
// cache-able content, only follow changes if onlineSegments list (of ideal-state) is changed.
6869
private final Map<String, SegmentInfo> _segmentInfoMap = new HashMap<>();
@@ -72,11 +73,12 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi
7273
private transient TablePartitionReplicatedServersInfo _tablePartitionReplicatedServersInfo;
7374

7475
public SegmentPartitionMetadataManager(String tableNameWithType, String partitionColumn, String partitionFunctionName,
75-
int numPartitions) {
76+
int numPartitions, boolean allowPartitionRemapping) {
7677
_tableNameWithType = tableNameWithType;
7778
_partitionColumn = partitionColumn;
7879
_partitionFunctionName = partitionFunctionName;
7980
_numPartitions = numPartitions;
81+
_allowPartitionRemapping = allowPartitionRemapping;
8082
}
8183

8284
@Override
@@ -106,14 +108,25 @@ private int getPartitionId(String segment, @Nullable ZNRecord znRecord) {
106108
if (!_partitionFunctionName.equalsIgnoreCase(partitionFunction.getName())) {
107109
return INVALID_PARTITION_ID;
108110
}
109-
if (_numPartitions != partitionFunction.getNumPartitions()) {
110-
return INVALID_PARTITION_ID;
111-
}
112-
Set<Integer> partitions = segmentPartitionInfo.getPartitions();
113-
if (partitions.size() != 1) {
114-
return INVALID_PARTITION_ID;
111+
if (_allowPartitionRemapping) {
112+
if (partitionFunction.getNumPartitions() % _numPartitions != 0) {
113+
return INVALID_PARTITION_ID;
114+
}
115+
Set<Integer> partitions = segmentPartitionInfo.getPartitions();
116+
if (partitions.size() != 1) {
117+
return INVALID_PARTITION_ID;
118+
}
119+
return partitions.iterator().next() % _numPartitions;
120+
} else {
121+
if (partitionFunction.getNumPartitions() != _numPartitions) {
122+
return INVALID_PARTITION_ID;
123+
}
124+
Set<Integer> partitions = segmentPartitionInfo.getPartitions();
125+
if (partitions.size() != 1) {
126+
return INVALID_PARTITION_ID;
127+
}
128+
return partitions.iterator().next();
115129
}
116-
return partitions.iterator().next();
117130
}
118131

119132
private static long getCreationTimeMs(@Nullable ZNRecord znRecord) {

pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java

Lines changed: 186 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void testPartitionMetadataManagerProcessingThroughSegmentChangesSinglePar
8888

8989
SegmentPartitionMetadataManager partitionMetadataManager =
9090
new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, PARTITION_COLUMN, PARTITION_COLUMN_FUNC,
91-
NUM_PARTITIONS);
91+
NUM_PARTITIONS, false);
9292
SegmentZkMetadataFetcher segmentZkMetadataFetcher =
9393
new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
9494
segmentZkMetadataFetcher.register(partitionMetadataManager);
@@ -272,6 +272,191 @@ public void testPartitionMetadataManagerProcessingThroughSegmentChangesSinglePar
272272
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
273273
}
274274

275+
@Test
276+
public void testPartitionIdRemappingLogic() {
277+
ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
278+
Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
279+
Map<String, String> onlineInstanceStateMap = ImmutableMap.of(SERVER_0, ONLINE, SERVER_1, ONLINE);
280+
Set<String> onlineSegments = new HashSet<>();
281+
IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
282+
283+
// Create partition metadata manager with remapping enabled (4 table partitions from 8 segment partitions)
284+
SegmentPartitionMetadataManager partitionMetadataManager =
285+
new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, PARTITION_COLUMN, PARTITION_COLUMN_FUNC, 4, true);
286+
SegmentZkMetadataFetcher segmentZkMetadataFetcher =
287+
new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
288+
segmentZkMetadataFetcher.register(partitionMetadataManager);
289+
290+
// Initial state should be empty
291+
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
292+
TablePartitionReplicatedServersInfo tablePartitionReplicatedServersInfo = partitionMetadataManager
293+
.getTablePartitionReplicatedServersInfo();
294+
assertEquals(tablePartitionReplicatedServersInfo.getPartitionInfoMap(),
295+
new TablePartitionReplicatedServersInfo.PartitionInfo[4]);
296+
297+
// Add segments with partition IDs 0, 4 (should both map to partition 0 via modulo)
298+
String segment0 = "segment_partition_0";
299+
String segment4 = "segment_partition_4";
300+
onlineSegments.add(segment0);
301+
onlineSegments.add(segment4);
302+
segmentAssignment.put(segment0, Collections.singletonMap(SERVER_0, ONLINE));
303+
segmentAssignment.put(segment4, Collections.singletonMap(SERVER_1, ONLINE));
304+
305+
// Set metadata for segments with 8 total partitions (will be remapped to 4)
306+
setSegmentZKMetadata(segment0, PARTITION_COLUMN_FUNC, 8, 0, 0L);
307+
setSegmentZKMetadata(segment4, PARTITION_COLUMN_FUNC, 8, 4, 0L);
308+
309+
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
310+
tablePartitionReplicatedServersInfo = partitionMetadataManager.getTablePartitionReplicatedServersInfo();
311+
TablePartitionReplicatedServersInfo.PartitionInfo[] partitionInfoMap = tablePartitionReplicatedServersInfo
312+
.getPartitionInfoMap();
313+
314+
// Both segments should be in partition 0 (0 % 4 = 0, 4 % 4 = 0)
315+
assertNull(partitionInfoMap[1]);
316+
assertNull(partitionInfoMap[2]);
317+
assertNull(partitionInfoMap[3]);
318+
// No single server has all segments in partition 0, so fullyReplicatedServers should be empty
319+
assertTrue(partitionInfoMap[0]._fullyReplicatedServers.isEmpty());
320+
assertEqualsNoOrder(partitionInfoMap[0]._segments.toArray(), new String[]{segment0, segment4});
321+
322+
// Add segments with partition IDs 1, 5 (should both map to partition 1)
323+
String segment1 = "segment_partition_1";
324+
String segment5 = "segment_partition_5";
325+
onlineSegments.add(segment1);
326+
onlineSegments.add(segment5);
327+
segmentAssignment.put(segment1, Collections.singletonMap(SERVER_0, ONLINE));
328+
segmentAssignment.put(segment5, Collections.singletonMap(SERVER_0, ONLINE));
329+
330+
setSegmentZKMetadata(segment1, PARTITION_COLUMN_FUNC, 8, 1, 0L);
331+
setSegmentZKMetadata(segment5, PARTITION_COLUMN_FUNC, 8, 5, 0L);
332+
333+
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
334+
tablePartitionReplicatedServersInfo = partitionMetadataManager.getTablePartitionReplicatedServersInfo();
335+
partitionInfoMap = tablePartitionReplicatedServersInfo.getPartitionInfoMap();
336+
337+
// Partition 1 should have both segments on SERVER_0, making it fully replicated
338+
assertEquals(partitionInfoMap[1]._fullyReplicatedServers, Collections.singleton(SERVER_0));
339+
assertEqualsNoOrder(partitionInfoMap[1]._segments.toArray(), new String[]{segment1, segment5});
340+
341+
// Add segments with partition IDs 2, 6 (should both map to partition 2)
342+
String segment2 = "segment_partition_2";
343+
String segment6 = "segment_partition_6";
344+
onlineSegments.add(segment2);
345+
onlineSegments.add(segment6);
346+
segmentAssignment.put(segment2, ImmutableMap.of(SERVER_0, ONLINE, SERVER_1, ONLINE));
347+
segmentAssignment.put(segment6, ImmutableMap.of(SERVER_0, ONLINE, SERVER_1, ONLINE));
348+
349+
setSegmentZKMetadata(segment2, PARTITION_COLUMN_FUNC, 8, 2, 0L);
350+
setSegmentZKMetadata(segment6, PARTITION_COLUMN_FUNC, 8, 6, 0L);
351+
352+
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
353+
tablePartitionReplicatedServersInfo = partitionMetadataManager.getTablePartitionReplicatedServersInfo();
354+
partitionInfoMap = tablePartitionReplicatedServersInfo.getPartitionInfoMap();
355+
356+
// Partition 2 should have both segments on both servers, making both servers fully replicated
357+
assertEquals(partitionInfoMap[2]._fullyReplicatedServers, ImmutableSet.of(SERVER_0, SERVER_1));
358+
assertEqualsNoOrder(partitionInfoMap[2]._segments.toArray(), new String[]{segment2, segment6});
359+
360+
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
361+
}
362+
363+
@Test
364+
public void testPartitionIdRemappingInvalidCases() {
365+
ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
366+
Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
367+
Set<String> onlineSegments = new HashSet<>();
368+
IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
369+
370+
// Create partition metadata manager with remapping enabled but invalid divisibility (3 table partitions from 8
371+
// segment partitions)
372+
SegmentPartitionMetadataManager partitionMetadataManager =
373+
new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, PARTITION_COLUMN, PARTITION_COLUMN_FUNC, 3, true);
374+
SegmentZkMetadataFetcher segmentZkMetadataFetcher =
375+
new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
376+
segmentZkMetadataFetcher.register(partitionMetadataManager);
377+
378+
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
379+
380+
// Add segment with partition ID from 8-partition scheme (should be invalid because 8 % 3 != 0)
381+
String invalidSegment = "invalid_segment";
382+
onlineSegments.add(invalidSegment);
383+
segmentAssignment.put(invalidSegment, Collections.singletonMap(SERVER_0, ONLINE));
384+
setSegmentZKMetadata(invalidSegment, PARTITION_COLUMN_FUNC, 8, 0, 0L);
385+
386+
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
387+
TablePartitionReplicatedServersInfo tablePartitionReplicatedServersInfo = partitionMetadataManager
388+
.getTablePartitionReplicatedServersInfo();
389+
390+
// Segment should be marked as invalid due to non-divisible partition count
391+
assertEquals(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition(),
392+
Collections.singletonList(invalidSegment));
393+
assertEquals(tablePartitionReplicatedServersInfo.getPartitionInfoMap(),
394+
new TablePartitionReplicatedServersInfo.PartitionInfo[3]);
395+
}
396+
397+
@Test
398+
public void testPartitionIdRemappingDisabled() {
399+
ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
400+
Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
401+
Set<String> onlineSegments = new HashSet<>();
402+
IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
403+
404+
// Create partition metadata manager with remapping DISABLED (4 table partitions, remapping = false)
405+
SegmentPartitionMetadataManager partitionMetadataManager =
406+
new SegmentPartitionMetadataManager(OFFLINE_TABLE_NAME, PARTITION_COLUMN, PARTITION_COLUMN_FUNC, 4, false);
407+
SegmentZkMetadataFetcher segmentZkMetadataFetcher =
408+
new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
409+
segmentZkMetadataFetcher.register(partitionMetadataManager);
410+
411+
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
412+
413+
// Add segment with matching partition count (4 partitions) - should be valid
414+
String validSegment = "valid_segment";
415+
onlineSegments.add(validSegment);
416+
segmentAssignment.put(validSegment, Collections.singletonMap(SERVER_0, ONLINE));
417+
setSegmentZKMetadata(validSegment, PARTITION_COLUMN_FUNC, 4, 0, 0L);
418+
419+
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
420+
TablePartitionReplicatedServersInfo tablePartitionReplicatedServersInfo = partitionMetadataManager
421+
.getTablePartitionReplicatedServersInfo();
422+
TablePartitionReplicatedServersInfo.PartitionInfo[] partitionInfoMap = tablePartitionReplicatedServersInfo
423+
.getPartitionInfoMap();
424+
425+
// Valid segment should be placed in partition 0
426+
assertEquals(partitionInfoMap[0]._fullyReplicatedServers, Collections.singleton(SERVER_0));
427+
assertEquals(partitionInfoMap[0]._segments, Collections.singleton(validSegment));
428+
assertTrue(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().isEmpty());
429+
430+
// Add segment with different partition count (8 partitions) - should be invalid when remapping is disabled
431+
String invalidSegment = "invalid_segment_8_partitions";
432+
onlineSegments.add(invalidSegment);
433+
segmentAssignment.put(invalidSegment, Collections.singletonMap(SERVER_1, ONLINE));
434+
setSegmentZKMetadata(invalidSegment, PARTITION_COLUMN_FUNC, 8, 0, 0L);
435+
436+
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
437+
tablePartitionReplicatedServersInfo = partitionMetadataManager.getTablePartitionReplicatedServersInfo();
438+
partitionInfoMap = tablePartitionReplicatedServersInfo.getPartitionInfoMap();
439+
440+
// Invalid segment should be marked as invalid, valid segment should remain in partition 0
441+
assertEquals(partitionInfoMap[0]._fullyReplicatedServers, Collections.singleton(SERVER_0));
442+
assertEquals(partitionInfoMap[0]._segments, Collections.singleton(validSegment));
443+
assertEquals(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition(),
444+
Collections.singletonList(invalidSegment));
445+
446+
// Add another segment with different partition count (2 partitions) - should also be invalid
447+
String invalidSegment2 = "invalid_segment_2_partitions";
448+
onlineSegments.add(invalidSegment2);
449+
segmentAssignment.put(invalidSegment2, Collections.singletonMap(SERVER_0, ONLINE));
450+
setSegmentZKMetadata(invalidSegment2, PARTITION_COLUMN_FUNC, 2, 0, 0L);
451+
452+
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
453+
tablePartitionReplicatedServersInfo = partitionMetadataManager.getTablePartitionReplicatedServersInfo();
454+
455+
// Both invalid segments should be marked as invalid
456+
assertEqualsNoOrder(tablePartitionReplicatedServersInfo.getSegmentsWithInvalidPartition().toArray(),
457+
new String[]{invalidSegment, invalidSegment2});
458+
}
459+
275460
private void setSegmentZKMetadata(String segment, String partitionFunction, int numPartitions, int partitionId,
276461
long creationTimeMs) {
277462
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segment);

pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ColumnPartitionConfig.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,23 @@ public class ColumnPartitionConfig extends BaseJsonConfig {
3030
private final String _functionName;
3131
private final int _numPartitions;
3232
private final Map<String, String> _functionConfig;
33+
private final boolean _allowPartitionRemapping;
3334

3435
public ColumnPartitionConfig(String functionName, int numPartitions) {
35-
this(functionName, numPartitions, null);
36+
this(functionName, numPartitions, null, false);
3637
}
3738

3839
@JsonCreator
3940
public ColumnPartitionConfig(@JsonProperty(value = "functionName", required = true) String functionName,
4041
@JsonProperty(value = "numPartitions", required = true) int numPartitions,
41-
@JsonProperty(value = "functionConfig") @Nullable Map<String, String> functionConfig) {
42+
@JsonProperty(value = "functionConfig") @Nullable Map<String, String> functionConfig,
43+
@JsonProperty(value = "allowPartitionRemapping", required = true) boolean allowPartitionRemapping) {
4244
Preconditions.checkArgument(functionName != null, "'functionName' must be configured");
4345
Preconditions.checkArgument(numPartitions > 0, "'numPartitions' must be positive");
4446
_functionName = functionName;
4547
_numPartitions = numPartitions;
4648
_functionConfig = functionConfig;
49+
_allowPartitionRemapping = allowPartitionRemapping;
4750
}
4851

4952
/**
@@ -73,4 +76,12 @@ public Map<String, String> getFunctionConfig() {
7376
public int getNumPartitions() {
7477
return _numPartitions;
7578
}
79+
80+
/**
81+
* Allow partition remapping for this column. This is used when you expand(usually multiple) the number of
82+
* partitions for a column. But want to keep the existing partition assignment.
83+
*/
84+
public boolean isAllowPartitionRemapping() {
85+
return _allowPartitionRemapping;
86+
}
7687
}

0 commit comments

Comments
 (0)