diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java index a0140182b08..3fca3272f1e 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java @@ -210,8 +210,6 @@ public class DescribeStreamsGroupTest { @Test public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() throws Exception { cluster.createTopic(INPUT_TOPIC_2, 1, 1); - KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2)); - startApplicationAndWaitUntilRunning(streams2); final List expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS"); final Set> expectedRows1 = Set.of( @@ -227,18 +225,22 @@ public class DescribeStreamsGroupTest { // The member and process names as well as client-id are not deterministic, so we don't care about them. final List dontCares = List.of(3, 6, 7); - validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2), - expectedHeader, expectedRowsMap, dontCares); - validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", APP_ID, "--group", APP_ID_2), - expectedHeader, expectedRowsMap, dontCares); - validateDescribeOutput( - List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--all-groups"), - expectedHeader, expectedRowsMap, dontCares); + try (KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2))) { + startApplicationAndWaitUntilRunning(streams2); - streams2.close(); - streams2.cleanUp(); + validateDescribeOutput( + List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2), + expectedHeader, expectedRowsMap, dontCares); + validateDescribeOutput( + List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", APP_ID, "--group", APP_ID_2), + expectedHeader, expectedRowsMap, dontCares); + validateDescribeOutput( + List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--all-groups"), + expectedHeader, expectedRowsMap, dontCares); + + streams2.close(); + streams2.cleanUp(); + } } @Test @@ -257,15 +259,15 @@ public class DescribeStreamsGroupTest { @Test public void testDescribeStreamsGroupWithShortTimeout() throws Exception { cluster.createTopic(INPUT_TOPIC_2, 1, 1); - KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2)); - startApplicationAndWaitUntilRunning(streams2); + try (KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2))) { + startApplicationAndWaitUntilRunning(streams2); + List args = List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "1"); + Throwable e = assertThrows(ExecutionException.class, () -> getStreamsGroupService(args.toArray(new String[0])).describeGroups()); + assertEquals(TimeoutException.class, e.getCause().getClass()); - List args = List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "1"); - Throwable e = assertThrows(ExecutionException.class, () -> getStreamsGroupService(args.toArray(new String[0])).describeGroups()); - assertEquals(TimeoutException.class, e.getCause().getClass()); - - streams2.close(); - streams2.cleanUp(); + streams2.close(); + streams2.cleanUp(); + } } private static Topology topology(String inputTopic, String outputTopic) {