This commit is contained in:
Alieh Saeedi 2025-10-07 10:36:05 -07:00 committed by GitHub
commit df024d5111
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 30 additions and 17 deletions

View File

@ -72,6 +72,9 @@ public class DescribeStreamsGroupTest {
private static final String OUTPUT_TOPIC = "customOutputTopic"; private static final String OUTPUT_TOPIC = "customOutputTopic";
private static final String INPUT_TOPIC_2 = "customInputTopic2"; private static final String INPUT_TOPIC_2 = "customInputTopic2";
private static final String OUTPUT_TOPIC_2 = "customOutputTopic2"; private static final String OUTPUT_TOPIC_2 = "customOutputTopic2";
private static final String INPUT_TOPIC_3 = "customInputTopic3";
private static final String OUTPUT_TOPIC_3 = "customOutputTopic3";
private static String bootstrapServers; private static String bootstrapServers;
@BeforeAll @BeforeAll
public static void setup() throws Exception { public static void setup() throws Exception {
@ -92,6 +95,7 @@ public class DescribeStreamsGroupTest {
@AfterAll @AfterAll
public static void closeCluster() { public static void closeCluster() {
cluster.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC, INPUT_TOPIC_2, OUTPUT_TOPIC_2, INPUT_TOPIC_3, OUTPUT_TOPIC_3);
streams.close(); streams.close();
cluster.stop(); cluster.stop();
cluster = null; cluster = null;
@ -207,8 +211,6 @@ public class DescribeStreamsGroupTest {
@Test @Test
public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() throws Exception { public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() throws Exception {
cluster.createTopic(INPUT_TOPIC_2, 1, 1); cluster.createTopic(INPUT_TOPIC_2, 1, 1);
KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2));
startApplicationAndWaitUntilRunning(streams2);
final List<String> expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS"); final List<String> expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
final Set<List<String>> expectedRows1 = Set.of( final Set<List<String>> expectedRows1 = Set.of(
@ -224,18 +226,22 @@ public class DescribeStreamsGroupTest {
// The member and process names as well as client-id are not deterministic, so we don't care about them. // The member and process names as well as client-id are not deterministic, so we don't care about them.
final List<Integer> dontCares = List.of(3, 6, 7); final List<Integer> dontCares = List.of(3, 6, 7);
validateDescribeOutput( try (KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2))) {
List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2), startApplicationAndWaitUntilRunning(streams2);
expectedHeader, expectedRowsMap, dontCares);
validateDescribeOutput(
List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", APP_ID, "--group", APP_ID_2),
expectedHeader, expectedRowsMap, dontCares);
validateDescribeOutput(
List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--all-groups"),
expectedHeader, expectedRowsMap, dontCares);
streams2.close(); validateDescribeOutput(
streams2.cleanUp(); List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2),
expectedHeader, expectedRowsMap, dontCares);
validateDescribeOutput(
List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--group", APP_ID, "--group", APP_ID_2),
expectedHeader, expectedRowsMap, dontCares);
validateDescribeOutput(
List.of("--bootstrap-server", bootstrapServers, "--describe", "--verbose", "--members", "--all-groups"),
expectedHeader, expectedRowsMap, dontCares);
streams2.close();
streams2.cleanUp();
}
} }
@Test @Test
@ -252,10 +258,17 @@ public class DescribeStreamsGroupTest {
} }
@Test @Test
public void testDescribeStreamsGroupWithShortTimeout() { public void testDescribeStreamsGroupWithShortTimeout() throws Exception {
List<String> args = List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "1"); cluster.createTopic(INPUT_TOPIC_2, 1, 1);
Throwable e = assertThrows(ExecutionException.class, () -> getStreamsGroupService(args.toArray(new String[0])).describeGroups()); try (KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2))) {
assertEquals(TimeoutException.class, e.getCause().getClass()); startApplicationAndWaitUntilRunning(streams2);
List<String> args = List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "0");
Throwable e = assertThrows(ExecutionException.class, () -> getStreamsGroupService(args.toArray(new String[0])).describeGroups());
assertEquals(TimeoutException.class, e.getCause().getClass());
streams2.close();
streams2.cleanUp();
}
} }
private static Topology topology(String inputTopic, String outputTopic) { private static Topology topology(String inputTopic, String outputTopic) {