From dfaf9f9cf71fd47cd74ff5e9950dbd1c1f2bda9a Mon Sep 17 00:00:00 2001 From: TaiJuWu Date: Thu, 31 Jul 2025 22:54:27 +0800 Subject: [PATCH] MINOR: add test for `kafka-consumer-groups.sh` should not fail when partition offline (#20235) See: https://github.com/apache/kafka/pull/20168#discussion_r2227310093 add follow test case: Given a topic with three partitions, where partition `t-2` is offline, if partitionsToReset contains only `t-1`, the method filterNoneLeaderPartitions incorrectly includes `t-2` in the result, leading to a failure in the tool. Reviewers: Chia-Ping Tsai , Jhen-Yung Hsu , Ken Huang , Andrew Schofield --- .../group/ResetConsumerGroupOffsetTest.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java index 5fb704cf53d..5bf9da0c370 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.function.Supplier; @@ -153,6 +154,28 @@ public class ResetConsumerGroupOffsetTest { } } + @ClusterTest( + brokers = 2, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"), + } + ) + public void testResetOffsetsWithOfflinePartitionNotInResetTarget(ClusterInstance cluster) throws Exception { + String topic = generateRandomTopic(); + String group = "new.group"; + String[] args = buildArgsForGroup(cluster, group, "--to-earliest", "--execute", "--topic", topic + ":0"); + + try (Admin admin = cluster.admin(); ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { + admin.createTopics(List.of(new NewTopic(topic, Map.of(0, List.of(0), 1, List.of(1))))); + cluster.waitTopicCreation(topic, 2); + + cluster.shutdownBroker(1); + + Map resetOffsets = service.resetOffsets().get(group); + assertEquals(Set.of(new TopicPartition(topic, 0)), resetOffsets.keySet()); + } + } + @ClusterTest public void testResetOffsetsExistingTopic(ClusterInstance cluster) { String topic = generateRandomTopic();