From f188a311243eb2801474e089d32fa77db76b6dc4 Mon Sep 17 00:00:00 2001 From: xijiu <422766572@qq.com> Date: Mon, 21 Jul 2025 16:25:40 +0800 Subject: [PATCH] KAFKA-19500: `kafka-consumer-groups.sh` should fail quickly if the partition leader is unavailable (#20168) 1. Add check leader missing logic in method `ConsumerGroupCommand.ConsumerGroupService#prepareOffsetsToReset` in order to fail quickly 2. Add some tests Reviewers: TaiJuWu , Lan Ding , Ken Huang , Andrew Schofield --- .../consumer/group/ConsumerGroupCommand.java | 37 ++++++++++++++++++ .../group/ConsumerGroupServiceTest.java | 5 ++- .../group/ResetConsumerGroupOffsetTest.java | 38 +++++++++++++++++++ 3 files changed, 79 insertions(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java index da3ccff9260..6a0d1ffd224 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java @@ -41,6 +41,8 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.util.CommandLineUtils; @@ -1000,6 +1002,9 @@ public class ConsumerGroupCommand { } private Map prepareOffsetsToReset(String groupId, Collection partitionsToReset) { + // ensure all partitions are valid, otherwise throw a runtime exception + checkAllTopicPartitionsValid(partitionsToReset); + if (opts.options.has(opts.resetToOffsetOpt)) { return offsetsUtils.resetToOffset(partitionsToReset); } else if (opts.options.has(opts.resetToEarliestOpt)) { @@ -1024,6 +1029,38 @@ public class ConsumerGroupCommand { return null; } + private void checkAllTopicPartitionsValid(Collection partitionsToReset) { + // check the partitions exist + List partitionsNotExistList = filterNonExistentPartitions(partitionsToReset); + if (!partitionsNotExistList.isEmpty()) { + String partitionStr = partitionsNotExistList.stream().map(TopicPartition::toString).collect(Collectors.joining(",")); + throw new UnknownTopicOrPartitionException("The partitions \"" + partitionStr + "\" do not exist"); + } + + // check the partitions have leader + List partitionsWithoutLeader = filterNoneLeaderPartitions(partitionsToReset); + if (!partitionsWithoutLeader.isEmpty()) { + String partitionStr = partitionsWithoutLeader.stream().map(TopicPartition::toString).collect(Collectors.joining(",")); + throw new LeaderNotAvailableException("The partitions \"" + partitionStr + "\" have no leader"); + } + } + + private List filterNonExistentPartitions(Collection topicPartitions) { + // collect all topics + Set topics = topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet()); + try { + List existPartitions = adminClient.describeTopics(topics).allTopicNames().get().entrySet() + .stream() + .flatMap(entry -> entry.getValue().partitions().stream() + .map(partitionInfo -> new TopicPartition(entry.getKey(), partitionInfo.partition()))) + .toList(); + + return topicPartitions.stream().filter(element -> !existPartitions.contains(element)).toList(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + String exportOffsetsToCsv(Map> assignments) { boolean isSingleGroupQuery = opts.options.valuesOf(opts.groupOpt).size() == 1; ObjectWriter csvWriter = isSingleGroupQuery diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java index 0ddba4346ba..242aa1bc894 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java @@ -62,6 +62,7 @@ import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -234,6 +235,8 @@ public class ConsumerGroupServiceTest { .thenReturn(describeGroupsResult(GroupState.DEAD)); when(admin.describeTopics(ArgumentMatchers.eq(topicsWithoutPartitionsSpecified), any())) .thenReturn(describeTopicsResult(topicsWithoutPartitionsSpecified)); + when(admin.describeTopics(anySet())) + .thenReturn(describeTopicsResult(TOPICS)); when(admin.listOffsets(offsetsArgMatcher(), any())) .thenReturn(listOffsetsResult()); @@ -317,7 +320,7 @@ public class ConsumerGroupServiceTest { topics.forEach(topic -> { List partitions = IntStream.range(0, NUM_PARTITIONS) - .mapToObj(i -> new TopicPartitionInfo(i, null, Collections.emptyList(), Collections.emptyList())) + .mapToObj(i -> new TopicPartitionInfo(i, Node.noNode(), Collections.emptyList(), Collections.emptyList())) .collect(Collectors.toList()); topicDescriptions.put(topic, new TopicDescription(topic, false, partitions)); }); diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java index 3c417cdeeac..5fb704cf53d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java @@ -27,6 +27,8 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.GroupState; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.test.ClusterInstance; @@ -81,6 +83,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_IN import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -659,6 +662,41 @@ public class ResetConsumerGroupOffsetTest { assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } + @ClusterTest(brokers = 3, serverProperties = {@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")}) + public void testResetOffsetsWithPartitionNoneLeader(ClusterInstance cluster) throws Exception { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + String[] args = buildArgsForGroup(cluster, group, "--topic", topic + ":0,1,2", + "--to-earliest", "--execute"); + + try (Admin admin = cluster.admin(); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { + + admin.createTopics(singleton(new NewTopic(topic, 3, (short) 1))).all().get(); + produceConsumeAndShutdown(cluster, topic, group, 2, GroupProtocol.CLASSIC); + assertDoesNotThrow(() -> resetOffsets(service)); + // shutdown a broker to make some partitions missing leader + cluster.shutdownBroker(0); + assertThrows(LeaderNotAvailableException.class, () -> resetOffsets(service)); + } + } + + @ClusterTest + public void testResetOffsetsWithPartitionNotExist(ClusterInstance cluster) throws Exception { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + String[] args = buildArgsForGroup(cluster, group, "--topic", topic + ":2,3", + "--to-earliest", "--execute"); + + try (Admin admin = cluster.admin(); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { + + admin.createTopics(singleton(new NewTopic(topic, 1, (short) 1))).all().get(); + produceConsumeAndShutdown(cluster, topic, group, 2, GroupProtocol.CLASSIC); + assertThrows(UnknownTopicOrPartitionException.class, () -> resetOffsets(service)); + } + } + private String generateRandomTopic() { return TOPIC_PREFIX + TestUtils.randomString(10); }