mirror of https://github.com/apache/kafka.git
KAFKA-19500: `kafka-consumer-groups.sh` should fail quickly if the partition leader is unavailable (#20168)
1. Add check leader missing logic in method `ConsumerGroupCommand.ConsumerGroupService#prepareOffsetsToReset` in order to fail quickly 2. Add some tests Reviewers: TaiJuWu <tjwu1217@gmail.com>, Lan Ding <isDing_L@163.com>, Ken Huang <s7133700@gmail.com>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
ef07b5fad1
commit
f188a31124
|
@ -41,6 +41,8 @@ import org.apache.kafka.common.KafkaFuture;
|
|||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.errors.GroupIdNotFoundException;
|
||||
import org.apache.kafka.common.errors.LeaderNotAvailableException;
|
||||
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.server.util.CommandLineUtils;
|
||||
|
@ -1000,6 +1002,9 @@ public class ConsumerGroupCommand {
|
|||
}
|
||||
|
||||
private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String groupId, Collection<TopicPartition> partitionsToReset) {
|
||||
// ensure all partitions are valid, otherwise throw a runtime exception
|
||||
checkAllTopicPartitionsValid(partitionsToReset);
|
||||
|
||||
if (opts.options.has(opts.resetToOffsetOpt)) {
|
||||
return offsetsUtils.resetToOffset(partitionsToReset);
|
||||
} else if (opts.options.has(opts.resetToEarliestOpt)) {
|
||||
|
@ -1024,6 +1029,38 @@ public class ConsumerGroupCommand {
|
|||
return null;
|
||||
}
|
||||
|
||||
private void checkAllTopicPartitionsValid(Collection<TopicPartition> partitionsToReset) {
|
||||
// check the partitions exist
|
||||
List<TopicPartition> partitionsNotExistList = filterNonExistentPartitions(partitionsToReset);
|
||||
if (!partitionsNotExistList.isEmpty()) {
|
||||
String partitionStr = partitionsNotExistList.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
|
||||
throw new UnknownTopicOrPartitionException("The partitions \"" + partitionStr + "\" do not exist");
|
||||
}
|
||||
|
||||
// check the partitions have leader
|
||||
List<TopicPartition> partitionsWithoutLeader = filterNoneLeaderPartitions(partitionsToReset);
|
||||
if (!partitionsWithoutLeader.isEmpty()) {
|
||||
String partitionStr = partitionsWithoutLeader.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
|
||||
throw new LeaderNotAvailableException("The partitions \"" + partitionStr + "\" have no leader");
|
||||
}
|
||||
}
|
||||
|
||||
private List<TopicPartition> filterNonExistentPartitions(Collection<TopicPartition> topicPartitions) {
|
||||
// collect all topics
|
||||
Set<String> topics = topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
|
||||
try {
|
||||
List<TopicPartition> existPartitions = adminClient.describeTopics(topics).allTopicNames().get().entrySet()
|
||||
.stream()
|
||||
.flatMap(entry -> entry.getValue().partitions().stream()
|
||||
.map(partitionInfo -> new TopicPartition(entry.getKey(), partitionInfo.partition())))
|
||||
.toList();
|
||||
|
||||
return topicPartitions.stream().filter(element -> !existPartitions.contains(element)).toList();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
String exportOffsetsToCsv(Map<String, Map<TopicPartition, OffsetAndMetadata>> assignments) {
|
||||
boolean isSingleGroupQuery = opts.options.valuesOf(opts.groupOpt).size() == 1;
|
||||
ObjectWriter csvWriter = isSingleGroupQuery
|
||||
|
|
|
@ -62,6 +62,7 @@ import java.util.stream.IntStream;
|
|||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anySet;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
@ -234,6 +235,8 @@ public class ConsumerGroupServiceTest {
|
|||
.thenReturn(describeGroupsResult(GroupState.DEAD));
|
||||
when(admin.describeTopics(ArgumentMatchers.eq(topicsWithoutPartitionsSpecified), any()))
|
||||
.thenReturn(describeTopicsResult(topicsWithoutPartitionsSpecified));
|
||||
when(admin.describeTopics(anySet()))
|
||||
.thenReturn(describeTopicsResult(TOPICS));
|
||||
when(admin.listOffsets(offsetsArgMatcher(), any()))
|
||||
.thenReturn(listOffsetsResult());
|
||||
|
||||
|
@ -317,7 +320,7 @@ public class ConsumerGroupServiceTest {
|
|||
|
||||
topics.forEach(topic -> {
|
||||
List<TopicPartitionInfo> partitions = IntStream.range(0, NUM_PARTITIONS)
|
||||
.mapToObj(i -> new TopicPartitionInfo(i, null, Collections.emptyList(), Collections.emptyList()))
|
||||
.mapToObj(i -> new TopicPartitionInfo(i, Node.noNode(), Collections.emptyList(), Collections.emptyList()))
|
||||
.collect(Collectors.toList());
|
||||
topicDescriptions.put(topic, new TopicDescription(topic, false, partitions));
|
||||
});
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.kafka.clients.producer.Producer;
|
|||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.GroupState;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.errors.LeaderNotAvailableException;
|
||||
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
|
||||
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.test.ClusterInstance;
|
||||
|
@ -81,6 +83,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_IN
|
|||
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
|
||||
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
|
||||
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
@ -659,6 +662,41 @@ public class ResetConsumerGroupOffsetTest {
|
|||
assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
|
||||
}
|
||||
|
||||
@ClusterTest(brokers = 3, serverProperties = {@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")})
|
||||
public void testResetOffsetsWithPartitionNoneLeader(ClusterInstance cluster) throws Exception {
|
||||
String group = generateRandomGroupId();
|
||||
String topic = generateRandomTopic();
|
||||
String[] args = buildArgsForGroup(cluster, group, "--topic", topic + ":0,1,2",
|
||||
"--to-earliest", "--execute");
|
||||
|
||||
try (Admin admin = cluster.admin();
|
||||
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
|
||||
|
||||
admin.createTopics(singleton(new NewTopic(topic, 3, (short) 1))).all().get();
|
||||
produceConsumeAndShutdown(cluster, topic, group, 2, GroupProtocol.CLASSIC);
|
||||
assertDoesNotThrow(() -> resetOffsets(service));
|
||||
// shutdown a broker to make some partitions missing leader
|
||||
cluster.shutdownBroker(0);
|
||||
assertThrows(LeaderNotAvailableException.class, () -> resetOffsets(service));
|
||||
}
|
||||
}
|
||||
|
||||
@ClusterTest
|
||||
public void testResetOffsetsWithPartitionNotExist(ClusterInstance cluster) throws Exception {
|
||||
String group = generateRandomGroupId();
|
||||
String topic = generateRandomTopic();
|
||||
String[] args = buildArgsForGroup(cluster, group, "--topic", topic + ":2,3",
|
||||
"--to-earliest", "--execute");
|
||||
|
||||
try (Admin admin = cluster.admin();
|
||||
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
|
||||
|
||||
admin.createTopics(singleton(new NewTopic(topic, 1, (short) 1))).all().get();
|
||||
produceConsumeAndShutdown(cluster, topic, group, 2, GroupProtocol.CLASSIC);
|
||||
assertThrows(UnknownTopicOrPartitionException.class, () -> resetOffsets(service));
|
||||
}
|
||||
}
|
||||
|
||||
private String generateRandomTopic() {
|
||||
return TOPIC_PREFIX + TestUtils.randomString(10);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue