mirror of https://github.com/apache/kafka.git
KAFKA-17839 DescribeConsumerGroupTest doesn't really test different arguments (#17556)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
cfa2edc7a7
commit
54a83c5b9a
|
@ -56,6 +56,7 @@ import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
@ -171,12 +172,14 @@ public class DescribeConsumerGroupTest {
|
||||||
createTopic(topic);
|
createTopic(topic);
|
||||||
for (List<String> describeType : DESCRIBE_TYPES) {
|
for (List<String> describeType : DESCRIBE_TYPES) {
|
||||||
String protocolGroup = GROUP_PREFIX + groupProtocol.name() + "." + String.join("", describeType);
|
String protocolGroup = GROUP_PREFIX + groupProtocol.name() + "." + String.join("", describeType);
|
||||||
|
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", protocolGroup));
|
||||||
|
cgcArgs.addAll(describeType);
|
||||||
try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, protocolGroup, topic, Collections.emptyMap());
|
try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, protocolGroup, topic, Collections.emptyMap());
|
||||||
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", protocolGroup})
|
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(cgcArgs.toArray(new String[0]))
|
||||||
) {
|
) {
|
||||||
TestUtils.waitForCondition(() -> {
|
TestUtils.waitForCondition(() -> {
|
||||||
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
|
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
|
||||||
return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty();
|
return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty() && checkArgsOutput(cgcArgs, res.getKey().trim().split("\n")[0]);
|
||||||
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
|
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -210,7 +213,7 @@ public class DescribeConsumerGroupTest {
|
||||||
TestUtils.waitForCondition(() -> {
|
TestUtils.waitForCondition(() -> {
|
||||||
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
|
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
|
||||||
long numLines = Arrays.stream(res.getKey().trim().split("\n")).filter(line -> !line.isEmpty()).count();
|
long numLines = Arrays.stream(res.getKey().trim().split("\n")).filter(line -> !line.isEmpty()).count();
|
||||||
return (numLines == expectedNumLines) && res.getValue().isEmpty();
|
return (numLines == expectedNumLines) && res.getValue().isEmpty() && checkArgsOutput(cgcArgs, res.getKey().trim().split("\n")[0]);
|
||||||
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
|
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -240,11 +243,13 @@ public class DescribeConsumerGroupTest {
|
||||||
}
|
}
|
||||||
int expectedNumLines = DESCRIBE_TYPES.size() * 2;
|
int expectedNumLines = DESCRIBE_TYPES.size() * 2;
|
||||||
for (List<String> describeType : DESCRIBE_TYPES) {
|
for (List<String> describeType : DESCRIBE_TYPES) {
|
||||||
try (ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--all-groups"})) {
|
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--all-groups"));
|
||||||
|
cgcArgs.addAll(describeType);
|
||||||
|
try (ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(cgcArgs.toArray(new String[0]))) {
|
||||||
TestUtils.waitForCondition(() -> {
|
TestUtils.waitForCondition(() -> {
|
||||||
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
|
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
|
||||||
long numLines = Arrays.stream(res.getKey().trim().split("\n")).filter(line -> !line.isEmpty()).count();
|
long numLines = Arrays.stream(res.getKey().trim().split("\n")).filter(line -> !line.isEmpty()).count();
|
||||||
return (numLines == expectedNumLines) && res.getValue().isEmpty();
|
return (numLines == expectedNumLines) && res.getValue().isEmpty() && checkArgsOutput(cgcArgs, res.getKey().trim().split("\n")[0]);
|
||||||
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
|
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -396,13 +401,15 @@ public class DescribeConsumerGroupTest {
|
||||||
|
|
||||||
for (List<String> describeType : DESCRIBE_TYPES) {
|
for (List<String> describeType : DESCRIBE_TYPES) {
|
||||||
String group = GROUP_PREFIX + groupProtocol.name() + String.join("", describeType);
|
String group = GROUP_PREFIX + groupProtocol.name() + String.join("", describeType);
|
||||||
|
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group));
|
||||||
|
cgcArgs.addAll(describeType);
|
||||||
// run one consumer in the group consuming from a single-partition topic
|
// run one consumer in the group consuming from a single-partition topic
|
||||||
try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
|
try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
|
||||||
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
|
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(cgcArgs.toArray(new String[0]))
|
||||||
) {
|
) {
|
||||||
TestUtils.waitForCondition(() -> {
|
TestUtils.waitForCondition(() -> {
|
||||||
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
|
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
|
||||||
return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty();
|
return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty() && checkArgsOutput(cgcArgs, res.getKey().trim().split("\n")[0]);
|
||||||
}, "Expected describe group results with one data row for describe type '" + String.join(" ", describeType) + "'");
|
}, "Expected describe group results with one data row for describe type '" + String.join(" ", describeType) + "'");
|
||||||
|
|
||||||
protocolConsumerGroupExecutor.close();
|
protocolConsumerGroupExecutor.close();
|
||||||
|
@ -529,7 +536,7 @@ public class DescribeConsumerGroupTest {
|
||||||
TestUtils.waitForCondition(() -> {
|
TestUtils.waitForCondition(() -> {
|
||||||
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
|
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
|
||||||
int expectedNumRows = DESCRIBE_TYPE_MEMBERS.contains(describeType) ? 3 : 2;
|
int expectedNumRows = DESCRIBE_TYPE_MEMBERS.contains(describeType) ? 3 : 2;
|
||||||
return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows;
|
return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows && checkArgsOutput(cgcArgs, res.getKey().trim().split("\n")[0]);
|
||||||
}, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'");
|
}, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -627,7 +634,7 @@ public class DescribeConsumerGroupTest {
|
||||||
TestUtils.waitForCondition(() -> {
|
TestUtils.waitForCondition(() -> {
|
||||||
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
|
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
|
||||||
int expectedNumRows = DESCRIBE_TYPE_STATE.contains(describeType) ? 2 : 3;
|
int expectedNumRows = DESCRIBE_TYPE_STATE.contains(describeType) ? 2 : 3;
|
||||||
return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows;
|
return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows && checkArgsOutput(cgcArgs, res.getKey().trim().split("\n")[0]);
|
||||||
}, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'");
|
}, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -994,4 +1001,44 @@ public class DescribeConsumerGroupTest {
|
||||||
private Runnable describeGroups(ConsumerGroupCommand.ConsumerGroupService service) {
|
private Runnable describeGroups(ConsumerGroupCommand.ConsumerGroupService service) {
|
||||||
return () -> Assertions.assertDoesNotThrow(service::describeGroups);
|
return () -> Assertions.assertDoesNotThrow(service::describeGroups);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean checkArgsOutput(List<String> args, String output) {
|
||||||
|
if (!output.contains("GROUP")) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (args.contains("--members")) {
|
||||||
|
return checkMembersArgsOutput(output, args.contains("--verbose"));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (args.contains("--state")) {
|
||||||
|
return checkStateArgsOutput(output);
|
||||||
|
}
|
||||||
|
|
||||||
|
// --offsets or no arguments
|
||||||
|
AtomicBoolean result = new AtomicBoolean(true);
|
||||||
|
List.of("TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "CONSUMER-ID", "HOST", "CLIENT-ID").forEach(key -> {
|
||||||
|
if (!output.contains(key)) {
|
||||||
|
result.set(false);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return result.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean checkMembersArgsOutput(String output, boolean verbose) {
|
||||||
|
AtomicBoolean result = new AtomicBoolean(true);
|
||||||
|
List<String> expectedKeys = verbose ?
|
||||||
|
List.of("CONSUMER-ID", "HOST", "CLIENT-ID", "#PARTITIONS", "ASSIGNMENT") :
|
||||||
|
List.of("CONSUMER-ID", "HOST", "CLIENT-ID", "#PARTITIONS");
|
||||||
|
expectedKeys.forEach(key -> {
|
||||||
|
if (!output.contains(key)) {
|
||||||
|
result.set(false);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return result.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean checkStateArgsOutput(String output) {
|
||||||
|
return output.contains("COORDINATOR (ID)") && output.contains("ASSIGNMENT-STRATEGY") && output.contains("STATE") && output.contains("#MEMBERS");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue