diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java index 5fb704cf53d..5bf9da0c370 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.function.Supplier; @@ -153,6 +154,28 @@ public class ResetConsumerGroupOffsetTest { } } + @ClusterTest( + brokers = 2, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"), + } + ) + public void testResetOffsetsWithOfflinePartitionNotInResetTarget(ClusterInstance cluster) throws Exception { + String topic = generateRandomTopic(); + String group = "new.group"; + String[] args = buildArgsForGroup(cluster, group, "--to-earliest", "--execute", "--topic", topic + ":0"); + + try (Admin admin = cluster.admin(); ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { + admin.createTopics(List.of(new NewTopic(topic, Map.of(0, List.of(0), 1, List.of(1))))); + cluster.waitTopicCreation(topic, 2); + + cluster.shutdownBroker(1); + + Map resetOffsets = service.resetOffsets().get(group); + assertEquals(Set.of(new TopicPartition(topic, 0)), resetOffsets.keySet()); + } + } + @ClusterTest public void testResetOffsetsExistingTopic(ClusterInstance cluster) { String topic = generateRandomTopic();