diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java index de8933d1405..3cc350be196 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java @@ -56,6 +56,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; @@ -171,12 +172,14 @@ public class DescribeConsumerGroupTest { createTopic(topic); for (List describeType : DESCRIBE_TYPES) { String protocolGroup = GROUP_PREFIX + groupProtocol.name() + "." + String.join("", describeType); + List cgcArgs = new ArrayList<>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", protocolGroup)); + cgcArgs.addAll(describeType); try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, protocolGroup, topic, Collections.emptyMap()); - ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", protocolGroup}) + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(cgcArgs.toArray(new String[0])) ) { TestUtils.waitForCondition(() -> { Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); - return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty(); + return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty() && checkArgsOutput(cgcArgs, res.getKey().trim().split("\n")[0]); }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); } } @@ -210,7 +213,7 @@ public class DescribeConsumerGroupTest { TestUtils.waitForCondition(() -> { Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); long numLines = Arrays.stream(res.getKey().trim().split("\n")).filter(line -> !line.isEmpty()).count(); - return (numLines == expectedNumLines) && res.getValue().isEmpty(); + return (numLines == expectedNumLines) && res.getValue().isEmpty() && checkArgsOutput(cgcArgs, res.getKey().trim().split("\n")[0]); }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); } } @@ -240,11 +243,13 @@ public class DescribeConsumerGroupTest { } int expectedNumLines = DESCRIBE_TYPES.size() * 2; for (List describeType : DESCRIBE_TYPES) { - try (ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--all-groups"})) { + List cgcArgs = new ArrayList<>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--all-groups")); + cgcArgs.addAll(describeType); + try (ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(cgcArgs.toArray(new String[0]))) { TestUtils.waitForCondition(() -> { Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); long numLines = Arrays.stream(res.getKey().trim().split("\n")).filter(line -> !line.isEmpty()).count(); - return (numLines == expectedNumLines) && res.getValue().isEmpty(); + return (numLines == expectedNumLines) && res.getValue().isEmpty() && checkArgsOutput(cgcArgs, res.getKey().trim().split("\n")[0]); }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); } } @@ -396,13 +401,15 @@ public class DescribeConsumerGroupTest { for (List describeType : DESCRIBE_TYPES) { String group = GROUP_PREFIX + groupProtocol.name() + String.join("", describeType); + List cgcArgs = new ArrayList<>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group)); + cgcArgs.addAll(describeType); // run one consumer in the group consuming from a single-partition topic try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()); - ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(cgcArgs.toArray(new String[0])) ) { TestUtils.waitForCondition(() -> { Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); - return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty(); + return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty() && checkArgsOutput(cgcArgs, res.getKey().trim().split("\n")[0]); }, "Expected describe group results with one data row for describe type '" + String.join(" ", describeType) + "'"); protocolConsumerGroupExecutor.close(); @@ -529,7 +536,7 @@ public class DescribeConsumerGroupTest { TestUtils.waitForCondition(() -> { Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); int expectedNumRows = DESCRIBE_TYPE_MEMBERS.contains(describeType) ? 3 : 2; - return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows; + return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows && checkArgsOutput(cgcArgs, res.getKey().trim().split("\n")[0]); }, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'"); } } @@ -627,7 +634,7 @@ public class DescribeConsumerGroupTest { TestUtils.waitForCondition(() -> { Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); int expectedNumRows = DESCRIBE_TYPE_STATE.contains(describeType) ? 2 : 3; - return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows; + return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows && checkArgsOutput(cgcArgs, res.getKey().trim().split("\n")[0]); }, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'"); } } @@ -994,4 +1001,44 @@ public class DescribeConsumerGroupTest { private Runnable describeGroups(ConsumerGroupCommand.ConsumerGroupService service) { return () -> Assertions.assertDoesNotThrow(service::describeGroups); } + + private boolean checkArgsOutput(List args, String output) { + if (!output.contains("GROUP")) { + return false; + } + + if (args.contains("--members")) { + return checkMembersArgsOutput(output, args.contains("--verbose")); + } + + if (args.contains("--state")) { + return checkStateArgsOutput(output); + } + + // --offsets or no arguments + AtomicBoolean result = new AtomicBoolean(true); + List.of("TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "CONSUMER-ID", "HOST", "CLIENT-ID").forEach(key -> { + if (!output.contains(key)) { + result.set(false); + } + }); + return result.get(); + } + + private boolean checkMembersArgsOutput(String output, boolean verbose) { + AtomicBoolean result = new AtomicBoolean(true); + List expectedKeys = verbose ? + List.of("CONSUMER-ID", "HOST", "CLIENT-ID", "#PARTITIONS", "ASSIGNMENT") : + List.of("CONSUMER-ID", "HOST", "CLIENT-ID", "#PARTITIONS"); + expectedKeys.forEach(key -> { + if (!output.contains(key)) { + result.set(false); + } + }); + return result.get(); + } + + private boolean checkStateArgsOutput(String output) { + return output.contains("COORDINATOR (ID)") && output.contains("ASSIGNMENT-STRATEGY") && output.contains("STATE") && output.contains("#MEMBERS"); + } }