address reviews

This commit is contained in:
aliehsaeedii 2025-09-02 11:36:20 +02:00
parent 594b36f0a5
commit 895abd9d2d
1 changed files with 23 additions and 21 deletions

View File

@ -210,8 +210,6 @@ public class DescribeStreamsGroupTest {
@Test
public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() throws Exception {
cluster.createTopic(INPUT_TOPIC_2, 1, 1);
KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2));
startApplicationAndWaitUntilRunning(streams2);
final List<String> expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
final Set<List<String>> expectedRows1 = Set.of(
@ -227,6 +225,9 @@ public class DescribeStreamsGroupTest {
// The member and process names as well as client-id are not deterministic, so we don't care about them.
final List<Integer> dontCares = List.of(3, 6, 7);
try (KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2))) {
startApplicationAndWaitUntilRunning(streams2);
validateDescribeOutput(
List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2),
expectedHeader, expectedRowsMap, dontCares);
@ -240,6 +241,7 @@ public class DescribeStreamsGroupTest {
streams2.close();
streams2.cleanUp();
}
}
@Test
public void testDescribeNonExistingStreamsGroup() {
@ -257,9 +259,8 @@ public class DescribeStreamsGroupTest {
@Test
public void testDescribeStreamsGroupWithShortTimeout() throws Exception {
cluster.createTopic(INPUT_TOPIC_2, 1, 1);
KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2));
try (KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2))) {
startApplicationAndWaitUntilRunning(streams2);
List<String> args = List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "1");
Throwable e = assertThrows(ExecutionException.class, () -> getStreamsGroupService(args.toArray(new String[0])).describeGroups());
assertEquals(TimeoutException.class, e.getCause().getClass());
@ -267,6 +268,7 @@ public class DescribeStreamsGroupTest {
streams2.close();
streams2.cleanUp();
}
}
private static Topology topology(String inputTopic, String outputTopic) {
final StreamsBuilder builder = new StreamsBuilder();