From 37e04eca81eb507caca2ff3456cd234c8e5695ef Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Thu, 4 Sep 2025 18:46:12 +0100 Subject: [PATCH] KAFKA-19662: Allow resetting offset for unsubscribed topic in kafka-share-groups.sh (#20453) The `kafka-share-groups.sh` tool checks whether a topic already has a start-offset in the share group when resetting offsets. This is not necessary. By removing the check, it is possible to set a start offset for a topic which has not yet but will be subscribed in the future, thus initialising the consumption point. There is still a small piece of outstanding work to do with resetting the offset for a non-existent group which should also create the group. A subsequent PR will be used to address that. Reviewers: Jimmy Wang <48462172+JimmyWang6@users.noreply.github.com>, Lan Ding , Apoorv Mittal --- .../consumer/group/ShareGroupCommand.java | 12 ------- .../consumer/group/ShareGroupCommandTest.java | 32 +++++++++---------- 2 files changed, 16 insertions(+), 28 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java index df4353c467a..50565dcc0ba 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -418,18 +418,6 @@ public class ShareGroupCommand { if (opts.options.has(opts.topicOpt)) { partitionsToReset = offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt)); - Set subscribedTopics = offsetsByTopicPartitions.keySet().stream() - .map(TopicPartition::topic) - .collect(Collectors.toSet()); - Set resetTopics = partitionsToReset.stream() - .map(TopicPartition::topic) - .collect(Collectors.toSet()); - if (!subscribedTopics.containsAll(resetTopics)) { - CommandLineUtils - .printErrorAndExit(String.format("Share group '%s' is not subscribed to topic '%s'.", - groupId, resetTopics.stream().filter(topic -> !subscribedTopics.contains(topic)).collect(Collectors.joining(", ")))); - return null; - } } else { partitionsToReset = offsetsByTopicPartitions.keySet(); } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java index 9333bbbb65e..03308d94c39 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.tools.consumer.group; - import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientTestUtils; import org.apache.kafka.clients.admin.AlterShareGroupOffsetsResult; @@ -1373,7 +1372,7 @@ public class ShareGroupCommandTest { } @Test - public void testAlterShareGroupFailureWithNonExistentTopic() { + public void testAlterShareGroupUnsubscribedTopicSuccess() { String group = "share-group"; String topic = "none"; String bootstrapServer = "localhost:9092"; @@ -1386,18 +1385,22 @@ public class ShareGroupCommandTest { KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10L))) ) ); + when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult); + + AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = mockAlterShareGroupOffsets(adminClient, group); + TopicPartition tp0 = new TopicPartition(topic, 0); + Map partitionOffsets = Map.of(tp0, new OffsetAndMetadata(0L)); + ListOffsetsResult listOffsetsResult = AdminClientTestUtils.createListOffsetsResult(partitionOffsets); + when(adminClient.listOffsets(any(), any(ListOffsetsOptions.class))).thenReturn(listOffsetsResult); + ShareGroupDescription exp = new ShareGroupDescription( group, - List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment( - Set.of(new TopicPartition(topic, 0)) - ), 0)), + List.of(), GroupState.EMPTY, new Node(0, "host1", 9090), 0, 0); DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class); when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group, KafkaFuture.completedFuture(exp))); when(adminClient.describeShareGroups(any(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult); - AtomicBoolean exited = new AtomicBoolean(false); - when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult); Map descriptions = Map.of( topic, new TopicDescription(topic, false, List.of( new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()) @@ -1406,15 +1409,12 @@ public class ShareGroupCommandTest { when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions)); when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult); when(adminClient.describeTopics(anyCollection(), any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult); - Exit.setExitProcedure(((statusCode, message) -> { - assertNotEquals(0, statusCode); - assertTrue(message.contains("Share group 'share-group' is not subscribed to topic 'none'")); - exited.set(true); - })); - try { - getShareGroupService(cgcArgs, adminClient).resetOffsets(); - } finally { - assertTrue(exited.get()); + try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) { + service.resetOffsets(); + verify(adminClient).alterShareGroupOffsets(eq(group), anyMap()); + verify(adminClient).describeTopics(anyCollection(), any(DescribeTopicsOptions.class)); + verify(alterShareGroupOffsetsResult, times(1)).all(); + verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class)); } }