create streams group

This commit is contained in:
aliehsaeedii 2025-08-07 10:33:41 +02:00
parent 21db7e00bc
commit 594b36f0a5
1 changed files with 11 additions and 1 deletions

View File

@ -72,6 +72,8 @@ public class DescribeStreamsGroupTest {
private static final String OUTPUT_TOPIC = "customOutputTopic"; private static final String OUTPUT_TOPIC = "customOutputTopic";
private static final String INPUT_TOPIC_2 = "customInputTopic2"; private static final String INPUT_TOPIC_2 = "customInputTopic2";
private static final String OUTPUT_TOPIC_2 = "customOutputTopic2"; private static final String OUTPUT_TOPIC_2 = "customOutputTopic2";
private static final String INPUT_TOPIC_3 = "customInputTopic3";
private static final String OUTPUT_TOPIC_3 = "customOutputTopic3";
private static String bootstrapServers; private static String bootstrapServers;
@BeforeAll @BeforeAll
public static void setup() throws Exception { public static void setup() throws Exception {
@ -92,6 +94,7 @@ public class DescribeStreamsGroupTest {
@AfterAll @AfterAll
public static void closeCluster() { public static void closeCluster() {
cluster.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC, INPUT_TOPIC_2, OUTPUT_TOPIC_2, INPUT_TOPIC_3, OUTPUT_TOPIC_3);
streams.close(); streams.close();
cluster.stop(); cluster.stop();
cluster = null; cluster = null;
@ -252,10 +255,17 @@ public class DescribeStreamsGroupTest {
} }
@Test @Test
public void testDescribeStreamsGroupWithShortTimeout() { public void testDescribeStreamsGroupWithShortTimeout() throws Exception {
cluster.createTopic(INPUT_TOPIC_2, 1, 1);
KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2));
startApplicationAndWaitUntilRunning(streams2);
List<String> args = List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "1"); List<String> args = List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "1");
Throwable e = assertThrows(ExecutionException.class, () -> getStreamsGroupService(args.toArray(new String[0])).describeGroups()); Throwable e = assertThrows(ExecutionException.class, () -> getStreamsGroupService(args.toArray(new String[0])).describeGroups());
assertEquals(TimeoutException.class, e.getCause().getClass()); assertEquals(TimeoutException.class, e.getCause().getClass());
streams2.close();
streams2.cleanUp();
} }
private static Topology topology(String inputTopic, String outputTopic) { private static Topology topology(String inputTopic, String outputTopic) {