From 3327435c8d5e564232707bac5fdc4706fd93b7d2 Mon Sep 17 00:00:00 2001 From: Ken Huang <100591800+m1a2st@users.noreply.github.com> Date: Thu, 30 May 2024 22:51:16 +0900 Subject: [PATCH] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra (#15779) Reviewers: Chia-Ping Tsai --- .../group/ResetConsumerGroupOffsetTest.java | 1049 +++++++++++------ 1 file changed, 673 insertions(+), 376 deletions(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java index e86c61ee0ab..c493a0c8ebc 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java @@ -17,36 +17,64 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; -import org.apache.kafka.clients.consumer.Consumer; +import kafka.test.ClusterConfig; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTemplate; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.Test; +import 
org.junit.jupiter.api.extension.ExtendWith; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; -import java.text.SimpleDateFormat; +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Calendar; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Properties; -import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.time.LocalDateTime.now; +import static java.util.Arrays.asList; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static java.util.stream.Collectors.toMap; +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.admin.AdminClientConfig.RETRIES_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -62,506 +90,775 @@ import static org.junit.jupiter.api.Assertions.assertTrue; * - scope=topics+partitions, scenario=to-earliest * - export/import */ -public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { - private String[] basicArgs() { +@ExtendWith(value = ClusterTestExtensions.class) +public class ResetConsumerGroupOffsetTest { + + private static final String TOPIC_PREFIX = "foo-"; + private static final String GROUP_PREFIX = "test.group-"; + + private static List generator() { + return ConsumerGroupCommandTestUtils.generator(); + } + + private String[] basicArgs(ClusterInstance cluster) { return new String[]{"--reset-offsets", - "--bootstrap-server", bootstrapServers(listenerName()), + "--bootstrap-server", cluster.bootstrapServers(), "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)}; } - private String[] buildArgsForGroups(List groups, String...args) { - List res = new ArrayList<>(Arrays.asList(basicArgs())); + private String[] buildArgsForGroups(ClusterInstance cluster, List groups, String... args) { + List res = new ArrayList<>(asList(basicArgs(cluster))); for (String group : groups) { res.add("--group"); res.add(group); } - res.addAll(Arrays.asList(args)); + res.addAll(asList(args)); return res.toArray(new String[0]); } - private String[] buildArgsForGroup(String group, String...args) { - return buildArgsForGroups(Collections.singletonList(group), args); + private String[] buildArgsForGroup(ClusterInstance cluster, String group, String... 
args) { + return buildArgsForGroups(cluster, singletonList(group), args); } - private String[] buildArgsForAllGroups(String...args) { - List res = new ArrayList<>(Arrays.asList(basicArgs())); + private String[] buildArgsForAllGroups(ClusterInstance cluster, String... args) { + List res = new ArrayList<>(asList(basicArgs(cluster))); res.add("--all-groups"); - res.addAll(Arrays.asList(args)); + res.addAll(asList(args)); return res.toArray(new String[0]); } - @Test - public void testResetOffsetsNotExistingGroup() throws Exception { + @ClusterTemplate("generator") + public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) throws Exception { + String topic = generateRandomTopic(); String group = "missing.group"; - String[] args = buildArgsForGroup(group, "--all-topics", "--to-current", "--execute"); - ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(args); - // Make sure we got a coordinator - TestUtils.waitForCondition( - () -> Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(), "localhost"), - "Can't find a coordinator"); - Map resetOffsets = consumerGroupCommand.resetOffsets().get(group); - assertTrue(resetOffsets.isEmpty()); - assertTrue(committedOffsets(TOPIC, group).isEmpty()); + String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-current", "--execute"); + + try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { + // Make sure we got a coordinator + TestUtils.waitForCondition( + () -> "localhost".equals(service.collectGroupState(group).coordinator.host()), + "Can't find a coordinator"); + Map resetOffsets = service.resetOffsets().get(group); + assertTrue(resetOffsets.isEmpty()); + assertTrue(committedOffsets(cluster, topic, group).isEmpty()); + } } - @Test - public void testResetOffsetsExistingTopic() { + @ClusterTemplate("generator") + public void testResetOffsetsExistingTopic(ClusterInstance cluster) { + String topic = 
generateRandomTopic(); String group = "new.group"; - String[] args = buildArgsForGroup(group, "--topic", TOPIC, "--to-offset", "50"); - produceMessages(TOPIC, 100); - resetAndAssertOffsets(args, 50, true, Collections.singletonList(TOPIC)); - resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, Collections.singletonList(TOPIC)); - resetAndAssertOffsets(addTo(args, "--execute"), 50, false, Collections.singletonList(TOPIC)); + String[] args = buildArgsForGroup(cluster, group, "--topic", topic, "--to-offset", "50"); + + produceMessages(cluster, topic, 100); + resetAndAssertOffsets(cluster, args, 50, true, singletonList(topic)); + resetAndAssertOffsets(cluster, addTo(args, "--dry-run"), + 50, true, singletonList(topic)); + resetAndAssertOffsets(cluster, addTo(args, "--execute"), + 50, false, singletonList(topic)); } - @Test - public void testResetOffsetsExistingTopicSelectedGroups() throws Exception { - produceMessages(TOPIC, 100); - List groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP + id).collect(Collectors.toList()); - for (String group : groups) { - ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, group, GroupProtocol.CLASSIC.name); - awaitConsumerProgress(TOPIC, group, 100L); - executor.shutdown(); - } - String[] args = buildArgsForGroups(groups, "--topic", TOPIC, "--to-offset", "50"); - resetAndAssertOffsets(args, 50, true, Collections.singletonList(TOPIC)); - resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, Collections.singletonList(TOPIC)); - resetAndAssertOffsets(addTo(args, "--execute"), 50, false, Collections.singletonList(TOPIC)); - } + @ClusterTemplate("generator") + public void testResetOffsetsExistingTopicSelectedGroups(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String topic = generateRandomTopic(); - @Test - public void testResetOffsetsExistingTopicAllGroups() throws Exception { - String[] args = buildArgsForAllGroups("--topic", TOPIC, 
"--to-offset", "50"); - produceMessages(TOPIC, 100); - for (int i = 1; i <= 3; i++) { - String group = GROUP + i; - ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, group, GroupProtocol.CLASSIC.name); - awaitConsumerProgress(TOPIC, group, 100L); - executor.shutdown(); - } - resetAndAssertOffsets(args, 50, true, Collections.singletonList(TOPIC)); - resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, Collections.singletonList(TOPIC)); - resetAndAssertOffsets(addTo(args, "--execute"), 50, false, Collections.singletonList(TOPIC)); - } - - @Test - public void testResetOffsetsAllTopicsAllGroups() throws Exception { - String[] args = buildArgsForAllGroups("--all-topics", "--to-offset", "50"); - List topics = IntStream.rangeClosed(1, 3).mapToObj(i -> TOPIC + i).collect(Collectors.toList()); - List groups = IntStream.rangeClosed(1, 3).mapToObj(i -> GROUP + i).collect(Collectors.toList()); - topics.forEach(topic -> produceMessages(topic, 100)); - - for (String topic : topics) { + produceMessages(cluster, topic, 100); + List groups = generateIds(topic); for (String group : groups) { - ConsumerGroupExecutor executor = addConsumerGroupExecutor(3, topic, group, GroupProtocol.CLASSIC.name); - awaitConsumerProgress(topic, group, 100); - executor.shutdown(); + try (AutoCloseable consumerGroupCloseable = + consumerGroupClosable(cluster, 1, topic, group, groupProtocol)) { + awaitConsumerProgress(cluster, topic, group, 100L); + } + } + + String[] args = buildArgsForGroups(cluster, groups, "--topic", topic, "--to-offset", "50"); + resetAndAssertOffsets(cluster, args, 50, true, singletonList(topic)); + resetAndAssertOffsets(cluster, addTo(args, "--dry-run"), + 50, true, singletonList(topic)); + resetAndAssertOffsets(cluster, addTo(args, "--execute"), + 50, false, singletonList(topic)); + } + } + + @ClusterTemplate("generator") + public void testResetOffsetsExistingTopicAllGroups(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : 
cluster.supportedGroupProtocols()) { + String topic = generateRandomTopic(); + String[] args = buildArgsForAllGroups(cluster, "--topic", topic, "--to-offset", "50"); + + produceMessages(cluster, topic, 100); + for (int i = 1; i <= 3; i++) { + String group = generateRandomGroupId(); + try (AutoCloseable consumerGroupCloseable = + consumerGroupClosable(cluster, 1, topic, group, groupProtocol)) { + awaitConsumerProgress(cluster, topic, group, 100L); + } + } + resetAndAssertOffsets(cluster, args, 50, true, singletonList(topic)); + resetAndAssertOffsets(cluster, addTo(args, "--dry-run"), + 50, true, singletonList(topic)); + resetAndAssertOffsets(cluster, addTo(args, "--execute"), + 50, false, singletonList(topic)); + } + } + + @ClusterTemplate("generator") + public void testResetOffsetsAllTopicsAllGroups(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String groupId = generateRandomGroupId(); + String topicId = generateRandomTopic(); + + String[] args = buildArgsForAllGroups(cluster, "--all-topics", "--to-offset", "50"); + List topics = generateIds(groupId); + List groups = generateIds(topicId); + topics.forEach(topic -> produceMessages(cluster, topic, 100)); + + for (String topic : topics) { + for (String group : groups) { + try (AutoCloseable consumerGroupCloseable = + consumerGroupClosable(cluster, 3, topic, group, groupProtocol)) { + awaitConsumerProgress(cluster, topic, group, 100); + } + } + } + + resetAndAssertOffsets(cluster, args, 50, true, topics); + resetAndAssertOffsets(cluster, addTo(args, "--dry-run"), + 50, true, topics); + resetAndAssertOffsets(cluster, addTo(args, "--execute"), + 50, false, topics); + + try (Admin admin = cluster.createAdminClient()) { + admin.deleteConsumerGroups(groups).all().get(); } } - resetAndAssertOffsets(args, 50, true, topics); - resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, topics); - resetAndAssertOffsets(addTo(args, "--execute"), 50, false, 
topics); } - @Test - public void testResetOffsetsToLocalDateTime() throws Exception { - SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"); - Calendar calendar = Calendar.getInstance(); - calendar.add(Calendar.DATE, -1); + @ClusterTemplate("generator") + public void testResetOffsetsToLocalDateTime(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); - produceMessages(TOPIC, 100); + DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"); + LocalDateTime dateTime = now().minusDays(1); + String[] args = buildArgsForGroup(cluster, group, + "--all-topics", "--to-datetime", + format.format(dateTime), "--execute"); - ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, GROUP, GroupProtocol.CLASSIC.name); - awaitConsumerProgress(TOPIC, GROUP, 100L); - executor.shutdown(); + produceMessages(cluster, topic, 100); - String[] args = buildArgsForGroup(GROUP, "--all-topics", "--to-datetime", format.format(calendar.getTime()), "--execute"); - resetAndAssertOffsets(args, 0); + try (AutoCloseable consumerGroupCloseable = + consumerGroupClosable(cluster, 1, topic, group, groupProtocol)) { + awaitConsumerProgress(cluster, topic, group, 100L); + } + + resetAndAssertOffsets(cluster, topic, args, 0); + } } - @Test - public void testResetOffsetsToZonedDateTime() throws Exception { - SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); + @ClusterTemplate("generator") + public void testResetOffsetsToZonedDateTime(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); - produceMessages(TOPIC, 50); - Date checkpoint = new 
Date(); - produceMessages(TOPIC, 50); + produceMessages(cluster, topic, 50); + ZonedDateTime checkpoint = now().atZone(ZoneId.systemDefault()); + produceMessages(cluster, topic, 50); - ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, GROUP, GroupProtocol.CLASSIC.name); - awaitConsumerProgress(TOPIC, GROUP, 100L); - executor.shutdown(); + String[] args = buildArgsForGroup(cluster, group, + "--all-topics", "--to-datetime", format.format(checkpoint), + "--execute"); - String[] args = buildArgsForGroup(GROUP, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute"); - resetAndAssertOffsets(args, 50); + try (AutoCloseable consumerGroupCloseable = + consumerGroupClosable(cluster, 1, topic, group, groupProtocol)) { + awaitConsumerProgress(cluster, topic, group, 100L); + } + + resetAndAssertOffsets(cluster, topic, args, 50); + } } - @Test - public void testResetOffsetsByDuration() throws Exception { - String[] args = buildArgsForGroup(GROUP, "--all-topics", "--by-duration", "PT1M", "--execute"); - produceConsumeAndShutdown(TOPIC, GROUP, 100, 1); - resetAndAssertOffsets(args, 0); + @ClusterTemplate("generator") + public void testResetOffsetsByDuration(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + + String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--by-duration", "PT1M", "--execute"); + produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol); + resetAndAssertOffsets(cluster, topic, args, 0); + } } - @Test - public void testResetOffsetsByDurationToEarliest() throws Exception { - String[] args = buildArgsForGroup(GROUP, "--all-topics", "--by-duration", "PT0.1S", "--execute"); - produceConsumeAndShutdown(TOPIC, GROUP, 100, 1); - resetAndAssertOffsets(args, 100); + @ClusterTemplate("generator") + public void testResetOffsetsByDurationToEarliest(ClusterInstance 
cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + + String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--by-duration", "PT0.1S", "--execute"); + produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol); + resetAndAssertOffsets(cluster, topic, args, 100); + } } - @Test - public void testResetOffsetsByDurationFallbackToLatestWhenNoRecords() { - String topic = "foo2"; - String[] args = buildArgsForGroup(GROUP, "--topic", topic, "--by-duration", "PT1M", "--execute"); - createTopic(topic, 1, 1, new Properties(), listenerName(), new Properties()); - resetAndAssertOffsets(args, 0, false, Collections.singletonList("foo2")); + @ClusterTemplate("generator") + public void testResetOffsetsByDurationFallbackToLatestWhenNoRecords(ClusterInstance cluster) throws ExecutionException, InterruptedException { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); - adminZkClient().deleteTopic(topic); + String[] args = buildArgsForGroup(cluster, group, "--topic", topic, "--by-duration", "PT1M", "--execute"); + + try (Admin admin = cluster.createAdminClient()) { + admin.createTopics(singleton(new NewTopic(topic, 1, (short) 1))).all().get(); + resetAndAssertOffsets(cluster, args, 0, false, singletonList(topic)); + admin.deleteTopics(singleton(topic)).all().get(); + } } - @Test - public void testResetOffsetsToEarliest() throws Exception { - String[] args = buildArgsForGroup(GROUP, "--all-topics", "--to-earliest", "--execute"); - produceConsumeAndShutdown(TOPIC, GROUP, 100, 1); - resetAndAssertOffsets(args, 0); + @ClusterTemplate("generator") + public void testResetOffsetsToEarliest(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + + String[] args = 
buildArgsForGroup(cluster, group, "--all-topics", "--to-earliest", "--execute"); + produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol); + resetAndAssertOffsets(cluster, topic, args, 0); + } } - @Test - public void testResetOffsetsToLatest() throws Exception { - String[] args = buildArgsForGroup(GROUP, "--all-topics", "--to-latest", "--execute"); - produceConsumeAndShutdown(TOPIC, GROUP, 100, 1); - produceMessages(TOPIC, 100); - resetAndAssertOffsets(args, 200); + @ClusterTemplate("generator") + public void testResetOffsetsToLatest(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + + String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-latest", "--execute"); + produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol); + produceMessages(cluster, topic, 100); + resetAndAssertOffsets(cluster, topic, args, 200); + } } - @Test - public void testResetOffsetsToCurrentOffset() throws Exception { - String[] args = buildArgsForGroup(GROUP, "--all-topics", "--to-current", "--execute"); - produceConsumeAndShutdown(TOPIC, GROUP, 100, 1); - produceMessages(TOPIC, 100); - resetAndAssertOffsets(args, 100); + @ClusterTemplate("generator") + public void testResetOffsetsToCurrentOffset(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + + String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-current", "--execute"); + produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol); + produceMessages(cluster, topic, 100); + resetAndAssertOffsets(cluster, topic, args, 100); + } } - @Test - public void testResetOffsetsToSpecificOffset() throws Exception { - String[] args = buildArgsForGroup(GROUP, "--all-topics", "--to-offset", 
"1", "--execute"); - produceConsumeAndShutdown(TOPIC, GROUP, 100, 1); - resetAndAssertOffsets(args, 1); + @ClusterTemplate("generator") + public void testResetOffsetsToSpecificOffset(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + + String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-offset", "1", "--execute"); + produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol); + resetAndAssertOffsets(cluster, topic, args, 1); + } } - @Test - public void testResetOffsetsShiftPlus() throws Exception { - String[] args = buildArgsForGroup(GROUP, "--all-topics", "--shift-by", "50", "--execute"); - produceConsumeAndShutdown(TOPIC, GROUP, 100, 1); - produceMessages(TOPIC, 100); - resetAndAssertOffsets(args, 150); + @ClusterTemplate("generator") + public void testResetOffsetsShiftPlus(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + + String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--shift-by", "50", "--execute"); + produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol); + produceMessages(cluster, topic, 100); + resetAndAssertOffsets(cluster, topic, args, 150); + } } - @Test - public void testResetOffsetsShiftMinus() throws Exception { - String[] args = buildArgsForGroup(GROUP, "--all-topics", "--shift-by", "-50", "--execute"); - produceConsumeAndShutdown(TOPIC, GROUP, 100, 1); - produceMessages(TOPIC, 100); - resetAndAssertOffsets(args, 50); + @ClusterTemplate("generator") + public void testResetOffsetsShiftMinus(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + + 
String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--shift-by", "-50", "--execute"); + produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol); + produceMessages(cluster, topic, 100); + resetAndAssertOffsets(cluster, topic, args, 50); + } } - @Test - public void testResetOffsetsShiftByLowerThanEarliest() throws Exception { - String[] args = buildArgsForGroup(GROUP, "--all-topics", "--shift-by", "-150", "--execute"); - produceConsumeAndShutdown(TOPIC, GROUP, 100, 1); - produceMessages(TOPIC, 100); - resetAndAssertOffsets(args, 0); + @ClusterTemplate("generator") + public void testResetOffsetsShiftByLowerThanEarliest(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + + String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--shift-by", "-150", "--execute"); + produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol); + produceMessages(cluster, topic, 100); + resetAndAssertOffsets(cluster, topic, args, 0); + } } - @Test - public void testResetOffsetsShiftByHigherThanLatest() throws Exception { - String[] args = buildArgsForGroup(GROUP, "--all-topics", "--shift-by", "150", "--execute"); - produceConsumeAndShutdown(TOPIC, GROUP, 100, 1); - produceMessages(TOPIC, 100); - resetAndAssertOffsets(args, 200); + @ClusterTemplate("generator") + public void testResetOffsetsShiftByHigherThanLatest(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + + String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--shift-by", "150", "--execute"); + produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol); + produceMessages(cluster, topic, 100); + resetAndAssertOffsets(cluster, topic, args, 200); + } } - @Test - public void 
testResetOffsetsToEarliestOnOneTopic() throws Exception { - String[] args = buildArgsForGroup(GROUP, "--topic", TOPIC, "--to-earliest", "--execute"); - produceConsumeAndShutdown(TOPIC, GROUP, 100, 1); - resetAndAssertOffsets(args, 0); + @ClusterTemplate("generator") + public void testResetOffsetsToEarliestOnOneTopic(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + + String[] args = buildArgsForGroup(cluster, group, "--topic", topic, "--to-earliest", "--execute"); + produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol); + resetAndAssertOffsets(cluster, topic, args, 0); + } } - @Test - public void testResetOffsetsToEarliestOnOneTopicAndPartition() throws Exception { - String topic = "bar"; - createTopic(topic, 2, 1, new Properties(), listenerName(), new Properties()); + @ClusterTemplate("generator") + public void testResetOffsetsToEarliestOnOneTopicAndPartition(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); + String[] args = buildArgsForGroup(cluster, group, "--topic", topic + ":1", + "--to-earliest", "--execute"); - String[] args = buildArgsForGroup(GROUP, "--topic", topic + ":1", "--to-earliest", "--execute"); - ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(args); + try (Admin admin = cluster.createAdminClient(); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { + admin.createTopics(singleton(new NewTopic(topic, 2, (short) 1))).all().get(); - produceConsumeAndShutdown(topic, GROUP, 100, 2); - Map priorCommittedOffsets = committedOffsets(topic, GROUP); + produceConsumeAndShutdown(cluster, topic, group, 2, groupProtocol); + Map priorCommittedOffsets = 
committedOffsets(cluster, topic, group); + TopicPartition tp0 = new TopicPartition(topic, 0); + TopicPartition tp1 = new TopicPartition(topic, 1); + Map expectedOffsets = new HashMap<>(); + expectedOffsets.put(tp0, priorCommittedOffsets.get(tp0)); + expectedOffsets.put(tp1, 0L); + resetAndAssertOffsetsCommitted(cluster, service, expectedOffsets, topic); - TopicPartition tp0 = new TopicPartition(topic, 0); - TopicPartition tp1 = new TopicPartition(topic, 1); - Map expectedOffsets = new HashMap<>(); - expectedOffsets.put(tp0, priorCommittedOffsets.get(tp0)); - expectedOffsets.put(tp1, 0L); - resetAndAssertOffsetsCommitted(consumerGroupCommand, expectedOffsets, topic); - - adminZkClient().deleteTopic(topic); + admin.deleteTopics(singleton(topic)).all().get(); + } + } } - @Test - public void testResetOffsetsToEarliestOnTopics() throws Exception { - String topic1 = "topic1"; - String topic2 = "topic2"; - createTopic(topic1, 1, 1, new Properties(), listenerName(), new Properties()); - createTopic(topic2, 1, 1, new Properties(), listenerName(), new Properties()); + @ClusterTemplate("generator") + public void testResetOffsetsToEarliestOnTopics(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic1 = generateRandomTopic(); + String topic2 = generateRandomTopic(); + String[] args = buildArgsForGroup(cluster, group, + "--topic", topic1, + "--topic", topic2, + "--to-earliest", "--execute"); - String[] args = buildArgsForGroup(GROUP, "--topic", topic1, "--topic", topic2, "--to-earliest", "--execute"); - ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(args); + try (Admin admin = cluster.createAdminClient(); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { + admin.createTopics(asList(new NewTopic(topic1, 1, (short) 1), + new NewTopic(topic2, 1, (short) 1))).all().get(); - 
produceConsumeAndShutdown(topic1, GROUP, 100, 1); - produceConsumeAndShutdown(topic2, GROUP, 100, 1); + produceConsumeAndShutdown(cluster, topic1, group, 1, groupProtocol); + produceConsumeAndShutdown(cluster, topic2, group, 1, groupProtocol); - TopicPartition tp1 = new TopicPartition(topic1, 0); - TopicPartition tp2 = new TopicPartition(topic2, 0); + TopicPartition tp1 = new TopicPartition(topic1, 0); + TopicPartition tp2 = new TopicPartition(topic2, 0); - Map allResetOffsets = toOffsetMap(resetOffsets(consumerGroupCommand).get(GROUP)); - Map expMap = new HashMap<>(); - expMap.put(tp1, 0L); - expMap.put(tp2, 0L); - assertEquals(expMap, allResetOffsets); - assertEquals(Collections.singletonMap(tp1, 0L), committedOffsets(topic1, GROUP)); - assertEquals(Collections.singletonMap(tp2, 0L), committedOffsets(topic2, GROUP)); + Map allResetOffsets = toOffsetMap(resetOffsets(service).get(group)); + Map expMap = new HashMap<>(); + expMap.put(tp1, 0L); + expMap.put(tp2, 0L); + assertEquals(expMap, allResetOffsets); + assertEquals(singletonMap(tp1, 0L), committedOffsets(cluster, topic1, group)); + assertEquals(singletonMap(tp2, 0L), committedOffsets(cluster, topic2, group)); - adminZkClient().deleteTopic(topic1); - adminZkClient().deleteTopic(topic2); + admin.deleteTopics(asList(topic1, topic2)).all().get(); + } + } } - @Test - public void testResetOffsetsToEarliestOnTopicsAndPartitions() throws Exception { - String topic1 = "topic1"; - String topic2 = "topic2"; + @ClusterTemplate("generator") + public void testResetOffsetsToEarliestOnTopicsAndPartitions(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic1 = generateRandomTopic(); + String topic2 = generateRandomTopic(); + String[] args = buildArgsForGroup(cluster, group, + "--topic", topic1 + ":1", + "--topic", topic2 + ":1", + "--to-earliest", "--execute"); - createTopic(topic1, 2, 1, new Properties(), 
listenerName(), new Properties()); - createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties()); + try (Admin admin = cluster.createAdminClient(); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { + admin.createTopics(asList(new NewTopic(topic1, 2, (short) 1), + new NewTopic(topic2, 2, (short) 1))).all().get(); - String[] args = buildArgsForGroup(GROUP, "--topic", topic1 + ":1", "--topic", topic2 + ":1", "--to-earliest", "--execute"); - ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(args); + produceConsumeAndShutdown(cluster, topic1, group, 2, groupProtocol); + produceConsumeAndShutdown(cluster, topic2, group, 2, groupProtocol); - produceConsumeAndShutdown(topic1, GROUP, 100, 2); - produceConsumeAndShutdown(topic2, GROUP, 100, 2); + Map priorCommittedOffsets1 = + committedOffsets(cluster, topic1, group); + Map priorCommittedOffsets2 = + committedOffsets(cluster, topic2, group); - Map priorCommittedOffsets1 = committedOffsets(topic1, GROUP); - Map priorCommittedOffsets2 = committedOffsets(topic2, GROUP); + TopicPartition tp1 = new TopicPartition(topic1, 1); + TopicPartition tp2 = new TopicPartition(topic2, 1); + Map allResetOffsets = toOffsetMap(resetOffsets(service).get(group)); + Map expMap = new HashMap<>(); + expMap.put(tp1, 0L); + expMap.put(tp2, 0L); + assertEquals(expMap, allResetOffsets); + priorCommittedOffsets1.put(tp1, 0L); + assertEquals(priorCommittedOffsets1, committedOffsets(cluster, topic1, group)); + priorCommittedOffsets2.put(tp2, 0L); + assertEquals(priorCommittedOffsets2, committedOffsets(cluster, topic2, group)); - TopicPartition tp1 = new TopicPartition(topic1, 1); - TopicPartition tp2 = new TopicPartition(topic2, 1); - Map allResetOffsets = toOffsetMap(resetOffsets(consumerGroupCommand).get(GROUP)); - Map expMap = new HashMap<>(); - expMap.put(tp1, 0L); - expMap.put(tp2, 0L); - assertEquals(expMap, allResetOffsets); - 
priorCommittedOffsets1.put(tp1, 0L); - assertEquals(priorCommittedOffsets1, committedOffsets(topic1, GROUP)); - priorCommittedOffsets2.put(tp2, 0L); - assertEquals(priorCommittedOffsets2, committedOffsets(topic2, GROUP)); - - adminZkClient().deleteTopic(topic1); - adminZkClient().deleteTopic(topic2); + admin.deleteTopics(asList(topic1, topic2)).all().get(); + } + } } - @Test - // This one deals with old CSV export/import format for a single --group arg: "topic,partition,offset" to support old behavior - public void testResetOffsetsExportImportPlanSingleGroupArg() throws Exception { - String topic = "bar"; - TopicPartition tp0 = new TopicPartition(topic, 0); - TopicPartition tp1 = new TopicPartition(topic, 1); - createTopic(topic, 2, 1, new Properties(), listenerName(), new Properties()); + @ClusterTemplate("generator") + // This one deals with old CSV export/import format for a single --group arg: + // "topic,partition,offset" to support old behavior + public void testResetOffsetsExportImportPlanSingleGroupArg(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group = generateRandomGroupId(); + String topic = generateRandomTopic(); - String[] cgcArgs = buildArgsForGroup(GROUP, "--all-topics", "--to-offset", "2", "--export"); - ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(cgcArgs); + TopicPartition tp0 = new TopicPartition(topic, 0); + TopicPartition tp1 = new TopicPartition(topic, 1); + String[] cgcArgs = buildArgsForGroup(cluster, group, "--all-topics", "--to-offset", "2", "--export"); + File file = TestUtils.tempFile("reset", ".csv"); - produceConsumeAndShutdown(topic, GROUP, 100, 2); + try (Admin admin = cluster.createAdminClient(); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)) { - File file = TestUtils.tempFile("reset", ".csv"); + admin.createTopics(singleton(new NewTopic(topic, 2, (short) 
1))).all().get(); + produceConsumeAndShutdown(cluster, topic, group, 2, groupProtocol); - Map> exportedOffsets = consumerGroupCommand.resetOffsets(); - BufferedWriter bw = new BufferedWriter(new FileWriter(file)); - bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets)); - bw.close(); + Map> exportedOffsets = service.resetOffsets(); - Map exp1 = new HashMap<>(); - exp1.put(tp0, 2L); - exp1.put(tp1, 2L); - assertEquals(exp1, toOffsetMap(exportedOffsets.get(GROUP))); + writeContentToFile(file, service.exportOffsetsToCsv(exportedOffsets)); - String[] cgcArgsExec = buildArgsForGroup(GROUP, "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run"); - ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec); - Map> importedOffsets = consumerGroupCommandExec.resetOffsets(); - assertEquals(exp1, toOffsetMap(importedOffsets.get(GROUP))); + Map exp1 = new HashMap<>(); + exp1.put(tp0, 2L); + exp1.put(tp1, 2L); + assertEquals(exp1, toOffsetMap(exportedOffsets.get(group))); - adminZkClient().deleteTopic(topic); + String[] cgcArgsExec = buildArgsForGroup(cluster, group, "--all-topics", + "--from-file", file.getCanonicalPath(), "--dry-run"); + try (ConsumerGroupCommand.ConsumerGroupService serviceExec = getConsumerGroupService(cgcArgsExec)) { + Map> importedOffsets = serviceExec.resetOffsets(); + assertEquals(exp1, toOffsetMap(importedOffsets.get(group))); + } + + admin.deleteTopics(singleton(topic)); + } + } } - @Test + @ClusterTemplate("generator") // This one deals with universal CSV export/import file format "group,topic,partition,offset", // supporting multiple --group args or --all-groups arg - public void testResetOffsetsExportImportPlan() throws Exception { - String group1 = GROUP + "1"; - String group2 = GROUP + "2"; - String topic1 = "bar1"; - String topic2 = "bar2"; - TopicPartition t1p0 = new TopicPartition(topic1, 0); - TopicPartition t1p1 = new TopicPartition(topic1, 1); - TopicPartition t2p0 = 
new TopicPartition(topic2, 0); - TopicPartition t2p1 = new TopicPartition(topic2, 1); - createTopic(topic1, 2, 1, new Properties(), listenerName(), new Properties()); - createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties()); + public void testResetOffsetsExportImportPlan(ClusterInstance cluster) throws Exception { + for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { + String group1 = generateRandomGroupId(); + String group2 = generateRandomGroupId(); + String topic1 = generateRandomTopic(); + String topic2 = generateRandomTopic(); - String[] cgcArgs = buildArgsForGroups(Arrays.asList(group1, group2), "--all-topics", "--to-offset", "2", "--export"); - ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(cgcArgs); + TopicPartition t1p0 = new TopicPartition(topic1, 0); + TopicPartition t1p1 = new TopicPartition(topic1, 1); + TopicPartition t2p0 = new TopicPartition(topic2, 0); + TopicPartition t2p1 = new TopicPartition(topic2, 1); + String[] cgcArgs = buildArgsForGroups(cluster, asList(group1, group2), + "--all-topics", "--to-offset", "2", "--export"); + File file = TestUtils.tempFile("reset", ".csv"); - produceConsumeAndShutdown(topic1, group1, 100, 1); - produceConsumeAndShutdown(topic2, group2, 100, 1); + try (Admin admin = cluster.createAdminClient(); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)) { - awaitConsumerGroupInactive(consumerGroupCommand, group1); - awaitConsumerGroupInactive(consumerGroupCommand, group2); + admin.createTopics(asList(new NewTopic(topic1, 2, (short) 1), + new NewTopic(topic2, 2, (short) 1))).all().get(); - File file = TestUtils.tempFile("reset", ".csv"); + produceConsumeAndShutdown(cluster, topic1, group1, 1, groupProtocol); + produceConsumeAndShutdown(cluster, topic2, group2, 1, groupProtocol); - Map> exportedOffsets = consumerGroupCommand.resetOffsets(); - BufferedWriter bw = new BufferedWriter(new FileWriter(file)); 
- bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets)); - bw.close(); - Map exp1 = new HashMap<>(); - exp1.put(t1p0, 2L); - exp1.put(t1p1, 2L); - Map exp2 = new HashMap<>(); - exp2.put(t2p0, 2L); - exp2.put(t2p1, 2L); + awaitConsumerGroupInactive(service, group1); + awaitConsumerGroupInactive(service, group2); - assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1))); - assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2))); + Map> exportedOffsets = service.resetOffsets(); - // Multiple --group's offset import - String[] cgcArgsExec = buildArgsForGroups(Arrays.asList(group1, group2), "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run"); - ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec); - Map> importedOffsets = consumerGroupCommandExec.resetOffsets(); - assertEquals(exp1, toOffsetMap(importedOffsets.get(group1))); - assertEquals(exp2, toOffsetMap(importedOffsets.get(group2))); + writeContentToFile(file, service.exportOffsetsToCsv(exportedOffsets)); - // Single --group offset import using "group,topic,partition,offset" csv format - String[] cgcArgsExec2 = buildArgsForGroup(group1, "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run"); - ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec2 = getConsumerGroupService(cgcArgsExec2); - Map> importedOffsets2 = consumerGroupCommandExec2.resetOffsets(); - assertEquals(exp1, toOffsetMap(importedOffsets2.get(group1))); + Map exp1 = new HashMap<>(); + exp1.put(t1p0, 2L); + exp1.put(t1p1, 2L); + Map exp2 = new HashMap<>(); + exp2.put(t2p0, 2L); + exp2.put(t2p1, 2L); - adminZkClient().deleteTopic(TOPIC); + assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1))); + assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2))); + + // Multiple --group's offset import + String[] cgcArgsExec = buildArgsForGroups(cluster, asList(group1, group2), + "--all-topics", + "--from-file", 
file.getCanonicalPath(), "--dry-run"); + try (ConsumerGroupCommand.ConsumerGroupService serviceExec = getConsumerGroupService(cgcArgsExec)) { + Map> importedOffsets = serviceExec.resetOffsets(); + assertEquals(exp1, toOffsetMap(importedOffsets.get(group1))); + assertEquals(exp2, toOffsetMap(importedOffsets.get(group2))); + } + + // Single --group offset import using "group,topic,partition,offset" csv format + String[] cgcArgsExec2 = buildArgsForGroup(cluster, group1, "--all-topics", + "--from-file", file.getCanonicalPath(), "--dry-run"); + try (ConsumerGroupCommand.ConsumerGroupService serviceExec2 = getConsumerGroupService(cgcArgsExec2)) { + Map> importedOffsets2 = serviceExec2.resetOffsets(); + assertEquals(exp1, toOffsetMap(importedOffsets2.get(group1))); + } + + admin.deleteTopics(asList(topic1, topic2)); + } + } } - @Test - public void testResetWithUnrecognizedNewConsumerOption() { - String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--reset-offsets", - "--group", GROUP, "--all-topics", "--to-offset", "2", "--export"}; + @ClusterTemplate("generator") + public void testResetWithUnrecognizedNewConsumerOption(ClusterInstance cluster) { + String group = generateRandomGroupId(); + String[] cgcArgs = new String[]{"--new-consumer", + "--bootstrap-server", cluster.bootstrapServers(), + "--reset-offsets", "--group", group, "--all-topics", + "--to-offset", "2", "--export"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } - private void produceMessages(String topic, int numMessages) { - List> records = IntStream.range(0, numMessages) - .mapToObj(i -> new ProducerRecord(topic, new byte[100 * 1000])) - .collect(Collectors.toList()); - kafka.utils.TestUtils.produceMessages(servers(), seq(records), 1); + private String generateRandomTopic() { + return TOPIC_PREFIX + TestUtils.randomString(10); } - private void produceConsumeAndShutdown(String topic, String group, int totalMessages, int 
numConsumers) throws Exception { - produceMessages(topic, totalMessages); - ConsumerGroupExecutor executor = addConsumerGroupExecutor(numConsumers, topic, group, GroupProtocol.CLASSIC.name); - awaitConsumerProgress(topic, group, totalMessages); - executor.shutdown(); + private String generateRandomGroupId() { + return GROUP_PREFIX + TestUtils.randomString(10); } - private void awaitConsumerProgress(String topic, - String group, - long count) throws Exception { - try (Consumer consumer = createNoAutoCommitConsumer(group)) { - Set partitions = consumer.partitionsFor(topic).stream() - .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) - .collect(Collectors.toSet()); - - TestUtils.waitForCondition(() -> { - Collection committed = consumer.committed(partitions).values(); - long total = committed.stream() - .mapToLong(offsetAndMetadata -> Optional.ofNullable(offsetAndMetadata).map(OffsetAndMetadata::offset).orElse(0L)) - .sum(); - - return total == count; - }, "Expected that consumer group has consumed all messages from topic/partition. " + - "Expected offset: " + count + ". 
Actual offset: " + committedOffsets(topic, group).values().stream().mapToLong(Long::longValue).sum()); + private Map committedOffsets(ClusterInstance cluster, + String topic, + String group) { + try (Admin admin = Admin.create(singletonMap(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { + return admin.listConsumerGroupOffsets(group) + .all().get() + .get(group).entrySet() + .stream() + .filter(e -> e.getKey().topic().equals(topic)) + .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); } } - private void awaitConsumerGroupInactive(ConsumerGroupCommand.ConsumerGroupService consumerGroupService, String group) throws Exception { - TestUtils.waitForCondition(() -> { - ConsumerGroupState state = consumerGroupService.collectGroupState(group).state; - return Objects.equals(state, ConsumerGroupState.EMPTY) || Objects.equals(state, ConsumerGroupState.DEAD); - }, "Expected that consumer group is inactive. 
Actual state: " + consumerGroupService.collectGroupState(group).state); + private ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) { + return new ConsumerGroupCommand.ConsumerGroupService( + ConsumerGroupCommandOptions.fromArgs(args), + singletonMap(RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE))); } - private void resetAndAssertOffsets(String[] args, + private void produceMessages(ClusterInstance cluster, String topic, int numMessages) { + List> records = IntStream.range(0, numMessages) + .mapToObj(i -> new ProducerRecord(topic, new byte[100 * 1000])) + .collect(Collectors.toList()); + produceMessages(cluster, records); + } + + private void produceMessages(ClusterInstance cluster, List> records) { + try (Producer producer = createProducer(cluster)) { + records.forEach(producer::send); + } + } + + private Producer createProducer(ClusterInstance cluster) { + Properties props = new Properties(); + props.put(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + props.put(ACKS_CONFIG, "1"); + props.put(KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + props.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + return new KafkaProducer<>(props); + } + + private void resetAndAssertOffsets(ClusterInstance cluster, + String topic, + String[] args, long expectedOffset) { - resetAndAssertOffsets(args, expectedOffset, false, Collections.singletonList(TOPIC)); + resetAndAssertOffsets(cluster, args, expectedOffset, false, singletonList(topic)); } - private void resetAndAssertOffsets(String[] args, - long expectedOffset, - boolean dryRun, - List topics) { - ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(args); - Map> expectedOffsets = topics.stream().collect(Collectors.toMap( - Function.identity(), - topic -> Collections.singletonMap(new TopicPartition(topic, 0), expectedOffset))); - Map> resetOffsetsResultByGroup = resetOffsets(consumerGroupCommand); - - try 
{ + private void resetAndAssertOffsets(ClusterInstance cluster, + String[] args, + long expectedOffset, + boolean dryRun, + List topics) { + try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { + Map> topicToExpectedOffsets = getTopicExceptOffsets(topics, expectedOffset); + Map> resetOffsetsResultByGroup = + resetOffsets(service); for (final String topic : topics) { resetOffsetsResultByGroup.forEach((group, partitionInfo) -> { - Map priorOffsets = committedOffsets(topic, group); - assertEquals(expectedOffsets.get(topic), - partitionInfo.entrySet().stream() - .filter(entry -> Objects.equals(entry.getKey().topic(), topic)) - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()))); - assertEquals(dryRun ? priorOffsets : expectedOffsets.get(topic), committedOffsets(topic, group)); + Map priorOffsets = committedOffsets(cluster, topic, group); + assertEquals(topicToExpectedOffsets.get(topic), partitionToOffsets(topic, partitionInfo)); + assertEquals(dryRun ? 
priorOffsets : topicToExpectedOffsets.get(topic), + committedOffsets(cluster, topic, group)); }); } - } finally { - consumerGroupCommand.close(); } } - private void resetAndAssertOffsetsCommitted(ConsumerGroupCommand.ConsumerGroupService consumerGroupService, - Map expectedOffsets, - String topic) { - Map> allResetOffsets = resetOffsets(consumerGroupService); - - allResetOffsets.forEach((group, offsetsInfo) -> { - offsetsInfo.forEach((tp, offsetMetadata) -> { - assertEquals(offsetMetadata.offset(), expectedOffsets.get(tp)); - assertEquals(expectedOffsets, committedOffsets(topic, group)); - }); - }); + private Map> getTopicExceptOffsets(List topics, + long expectedOffset) { + return topics.stream() + .collect(toMap(Function.identity(), + topic -> singletonMap(new TopicPartition(topic, 0), + expectedOffset))); } - private Map> resetOffsets(ConsumerGroupCommand.ConsumerGroupService consumerGroupService) { + private Map> resetOffsets( + ConsumerGroupCommand.ConsumerGroupService consumerGroupService) { return consumerGroupService.resetOffsets(); } - Map toOffsetMap(Map map) { - return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); + private Map partitionToOffsets(String topic, + Map partitionInfo) { + return partitionInfo.entrySet() + .stream() + .filter(entry -> Objects.equals(entry.getKey().topic(), topic)) + .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())); } - private String[] addTo(String[] args, String...extra) { - List res = new ArrayList<>(Arrays.asList(args)); - res.addAll(Arrays.asList(extra)); + private static List generateIds(String name) { + return IntStream.rangeClosed(1, 2) + .mapToObj(id -> name + id) + .collect(Collectors.toList()); + } + + private void produceConsumeAndShutdown(ClusterInstance cluster, + String topic, + String group, + int numConsumers, + GroupProtocol groupProtocol) throws Exception { + produceMessages(cluster, topic, 100); + try (AutoCloseable consumerGroupCloseable = + 
consumerGroupClosable(cluster, numConsumers, topic, group, groupProtocol)) { + awaitConsumerProgress(cluster, topic, group, 100); + } + } + + private void writeContentToFile(File file, String content) throws IOException { + try (BufferedWriter bw = new BufferedWriter(new FileWriter(file))) { + bw.write(content); + } + } + + private AutoCloseable consumerGroupClosable(ClusterInstance cluster, + int numConsumers, + String topic, + String group, + GroupProtocol groupProtocol) { + Map configs = composeConsumerConfigs(cluster, group, groupProtocol); + return ConsumerGroupCommandTestUtils.buildConsumers( + numConsumers, + false, + topic, + () -> new KafkaConsumer(configs)); + } + + private Map composeConsumerConfigs(ClusterInstance cluster, + String group, + GroupProtocol groupProtocol) { + HashMap configs = new HashMap<>(); + configs.put(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + configs.put(GROUP_ID_CONFIG, group); + configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol.name); + configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); + configs.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); + configs.put(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 1000); + return configs; + } + + private void awaitConsumerProgress(ClusterInstance cluster, + String topic, + String group, + long count) throws Exception { + try (Admin admin = Admin.create(singletonMap(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { + Supplier offsets = () -> { + try { + return admin.listConsumerGroupOffsets(group) + .all().get().get(group) + .entrySet() + .stream() + .filter(e -> e.getKey().topic().equals(topic)) + .mapToLong(e -> e.getValue().offset()) + .sum(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }; + TestUtils.waitForCondition(() -> 
offsets.get() == count, + "Expected that consumer group has consumed all messages from topic/partition. " + + "Expected offset: " + count + + ". Actual offset: " + offsets.get()); + } + } + + private void awaitConsumerGroupInactive(ConsumerGroupCommand.ConsumerGroupService service, + String group) throws Exception { + TestUtils.waitForCondition(() -> { + ConsumerGroupState state = service.collectGroupState(group).state; + return Objects.equals(state, ConsumerGroupState.EMPTY) || Objects.equals(state, ConsumerGroupState.DEAD); + }, "Expected that consumer group is inactive. Actual state: " + + service.collectGroupState(group).state); + } + + private void resetAndAssertOffsetsCommitted(ClusterInstance cluster, + ConsumerGroupCommand.ConsumerGroupService service, + Map expectedOffsets, + String topic) { + Map> allResetOffsets = resetOffsets(service); + + allResetOffsets.forEach((group, offsetsInfo) -> offsetsInfo.forEach((tp, offsetMetadata) -> { + assertEquals(offsetMetadata.offset(), expectedOffsets.get(tp)); + assertEquals(expectedOffsets, committedOffsets(cluster, topic, group)); + })); + } + + private Map toOffsetMap(Map map) { + return map.entrySet() + .stream() + .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())); + } + + private String[] addTo(String[] args, String... extra) { + List res = new ArrayList<>(asList(args)); + res.addAll(asList(extra)); return res.toArray(new String[0]); } }