diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java index 5c054767f99..daadc5beb70 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java @@ -40,7 +40,6 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; @@ -70,7 +69,6 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest { public static final String GROUP = "test.group"; private final ClusterInstance clusterInstance; - private ConsumerGroupCommand.ConsumerGroupService consumerGroupService; private final Iterable> consumerConfigs; DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { @@ -81,21 +79,14 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest { : Collections.singletonList(Collections.emptyMap()); } - @AfterEach - public void tearDown() { - if (consumerGroupService != null) { - consumerGroupService.close(); - } - } - @ClusterTest public void testDeleteOffsetsNonExistingGroup() { String group = "missing.group"; String topic = "foo:1"; - setupConsumerGroupService(getArgs(group, topic)); - - Entry> res = consumerGroupService.deleteOffsets(group, Collections.singletonList(topic)); - assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey()); + try (ConsumerGroupCommand.ConsumerGroupService consumerGroupService = consumerGroupService(getArgs(group, topic))) { + Entry> res = consumerGroupService.deleteOffsets(group, Collections.singletonList(topic)); + assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey()); + } } @ClusterTest @@ -171,8 +162,8 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest { }; } - private void setupConsumerGroupService(String[] args) { - consumerGroupService = new ConsumerGroupCommand.ConsumerGroupService( + private static ConsumerGroupCommand.ConsumerGroupService consumerGroupService(String[] args) { + return new ConsumerGroupCommand.ConsumerGroupService( ConsumerGroupCommandOptions.fromArgs(args), Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)) ); @@ -187,20 +178,20 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest { produceRecord(); this.withConsumerGroup(() -> { String topic = inputPartition >= 0 ? inputTopic + ":" + inputPartition : inputTopic; - setupConsumerGroupService(getArgs(GROUP, topic)); - - Entry> res = consumerGroupService.deleteOffsets(GROUP, Collections.singletonList(topic)); - Errors topLevelError = res.getKey(); - Map partitions = res.getValue(); - TopicPartition tp = new TopicPartition(inputTopic, expectedPartition); - // Partition level error should propagate to top level, unless this is due to a missed partition attempt. - if (inputPartition >= 0) { - assertEquals(expectedError, topLevelError); + try (ConsumerGroupCommand.ConsumerGroupService consumerGroupService = consumerGroupService(getArgs(GROUP, topic))) { + Entry> res = consumerGroupService.deleteOffsets(GROUP, Collections.singletonList(topic)); + Errors topLevelError = res.getKey(); + Map partitions = res.getValue(); + TopicPartition tp = new TopicPartition(inputTopic, expectedPartition); + // Partition level error should propagate to top level, unless this is due to a missed partition attempt. + if (inputPartition >= 0) { + assertEquals(expectedError, topLevelError); + } + if (expectedError == Errors.NONE) + assertNull(partitions.get(tp)); + else + assertEquals(expectedError.exception(), partitions.get(tp).getCause()); } - if (expectedError == Errors.NONE) - assertNull(partitions.get(tp)); - else - assertEquals(expectedError.exception(), partitions.get(tp).getCause()); }, isStable, consumerConfig); }