mirror of https://github.com/apache/kafka.git
MINOR: add test for `kafka-consumer-groups.sh` should not fail when partition offline (#20235)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
See: https://github.com/apache/kafka/pull/20168#discussion_r2227310093 add follow test case: Given a topic with three partitions, where partition `t-2` is offline, if partitionsToReset contains only `t-1`, the method filterNoneLeaderPartitions incorrectly includes `t-2` in the result, leading to a failure in the tool. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Ken Huang <s7133700@gmail.com>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
d0a9a04a02
commit
dfaf9f9cf7
|
@ -52,6 +52,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
@ -153,6 +154,28 @@ public class ResetConsumerGroupOffsetTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ClusterTest(
|
||||||
|
brokers = 2,
|
||||||
|
serverProperties = {
|
||||||
|
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
public void testResetOffsetsWithOfflinePartitionNotInResetTarget(ClusterInstance cluster) throws Exception {
|
||||||
|
String topic = generateRandomTopic();
|
||||||
|
String group = "new.group";
|
||||||
|
String[] args = buildArgsForGroup(cluster, group, "--to-earliest", "--execute", "--topic", topic + ":0");
|
||||||
|
|
||||||
|
try (Admin admin = cluster.admin(); ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
|
||||||
|
admin.createTopics(List.of(new NewTopic(topic, Map.of(0, List.of(0), 1, List.of(1)))));
|
||||||
|
cluster.waitTopicCreation(topic, 2);
|
||||||
|
|
||||||
|
cluster.shutdownBroker(1);
|
||||||
|
|
||||||
|
Map<TopicPartition, OffsetAndMetadata> resetOffsets = service.resetOffsets().get(group);
|
||||||
|
assertEquals(Set.of(new TopicPartition(topic, 0)), resetOffsets.keySet());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@ClusterTest
|
@ClusterTest
|
||||||
public void testResetOffsetsExistingTopic(ClusterInstance cluster) {
|
public void testResetOffsetsExistingTopic(ClusterInstance cluster) {
|
||||||
String topic = generateRandomTopic();
|
String topic = generateRandomTopic();
|
||||||
|
|
Loading…
Reference in New Issue