Skip to content

Commit 37e04ec

Browse files
KAFKA-19662: Allow resetting offset for unsubscribed topic in kafka-share-groups.sh (#20453)
The `kafka-share-groups.sh` tool checks whether a topic already has a start-offset in the share group when resetting offsets. This is not necessary. By removing the check, it is possible to set a start offset for a topic which has not yet but will be subscribed in the future, thus initialising the consumption point. There is still a small piece of outstanding work to do with resetting the offset for a non-existent group which should also create the group. A subsequent PR will be used to address that. Reviewers: Jimmy Wang <[email protected]>, Lan Ding <[email protected]>, Apoorv Mittal <[email protected]>
1 parent 1d0c5f2 commit 37e04ec

File tree

2 files changed

+16
-28
lines changed

2 files changed

+16
-28
lines changed

tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -418,18 +418,6 @@ protected Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String gr
418418

419419
if (opts.options.has(opts.topicOpt)) {
420420
partitionsToReset = offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
421-
Set<String> subscribedTopics = offsetsByTopicPartitions.keySet().stream()
422-
.map(TopicPartition::topic)
423-
.collect(Collectors.toSet());
424-
Set<String> resetTopics = partitionsToReset.stream()
425-
.map(TopicPartition::topic)
426-
.collect(Collectors.toSet());
427-
if (!subscribedTopics.containsAll(resetTopics)) {
428-
CommandLineUtils
429-
.printErrorAndExit(String.format("Share group '%s' is not subscribed to topic '%s'.",
430-
groupId, resetTopics.stream().filter(topic -> !subscribedTopics.contains(topic)).collect(Collectors.joining(", "))));
431-
return null;
432-
}
433421
} else {
434422
partitionsToReset = offsetsByTopicPartitions.keySet();
435423
}

tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.kafka.tools.consumer.group;
1818

19-
2019
import org.apache.kafka.clients.admin.Admin;
2120
import org.apache.kafka.clients.admin.AdminClientTestUtils;
2221
import org.apache.kafka.clients.admin.AlterShareGroupOffsetsResult;
@@ -1373,7 +1372,7 @@ public void testAlterShareGroupOffsetsArgsFailureWithoutResetOffsetsArgs() {
13731372
}
13741373

13751374
@Test
1376-
public void testAlterShareGroupFailureWithNonExistentTopic() {
1375+
public void testAlterShareGroupUnsubscribedTopicSuccess() {
13771376
String group = "share-group";
13781377
String topic = "none";
13791378
String bootstrapServer = "localhost:9092";
@@ -1386,18 +1385,22 @@ public void testAlterShareGroupFailureWithNonExistentTopic() {
13861385
KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10L)))
13871386
)
13881387
);
1388+
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
1389+
1390+
AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = mockAlterShareGroupOffsets(adminClient, group);
1391+
TopicPartition tp0 = new TopicPartition(topic, 0);
1392+
Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(tp0, new OffsetAndMetadata(0L));
1393+
ListOffsetsResult listOffsetsResult = AdminClientTestUtils.createListOffsetsResult(partitionOffsets);
1394+
when(adminClient.listOffsets(any(), any(ListOffsetsOptions.class))).thenReturn(listOffsetsResult);
1395+
13891396
ShareGroupDescription exp = new ShareGroupDescription(
13901397
group,
1391-
List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment(
1392-
Set.of(new TopicPartition(topic, 0))
1393-
), 0)),
1398+
List.of(),
13941399
GroupState.EMPTY,
13951400
new Node(0, "host1", 9090), 0, 0);
13961401
DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class);
13971402
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group, KafkaFuture.completedFuture(exp)));
13981403
when(adminClient.describeShareGroups(any(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
1399-
AtomicBoolean exited = new AtomicBoolean(false);
1400-
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
14011404
Map<String, TopicDescription> descriptions = Map.of(
14021405
topic, new TopicDescription(topic, false, List.of(
14031406
new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())
@@ -1406,15 +1409,12 @@ topic, new TopicDescription(topic, false, List.of(
14061409
when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions));
14071410
when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult);
14081411
when(adminClient.describeTopics(anyCollection(), any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
1409-
Exit.setExitProcedure(((statusCode, message) -> {
1410-
assertNotEquals(0, statusCode);
1411-
assertTrue(message.contains("Share group 'share-group' is not subscribed to topic 'none'"));
1412-
exited.set(true);
1413-
}));
1414-
try {
1415-
getShareGroupService(cgcArgs, adminClient).resetOffsets();
1416-
} finally {
1417-
assertTrue(exited.get());
1412+
try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) {
1413+
service.resetOffsets();
1414+
verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
1415+
verify(adminClient).describeTopics(anyCollection(), any(DescribeTopicsOptions.class));
1416+
verify(alterShareGroupOffsetsResult, times(1)).all();
1417+
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class));
14181418
}
14191419
}
14201420

0 commit comments

Comments
 (0)