From 5f4806fd1c0eb0ef67885a5a7f12de282f494933 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Thu, 7 Mar 2024 02:44:17 +0300 Subject: [PATCH] KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java (#15363) This PR is part of #14471 It contains some of ConsoleGroupCommand tests rewritten in java. Intention of separate PR is to reduce changes and simplify review. Reviewers: Chia-Ping Tsai --- .../admin/DescribeConsumerGroupTest.scala | 747 ---------------- .../group/DescribeConsumerGroupTest.java | 830 ++++++++++++++++++ 2 files changed, 830 insertions(+), 747 deletions(-) delete mode 100644 core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala create mode 100644 tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala deleted file mode 100644 index e98404f496f..00000000000 --- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala +++ /dev/null @@ -1,747 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.admin - -import java.util.Properties -import kafka.utils.{Exit, TestInfoUtils, TestUtils} -import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor} -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.TimeoutException -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{MethodSource, ValueSource} - -import scala.concurrent.ExecutionException -import scala.util.Random - -class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { - private val describeTypeOffsets = Array(Array(""), Array("--offsets")) - private val describeTypeMembers = Array(Array("--members"), Array("--members", "--verbose")) - private val describeTypeState = Array(Array("--state")) - private val describeTypes = describeTypeOffsets ++ describeTypeMembers ++ describeTypeState - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeNonExistingGroup(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - val missingGroup = "missing.group" - - for (describeType <- describeTypes) { - // note the group to be queried is a different (non-existing) group - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", missingGroup) ++ describeType - val service = getConsumerGroupService(cgcArgs) - - val output = TestUtils.grabConsoleOutput(service.describeGroups()) - assertTrue(output.contains(s"Consumer group '$missingGroup' does not exist."), - s"Expected error was not detected for describe option '${describeType.mkString(" ")}'") - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("zk", "kraft")) - def testDescribeWithMultipleSubActions(quorum: String): Unit = { - var exitStatus: Option[Int] = None - var exitMessage: Option[String] = None - Exit.setExitProcedure { (status, err) => - exitStatus = Some(status) - exitMessage = err - throw new RuntimeException - } - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group, "--members", "--state") - try { - ConsumerGroupCommand.main(cgcArgs) - } catch { - case e: RuntimeException => //expected - } finally { - Exit.resetExitProcedure() - } - assertEquals(Some(1), exitStatus) - assertTrue(exitMessage.get.contains("Option [describe] takes at most one of these options")) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("zk", "kraft")) - def testDescribeWithStateValue(quorum: String): Unit = { - var exitStatus: Option[Int] = None - var exitMessage: Option[String] = None - Exit.setExitProcedure { (status, err) => - exitStatus = Some(status) - exitMessage = err - throw new RuntimeException - } - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--all-groups", "--state", "Stable") - try { - ConsumerGroupCommand.main(cgcArgs) - } catch { - case e: RuntimeException => //expected - } finally { - Exit.resetExitProcedure() - } - assertEquals(Some(1), exitStatus) - assertTrue(exitMessage.get.contains("Option [describe] does not take a value for [state]")) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeOffsetsOfNonExistingGroup(quorum: String, groupProtocol: String): Unit = { - val group = "missing.group" - createOffsetsTopic() - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol) - // note the group to be queried is a different (non-existing) group - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - val (state, assignments) = service.collectGroupOffsets(group) - assertTrue(state.contains("Dead") && assignments.contains(List()), - s"Expected the state to be 'Dead', with no members in the group '$group'.") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeMembersOfNonExistingGroup(quorum: String, groupProtocol: String): Unit = { - val group = "missing.group" - createOffsetsTopic() - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol) - // note the group to be queried is a different (non-existing) group - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - val (state, assignments) = service.collectGroupMembers(group, false) - assertTrue(state.contains("Dead") && assignments.contains(List()), - s"Expected the state to be 'Dead', with no members in the group '$group'.") - - val (state2, assignments2) = service.collectGroupMembers(group, true) - assertTrue(state2.contains("Dead") && assignments2.contains(List()), - s"Expected the state to be 'Dead', with no members in the group '$group' (verbose option).") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeStateOfNonExistingGroup(quorum: String, groupProtocol: String): Unit = { - val group = "missing.group" - createOffsetsTopic() - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol) - // note the group to be queried is a different (non-existing) group - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - val state = service.collectGroupState(group) - assertTrue(state.state == "Dead" && state.numMembers == 0 && - state.coordinator != null && brokers.map(_.config.brokerId).toList.contains(state.coordinator.id), - s"Expected the state to be 'Dead', with no members in the group '$group'." - ) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeExistingGroup(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - for (describeType <- describeTypes) { - val group = this.group + describeType.mkString("") - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 1, group = group, groupProtocol = groupProtocol) - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) ++ describeType - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups()) - output.trim.split("\n").length == 2 && error.isEmpty - }, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.") - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeExistingGroups(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - // Create N single-threaded consumer groups from a single-partition topic - val groups = (for (describeType <- describeTypes) yield { - val group = this.group + describeType.mkString("") - addConsumerGroupExecutor(numConsumers = 1, group = group, groupProtocol = groupProtocol) - Array("--group", group) - }).flatten - - val expectedNumLines = describeTypes.length * 2 - - for (describeType <- describeTypes) { - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe") ++ groups ++ describeType - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups()) - val numLines = output.trim.split("\n").count(line => line.nonEmpty) - (numLines == expectedNumLines) && error.isEmpty - }, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.") - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeAllExistingGroups(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - // Create N single-threaded consumer groups from a single-partition topic - for (describeType <- describeTypes) { - val group = this.group + describeType.mkString("") - addConsumerGroupExecutor(numConsumers = 1, group = group, groupProtocol = groupProtocol) - } - - val expectedNumLines = describeTypes.length * 2 - - for (describeType <- describeTypes) { - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--all-groups") ++ describeType - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups()) - val numLines = output.trim.split("\n").count(line => line.nonEmpty) - (numLines == expectedNumLines) && error.isEmpty - }, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.") - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeOffsetsOfExistingGroup(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol) - - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (state, assignments) = service.collectGroupOffsets(group) - state.contains("Stable") && - assignments.isDefined && - assignments.get.count(_.group == group) == 1 && - assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) && - assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) && - assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) - }, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group $group.") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeMembersOfExistingGroup(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol) - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (state, assignments) = service.collectGroupMembers(group, false) - state.contains("Stable") && - (assignments match { - case Some(memberAssignments) => - memberAssignments.count(_.group == group) == 1 && - memberAssignments.filter(_.group == group).head.consumerId != ConsumerGroupCommand.MISSING_COLUMN_VALUE && - memberAssignments.filter(_.group == group).head.clientId != ConsumerGroupCommand.MISSING_COLUMN_VALUE && - memberAssignments.filter(_.group == group).head.host != ConsumerGroupCommand.MISSING_COLUMN_VALUE - case None => - false - }) - }, s"Expected a 'Stable' group status, rows and valid member information for group $group.") - - val (_, assignments) = service.collectGroupMembers(group, true) - assignments match { - case None => - fail(s"Expected partition assignments for members of group $group") - case Some(memberAssignments) => - assertTrue(memberAssignments.size == 1 && memberAssignments.head.assignment.size == 1, - s"Expected a topic partition assigned to the single group member for group $group") - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeStateOfExistingGroup(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor( - numConsumers = 1, - groupProtocol = groupProtocol, - // This is only effective when new protocol is used. - remoteAssignor = Some("range") - ) - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val state = service.collectGroupState(group) - state.state == "Stable" && - state.numMembers == 1 && - state.assignmentStrategy == "range" && - state.coordinator != null && - brokers.map(_.config.brokerId).toList.contains(state.coordinator.id) - }, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeStateOfExistingGroupWithNonDefaultAssignor(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - // run one consumer in the group consuming from a single-partition topic - val expectedName = if (groupProtocol == "consumer") { - addConsumerGroupExecutor(numConsumers = 1, remoteAssignor = Some("range"), groupProtocol = groupProtocol) - "range" - } else { - addConsumerGroupExecutor(numConsumers = 1, strategy = classOf[RoundRobinAssignor].getName, groupProtocol = groupProtocol) - "roundrobin" - } - - - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val state = service.collectGroupState(group) - state.state == "Stable" && - state.numMembers == 1 && - state.assignmentStrategy == expectedName && - state.coordinator != null && - brokers.map(_.config.brokerId).toList.contains(state.coordinator.id) - }, s"Expected a 'Stable' group status, with one member and $expectedName assignment strategy for group $group.") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeExistingGroupWithNoMembers(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - for (describeType <- describeTypes) { - val group = this.group + describeType.mkString("") - // run one consumer in the group consuming from a single-partition topic - val executor = addConsumerGroupExecutor(numConsumers = 1, group = group, groupProtocol = groupProtocol) - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) ++ describeType - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups()) - output.trim.split("\n").length == 2 && error.isEmpty - }, s"Expected describe group results with one data row for describe type '${describeType.mkString(" ")}'") - - // stop the consumer so the group has no active member anymore - executor.shutdown() - TestUtils.waitUntilTrue(() => { - TestUtils.grabConsoleError(service.describeGroups()).contains(s"Consumer group '$group' has no active members.") - }, s"Expected no active member in describe group results with describe type ${describeType.mkString(" ")}") - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeOffsetsOfExistingGroupWithNoMembers(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - // run one consumer in the group consuming from a single-partition topic - val executor = addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol, syncCommit = true) - - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (state, assignments) = service.collectGroupOffsets(group) - state.contains("Stable") && assignments.exists(_.exists(assignment => assignment.group == group && assignment.offset.isDefined)) - }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.") - - // stop the consumer so the group has no active member anymore - executor.shutdown() - - val (result, succeeded) = TestUtils.computeUntilTrue(service.collectGroupOffsets(group)) { - case (state, assignments) => - val testGroupAssignments = assignments.toSeq.flatMap(_.filter(_.group == group)) - def assignment = testGroupAssignments.head - state.contains("Empty") && - testGroupAssignments.size == 1 && - assignment.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone - assignment.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && - assignment.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) - } - val (state, assignments) = result - assertTrue(succeeded, s"Expected no active member in describe group results, state: $state, assignments: $assignments") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeMembersOfExistingGroupWithNoMembers(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - // run one consumer in the group consuming from a single-partition topic - val executor = addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol) - - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (state, assignments) = service.collectGroupMembers(group, false) - state.contains("Stable") && assignments.exists(_.exists(_.group == group)) - }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.") - - // stop the consumer so the group has no active member anymore - executor.shutdown() - - TestUtils.waitUntilTrue(() => { - val (state, assignments) = service.collectGroupMembers(group, false) - state.contains("Empty") && assignments.isDefined && assignments.get.isEmpty - }, s"Expected no member in describe group members results for group '$group'") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeStateOfExistingGroupWithNoMembers(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - // run one consumer in the group consuming from a single-partition topic - val executor = addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol) - - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val state = service.collectGroupState(group) - state.state == "Stable" && - state.numMembers == 1 && - state.coordinator != null && - brokers.map(_.config.brokerId).toList.contains(state.coordinator.id) - }, s"Expected the group '$group' to initially become stable, and have a single member.") - - // stop the consumer so the group has no active member anymore - executor.shutdown() - - TestUtils.waitUntilTrue(() => { - val state = service.collectGroupState(group) - state.state == "Empty" && state.numMembers == 0 - }, s"Expected the group '$group' to become empty after the only member leaving.") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeWithConsumersWithoutAssignedPartitions(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - for (describeType <- describeTypes) { - val group = this.group + describeType.mkString("") - // run two consumers in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 2, group = group, groupProtocol = groupProtocol) - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) ++ describeType - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups()) - val expectedNumRows = if (describeTypeMembers.contains(describeType)) 3 else 2 - error.isEmpty && output.trim.split("\n").size == expectedNumRows - }, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'") - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeOffsetsWithConsumersWithoutAssignedPartitions(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - // run two consumers in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 2, groupProtocol = groupProtocol) - - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (state, assignments) = service.collectGroupOffsets(group) - state.contains("Stable") && - assignments.isDefined && - assignments.get.count(_.group == group) == 1 && - assignments.get.count { x => x.group == group && x.partition.isDefined } == 1 - }, "Expected rows for consumers with no assigned partitions in describe group results") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeMembersWithConsumersWithoutAssignedPartitions(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - // run two consumers in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 2, groupProtocol = groupProtocol) - - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (state, assignments) = service.collectGroupMembers(group, false) - state.contains("Stable") && - assignments.isDefined && - assignments.get.count(_.group == group) == 2 && - assignments.get.count { x => x.group == group && x.numPartitions == 1 } == 1 && - assignments.get.count { x => x.group == group && x.numPartitions == 0 } == 1 && - !assignments.get.exists(_.assignment.nonEmpty) - }, "Expected rows for consumers with no assigned partitions in describe group results") - - val (state, assignments) = service.collectGroupMembers(group, true) - assertTrue(state.contains("Stable") && assignments.get.count(_.assignment.nonEmpty) > 0, - "Expected additional columns in verbose version of describe members") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeStateWithConsumersWithoutAssignedPartitions(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - // run two consumers in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 2, groupProtocol = groupProtocol) - - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val state = service.collectGroupState(group) - state.state == "Stable" && state.numMembers == 2 - }, "Expected two consumers in describe group results") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeWithMultiPartitionTopicAndMultipleConsumers(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - val topic2 = "foo2" - createTopic(topic2, 2, 1) - - for (describeType <- describeTypes) { - val group = this.group + describeType.mkString("") - // run two consumers in the group consuming from a two-partition topic - addConsumerGroupExecutor(2, topic2, group = group, groupProtocol = groupProtocol) - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) ++ describeType - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups()) - val expectedNumRows = if (describeTypeState.contains(describeType)) 2 else 3 - error.isEmpty && output.trim.split("\n").size == expectedNumRows - }, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'") - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - val topic2 = "foo2" - createTopic(topic2, 2, 1) - - // run two consumers in the group consuming from a two-partition topic - addConsumerGroupExecutor(numConsumers = 2, topic2, groupProtocol = groupProtocol) - - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (state, assignments) = service.collectGroupOffsets(group) - state.contains("Stable") && - assignments.isDefined && - assignments.get.count(_.group == group) == 2 && - assignments.get.count { x => x.group == group && x.partition.isDefined } == 2 && - assignments.get.count { x => x.group == group && x.partition.isEmpty } == 0 - }, "Expected two rows (one row per consumer) in describe group results.") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - val topic2 = "foo2" - createTopic(topic2, 2, 1) - - // run two consumers in the group consuming from a two-partition topic - addConsumerGroupExecutor(numConsumers = 2, topic2, groupProtocol = groupProtocol) - - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (state, assignments) = service.collectGroupMembers(group, false) - state.contains("Stable") && - assignments.isDefined && - assignments.get.count(_.group == group) == 2 && - assignments.get.count { x => x.group == group && x.numPartitions == 1 } == 2 && - assignments.get.count { x => x.group == group && x.numPartitions == 0 } == 0 - }, "Expected two rows (one row per consumer) in describe group members results.") - - val (state, assignments) = service.collectGroupMembers(group, true) - assertTrue(state.contains("Stable") && assignments.get.count(_.assignment.isEmpty) == 0, - "Expected additional columns in verbose version of describe members") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeStateWithMultiPartitionTopicAndMultipleConsumers(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - val topic2 = "foo2" - createTopic(topic2, 2, 1) - - // run two consumers in the group consuming from a two-partition topic - addConsumerGroupExecutor(numConsumers = 2, topic2, groupProtocol = groupProtocol) - - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val state = service.collectGroupState(group) - state.state == "Stable" && state.group == group && state.numMembers == 2 - }, "Expected a stable group with two members in describe group state result.") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) - def testDescribeSimpleConsumerGroup(quorum: String): Unit = { - // Ensure that the offsets of consumers which don't use group management are still displayed - - createOffsetsTopic() - val topic2 = "foo2" - createTopic(topic2, 2, 1) - addSimpleGroupExecutor(Seq(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))) - - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (state, assignments) = service.collectGroupOffsets(group) - state.contains("Empty") && assignments.isDefined && assignments.get.count(_.group == group) == 2 - }, "Expected a stable group with two members in describe group state result.") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeGroupWithShortInitializationTimeout(quorum: String, groupProtocol: String): Unit = { - // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't - // complete before the timeout expires - - val describeType = describeTypes(Random.nextInt(describeTypes.length)) - val group = this.group + describeType.mkString("") - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol) - // set the group initialization timeout too low for the group to stabilize - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--timeout", "1", "--group", group) ++ describeType - val service = getConsumerGroupService(cgcArgs) - - val e = assertThrows(classOf[ExecutionException], () => TestUtils.grabConsoleOutputAndError(service.describeGroups())) - assertEquals(classOf[TimeoutException], e.getCause.getClass) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeGroupOffsetsWithShortInitializationTimeout(quorum: String, groupProtocol: String): Unit = { - // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't - // complete before the timeout expires - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol) - - // set the group initialization timeout too low for the group to stabilize - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group, "--timeout", "1") - val service = getConsumerGroupService(cgcArgs) - - val e = assertThrows(classOf[ExecutionException], () => service.collectGroupOffsets(group)) - assertEquals(classOf[TimeoutException], e.getCause.getClass) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeGroupMembersWithShortInitializationTimeout(quorum: String, groupProtocol: String): Unit = { - // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't - // complete before the timeout expires - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol) - - // set the group initialization timeout too low for the group to stabilize - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group, "--timeout", "1") - val service = getConsumerGroupService(cgcArgs) - - var e = assertThrows(classOf[ExecutionException], () => service.collectGroupMembers(group, false)) - assertEquals(classOf[TimeoutException], e.getCause.getClass) - e = assertThrows(classOf[ExecutionException], () => service.collectGroupMembers(group, true)) - assertEquals(classOf[TimeoutException], e.getCause.getClass) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeGroupStateWithShortInitializationTimeout(quorum: String, groupProtocol: String): Unit = { - // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't - // complete before the timeout expires - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol) - - // set the group initialization timeout too low for the group to stabilize - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group, "--timeout", "1") - val service = getConsumerGroupService(cgcArgs) - - val e = assertThrows(classOf[ExecutionException], () => service.collectGroupState(group)) - assertEquals(classOf[TimeoutException], e.getCause.getClass) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("zk", "kraft")) - def testDescribeWithUnrecognizedNewConsumerOption(quorum: String): Unit = { - val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - assertThrows(classOf[joptsimple.OptionException], () => getConsumerGroupService(cgcArgs)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testDescribeNonOffsetCommitGroup(quorum: String, groupProtocol: String): Unit = { - createOffsetsTopic() - - val customProps = new Properties - // create a consumer group that never commits offsets - customProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(numConsumers = 1, customPropsOpt = Some(customProps), groupProtocol = groupProtocol) - - val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) - val service = getConsumerGroupService(cgcArgs) - - TestUtils.waitUntilTrue(() => { - val (state, assignments) = service.collectGroupOffsets(group) - state.contains("Stable") && - assignments.isDefined && - assignments.get.count(_.group == group) == 1 && - assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) && - assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) && - assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) - }, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group $group.") - } - -} - diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java new file mode 100644 index 00000000000..f0277d18cd6 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java @@ -0,0 +1,830 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer.group; + +import kafka.admin.ConsumerGroupCommand; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.clients.consumer.RoundRobinAssignor; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import scala.Function0; +import scala.Function1; +import scala.Option; +import scala.collection.Seq; +import scala.runtime.BoxedUnit; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.kafka.test.TestUtils.RANDOM; +import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES; +import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { + private static final List> DESCRIBE_TYPE_OFFSETS = Arrays.asList(Collections.singletonList(""), Collections.singletonList("--offsets")); + private static final List> DESCRIBE_TYPE_MEMBERS = Arrays.asList(Collections.singletonList("--members"), Arrays.asList("--members", "--verbose")); + private static final List> DESCRIBE_TYPE_STATE = Collections.singletonList(Collections.singletonList("--state")); + private static final List> DESCRIBE_TYPES; + + static { + List> describeTypes = new ArrayList<>(); + + describeTypes.addAll(DESCRIBE_TYPE_OFFSETS); + describeTypes.addAll(DESCRIBE_TYPE_MEMBERS); + describeTypes.addAll(DESCRIBE_TYPE_STATE); + + DESCRIBE_TYPES = describeTypes; + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeNonExistingGroup(String quorum, String groupProtocol) { + createOffsetsTopic(listenerName(), new Properties()); + String missingGroup = "missing.group"; + + for (List describeType : DESCRIBE_TYPES) { + // note the group to be queried is a different (non-existing) group + List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", missingGroup)); + cgcArgs.addAll(describeType); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); + + String output = kafka.utils.TestUtils.grabConsoleOutput(describeGroups(service)); + assertTrue(output.contains("Consumer group '" + missingGroup + "' does not exist."), + "Expected error was not detected for describe option '" + String.join(" ", describeType) + "'"); + } + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribeWithMultipleSubActions(String quorum) { + AtomicInteger exitStatus = new AtomicInteger(0); + AtomicReference exitMessage = new AtomicReference<>(""); + Exit.setExitProcedure((status, err) -> { + exitStatus.set(status); + exitMessage.set(err); + throw new RuntimeException(); + }); + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP, "--members", "--state"}; + try { + assertThrows(RuntimeException.class, () -> ConsumerGroupCommand.main(cgcArgs)); + } finally { + Exit.resetExitProcedure(); + } + assertEquals(1, exitStatus.get()); + assertTrue(exitMessage.get().contains("Option [describe] takes at most one of these options")); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribeWithStateValue(String quorum) { + AtomicInteger exitStatus = new AtomicInteger(0); + AtomicReference exitMessage = new AtomicReference<>(""); + Exit.setExitProcedure((status, err) -> { + exitStatus.set(status); + exitMessage.set(err); + throw new RuntimeException(); + }); + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--all-groups", "--state", "Stable"}; + try { + assertThrows(RuntimeException.class, () -> ConsumerGroupCommand.main(cgcArgs)); + } finally { + Exit.resetExitProcedure(); + } + assertEquals(1, exitStatus.get()); + assertTrue(exitMessage.get().contains("Option [describe] does not take a value for [state]")); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeOffsetsOfNonExistingGroup(String quorum, String groupProtocol) { + String group = "missing.group"; + createOffsetsTopic(listenerName(), new Properties()); + + // run one consumer in the group consuming from a single-partition topic + addConsumerGroupExecutor(1, groupProtocol); + // note the group to be queried is a different (non-existing) group + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + scala.Tuple2, Option>> res = service.collectGroupOffsets(group); + assertTrue(res._1.map(s -> s.contains("Dead")).getOrElse(() -> false) && res._2.map(Seq::isEmpty).getOrElse(() -> false), + "Expected the state to be 'Dead', with no members in the group '" + group + "'."); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeMembersOfNonExistingGroup(String quorum, String groupProtocol) { + String group = "missing.group"; + createOffsetsTopic(listenerName(), new Properties()); + + // run one consumer in the group consuming from a single-partition topic + addConsumerGroupExecutor(1, groupProtocol); + // note the group to be queried is a different (non-existing) group + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + scala.Tuple2, Option>> res = service.collectGroupMembers(group, false); + assertTrue(res._1.map(s -> s.contains("Dead")).getOrElse(() -> false) && res._2.map(Seq::isEmpty).getOrElse(() -> false), + "Expected the state to be 'Dead', with no members in the group '" + group + "'."); + + scala.Tuple2, Option>> res2 = service.collectGroupMembers(group, true); + assertTrue(res2._1.map(s -> s.contains("Dead")).getOrElse(() -> false) && res2._2.map(Seq::isEmpty).getOrElse(() -> false), + "Expected the state to be 'Dead', with no members in the group '" + group + "' (verbose option)."); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeStateOfNonExistingGroup(String quorum, String groupProtocol) { + String group = "missing.group"; + createOffsetsTopic(listenerName(), new Properties()); + + // run one consumer in the group consuming from a single-partition topic + addConsumerGroupExecutor(1, groupProtocol); + // note the group to be queried is a different (non-existing) group + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + ConsumerGroupCommand.GroupState state = service.collectGroupState(group); + assertTrue(Objects.equals(state.state(), "Dead") && state.numMembers() == 0 && + state.coordinator() != null && !brokers().filter(s -> s.config().brokerId() == state.coordinator().id()).isEmpty(), + "Expected the state to be 'Dead', with no members in the group '" + group + "'." + ); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeExistingGroup(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + for (List describeType : DESCRIBE_TYPES) { + String group = GROUP + String.join("", describeType); + // run one consumer in the group consuming from a single-partition topic + addConsumerGroupExecutor(1, TOPIC, group, groupProtocol); + List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group)); + cgcArgs.addAll(describeType); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); + + TestUtils.waitForCondition(() -> { + scala.Tuple2 res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service)); + return res._1.trim().split("\n").length == 2 && res._2.isEmpty(); + }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); + } + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeExistingGroups(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + // Create N single-threaded consumer groups from a single-partition topic + List groups = new ArrayList<>(); + + for (List describeType : DESCRIBE_TYPES) { + String group = GROUP + String.join("", describeType); + addConsumerGroupExecutor(1, TOPIC, group, groupProtocol); + groups.addAll(Arrays.asList("--group", group)); + } + + int expectedNumLines = DESCRIBE_TYPES.size() * 2; + + for (List describeType : DESCRIBE_TYPES) { + List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe")); + cgcArgs.addAll(groups); + cgcArgs.addAll(describeType); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); + + TestUtils.waitForCondition(() -> { + scala.Tuple2 res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service)); + long numLines = Arrays.stream(res._1.trim().split("\n")).filter(line -> !line.isEmpty()).count(); + return (numLines == expectedNumLines) && res._2.isEmpty(); + }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); + } + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeAllExistingGroups(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + // Create N single-threaded consumer groups from a single-partition topic + for (List describeType : DESCRIBE_TYPES) { + String group = GROUP + String.join("", describeType); + addConsumerGroupExecutor(1, TOPIC, group, groupProtocol); + } + + int expectedNumLines = DESCRIBE_TYPES.size() * 2; + + for (List describeType : DESCRIBE_TYPES) { + List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--all-groups")); + cgcArgs.addAll(describeType); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); + + TestUtils.waitForCondition(() -> { + scala.Tuple2 res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service)); + long numLines = Arrays.stream(res._1.trim().split("\n")).filter(s -> !s.isEmpty()).count(); + return (numLines == expectedNumLines) && res._2.isEmpty(); + }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); + } + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeOffsetsOfExistingGroup(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + // run one consumer in the group consuming from a single-partition topic + addConsumerGroupExecutor(1, groupProtocol); + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + scala.Tuple2, Option>> groupOffsets = service.collectGroupOffsets(GROUP); + Option state = groupOffsets._1; + Option> assignments = groupOffsets._2; + + Function1 isGrp = s -> Objects.equals(s.group(), GROUP); + + boolean res = state.map(s -> s.contains("Stable")).getOrElse(() -> false) && + assignments.isDefined() && + assignments.get().count(isGrp) == 1; + + if (!res) + return false; + + @SuppressWarnings("cast") + ConsumerGroupCommand.PartitionAssignmentState partitionState = + (ConsumerGroupCommand.PartitionAssignmentState) assignments.get().filter(isGrp).head(); + + return !partitionState.consumerId().map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) && + !partitionState.clientId().map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) && + !partitionState.host().map(h -> h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false); + }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group " + GROUP + "."); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeMembersOfExistingGroup(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + // run one consumer in the group consuming from a single-partition topic + addConsumerGroupExecutor(1, groupProtocol); + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + scala.Tuple2, Option>> groupMembers = service.collectGroupMembers(GROUP, false); + Option state = groupMembers._1; + Option> assignments = groupMembers._2; + + Function1 isGrp = s -> Objects.equals(s.group(), GROUP); + + boolean res = state.map(s -> s.contains("Stable")).getOrElse(() -> false) && + assignments.isDefined() && + assignments.get().count(s -> Objects.equals(s.group(), GROUP)) == 1; + + if (!res) + return false; + + @SuppressWarnings("cast") + ConsumerGroupCommand.MemberAssignmentState assignmentState = + (ConsumerGroupCommand.MemberAssignmentState) assignments.get().filter(isGrp).head(); + + return !Objects.equals(assignmentState.consumerId(), ConsumerGroupCommand.MISSING_COLUMN_VALUE()) && + !Objects.equals(assignmentState.clientId(), ConsumerGroupCommand.MISSING_COLUMN_VALUE()) && + !Objects.equals(assignmentState.host(), ConsumerGroupCommand.MISSING_COLUMN_VALUE()); + }, "Expected a 'Stable' group status, rows and valid member information for group " + GROUP + "."); + + scala.Tuple2, Option>> res = service.collectGroupMembers(GROUP, true); + + if (res._2.isDefined()) { + assertTrue(res._2.get().size() == 1 && res._2.get().iterator().next().assignment().size() == 1, + "Expected a topic partition assigned to the single group member for group " + GROUP); + } else { + fail("Expected partition assignments for members of group " + GROUP); + } + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeStateOfExistingGroup(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + // run one consumer in the group consuming from a single-partition topic + addConsumerGroupExecutor( + 1, + groupProtocol, + // This is only effective when new protocol is used. + Optional.of("range") + ); + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP); + return Objects.equals(state.state(), "Stable") && + state.numMembers() == 1 && + Objects.equals(state.assignmentStrategy(), "range") && + state.coordinator() != null && + brokers().count(s -> s.config().brokerId() == state.coordinator().id()) > 0; + }, "Expected a 'Stable' group status, with one member and round robin assignment strategy for group " + GROUP + "."); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeStateOfExistingGroupWithNonDefaultAssignor(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + // run one consumer in the group consuming from a single-partition topic + String expectedName; + if (groupProtocol.equals("consumer")) { + addConsumerGroupExecutor(1, groupProtocol, Optional.of("range")); + expectedName = "range"; + } else { + addConsumerGroupExecutor(1, TOPIC, GROUP, RoundRobinAssignor.class.getName(), Optional.empty(), Optional.empty(), false, groupProtocol); + expectedName = "roundrobin"; + } + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP); + return Objects.equals(state.state(), "Stable") && + state.numMembers() == 1 && + Objects.equals(state.assignmentStrategy(), expectedName) && + state.coordinator() != null && + brokers().count(s -> s.config().brokerId() == state.coordinator().id()) > 0; + }, "Expected a 'Stable' group status, with one member and " + expectedName + " assignment strategy for group " + GROUP + "."); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeExistingGroupWithNoMembers(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + for (List describeType : DESCRIBE_TYPES) { + String group = GROUP + String.join("", describeType); + // run one consumer in the group consuming from a single-partition topic + ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, group, groupProtocol); + List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group)); + cgcArgs.addAll(describeType); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); + + TestUtils.waitForCondition(() -> { + scala.Tuple2 res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service)); + return res._1.trim().split("\n").length == 2 && res._2.isEmpty(); + }, "Expected describe group results with one data row for describe type '" + String.join(" ", describeType) + "'"); + + // stop the consumer so the group has no active member anymore + executor.shutdown(); + TestUtils.waitForCondition( + () -> kafka.utils.TestUtils.grabConsoleError(describeGroups(service)).contains("Consumer group '" + group + "' has no active members."), + "Expected no active member in describe group results with describe type " + String.join(" ", describeType)); + } + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeOffsetsOfExistingGroupWithNoMembers(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + // run one consumer in the group consuming from a single-partition topic + ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, GROUP, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), true, groupProtocol); + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + scala.Tuple2, Option>> res = service.collectGroupOffsets(GROUP); + return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) + && res._2.map(c -> c.exists(assignment -> Objects.equals(assignment.group(), GROUP) && assignment.offset().isDefined())).getOrElse(() -> false); + }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit."); + + // stop the consumer so the group has no active member anymore + executor.shutdown(); + + TestUtils.waitForCondition(() -> { + scala.Tuple2, Option>> offsets = service.collectGroupOffsets(GROUP); + Option state = offsets._1; + Option> assignments = offsets._2; + @SuppressWarnings("unchecked") + Seq testGroupAssignments = assignments.get().filter(a -> Objects.equals(a.group(), GROUP)).toSeq(); + ConsumerGroupCommand.PartitionAssignmentState assignment = testGroupAssignments.head(); + return state.map(s -> s.contains("Empty")).getOrElse(() -> false) && + testGroupAssignments.size() == 1 && + assignment.consumerId().map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) && // the member should be gone + assignment.clientId().map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) && + assignment.host().map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false); + }, "failed to collect group offsets"); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeMembersOfExistingGroupWithNoMembers(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + // run one consumer in the group consuming from a single-partition topic + ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, groupProtocol); + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + scala.Tuple2, Option>> res = service.collectGroupMembers(GROUP, false); + return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) + && res._2.map(c -> c.exists(m -> Objects.equals(m.group(), GROUP))).getOrElse(() -> false); + }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit."); + + // stop the consumer so the group has no active member anymore + executor.shutdown(); + + TestUtils.waitForCondition(() -> { + scala.Tuple2, Option>> res = service.collectGroupMembers(GROUP, false); + return res._1.map(s -> s.contains("Empty")).getOrElse(() -> false) && res._2.isDefined() && res._2.get().isEmpty(); + }, "Expected no member in describe group members results for group '" + GROUP + "'"); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeStateOfExistingGroupWithNoMembers(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + // run one consumer in the group consuming from a single-partition topic + ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, groupProtocol); + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP); + return Objects.equals(state.state(), "Stable") && + state.numMembers() == 1 && + state.coordinator() != null && + brokers().count(s -> s.config().brokerId() == state.coordinator().id()) > 0; + }, "Expected the group '" + GROUP + "' to initially become stable, and have a single member."); + + // stop the consumer so the group has no active member anymore + executor.shutdown(); + + TestUtils.waitForCondition(() -> { + ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP); + return Objects.equals(state.state(), "Empty") && state.numMembers() == 0; + }, "Expected the group '" + GROUP + "' to become empty after the only member leaving."); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeWithConsumersWithoutAssignedPartitions(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + for (List describeType : DESCRIBE_TYPES) { + String group = GROUP + String.join("", describeType); + // run two consumers in the group consuming from a single-partition topic + addConsumerGroupExecutor(2, TOPIC, group, groupProtocol); + List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group)); + cgcArgs.addAll(describeType); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); + + TestUtils.waitForCondition(() -> { + scala.Tuple2 res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service)); + int expectedNumRows = DESCRIBE_TYPE_MEMBERS.contains(describeType) ? 3 : 2; + return res._2.isEmpty() && res._1.trim().split("\n").length == expectedNumRows; + }, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'"); + } + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeOffsetsWithConsumersWithoutAssignedPartitions(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + // run two consumers in the group consuming from a single-partition topic + addConsumerGroupExecutor(2, groupProtocol); + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + scala.Tuple2, Option>> res = service.collectGroupOffsets(GROUP); + return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) && + res._2.isDefined() && + res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 1 && + res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.partition().isDefined()) == 1; + }, "Expected rows for consumers with no assigned partitions in describe group results"); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeMembersWithConsumersWithoutAssignedPartitions(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + // run two consumers in the group consuming from a single-partition topic + addConsumerGroupExecutor(2, groupProtocol); + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + scala.Tuple2, Option>> res = service.collectGroupMembers(GROUP, false); + return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) && + res._2.isDefined() && + res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 2 && + res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.numPartitions() == 1) == 1 && + res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.numPartitions() == 0) == 1 && + res._2.get().forall(s -> s.assignment().isEmpty()); + }, "Expected rows for consumers with no assigned partitions in describe group results"); + + scala.Tuple2, Option>> res = service.collectGroupMembers(GROUP, true); + assertTrue(res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) + && res._2.map(c -> c.exists(s -> !s.assignment().isEmpty())).getOrElse(() -> false), + "Expected additional columns in verbose version of describe members"); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeStateWithConsumersWithoutAssignedPartitions(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + // run two consumers in the group consuming from a single-partition topic + addConsumerGroupExecutor(2, groupProtocol); + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP); + return Objects.equals(state.state(), "Stable") && state.numMembers() == 2; + }, "Expected two consumers in describe group results"); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeWithMultiPartitionTopicAndMultipleConsumers(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + String topic2 = "foo2"; + createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties()); + + for (List describeType : DESCRIBE_TYPES) { + String group = GROUP + String.join("", describeType); + // run two consumers in the group consuming from a two-partition topic + addConsumerGroupExecutor(2, topic2, group, groupProtocol); + List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group)); + cgcArgs.addAll(describeType); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); + + TestUtils.waitForCondition(() -> { + scala.Tuple2 res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service)); + int expectedNumRows = DESCRIBE_TYPE_STATE.contains(describeType) ? 2 : 3; + return res._2.isEmpty() && res._1.trim().split("\n").length == expectedNumRows; + }, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'"); + } + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + String topic2 = "foo2"; + createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties()); + + // run two consumers in the group consuming from a two-partition topic + addConsumerGroupExecutor(2, topic2, GROUP, groupProtocol); + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + scala.Tuple2, Option>> res = service.collectGroupOffsets(GROUP); + return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) && + res._2.isDefined() && + res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 2 && + res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.partition().isDefined()) == 2 && + res._2.get().count(x -> Objects.equals(x.group(), GROUP) && !x.partition().isDefined()) == 0; + }, "Expected two rows (one row per consumer) in describe group results."); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + String topic2 = "foo2"; + createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties()); + + // run two consumers in the group consuming from a two-partition topic + addConsumerGroupExecutor(2, topic2, GROUP, groupProtocol); + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + scala.Tuple2, Option>> res = service.collectGroupMembers(GROUP, false); + return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) && + res._2.isDefined() && + res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 2 && + res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.numPartitions() == 1) == 2 && + res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.numPartitions() == 0) == 0; + }, "Expected two rows (one row per consumer) in describe group members results."); + + scala.Tuple2, Option>> res = service.collectGroupMembers(GROUP, true); + assertTrue(res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) && res._2.map(s -> s.count(x -> x.assignment().isEmpty())).getOrElse(() -> 0) == 0, + "Expected additional columns in verbose version of describe members"); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeStateWithMultiPartitionTopicAndMultipleConsumers(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + String topic2 = "foo2"; + createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties()); + + // run two consumers in the group consuming from a two-partition topic + addConsumerGroupExecutor(2, topic2, GROUP, groupProtocol); + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP); + return Objects.equals(state.state(), "Stable") && Objects.equals(state.group(), GROUP) && state.numMembers() == 2; + }, "Expected a stable group with two members in describe group state result."); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft", "kraft+kip848"}) + public void testDescribeSimpleConsumerGroup(String quorum) throws Exception { + // Ensure that the offsets of consumers which don't use group management are still displayed + + createOffsetsTopic(listenerName(), new Properties()); + String topic2 = "foo2"; + createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties()); + addSimpleGroupExecutor(Arrays.asList(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)), GROUP); + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + scala.Tuple2, Option>> res = service.collectGroupOffsets(GROUP); + return res._1.map(s -> s.contains("Empty")).getOrElse(() -> false) + && res._2.isDefined() && res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 2; + }, "Expected a stable group with two members in describe group state result."); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeGroupWithShortInitializationTimeout(String quorum, String groupProtocol) { + // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't + // complete before the timeout expires + + List describeType = DESCRIBE_TYPES.get(RANDOM.nextInt(DESCRIBE_TYPES.size())); + String group = GROUP + String.join("", describeType); + // run one consumer in the group consuming from a single-partition topic + addConsumerGroupExecutor(1, groupProtocol); + // set the group initialization timeout too low for the group to stabilize + List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--timeout", "1", "--group", group)); + cgcArgs.addAll(describeType); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); + + ExecutionException e = assertThrows(ExecutionException.class, service::describeGroups); + assertInstanceOf(TimeoutException.class, e.getCause()); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeGroupOffsetsWithShortInitializationTimeout(String quorum, String groupProtocol) { + // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't + // complete before the timeout expires + + // run one consumer in the group consuming from a single-partition topic + addConsumerGroupExecutor(1, groupProtocol); + + // set the group initialization timeout too low for the group to stabilize + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP, "--timeout", "1"}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + Throwable e = assertThrows(ExecutionException.class, () -> service.collectGroupOffsets(GROUP)); + assertEquals(TimeoutException.class, e.getCause().getClass()); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeGroupMembersWithShortInitializationTimeout(String quorum, String groupProtocol) { + // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't + // complete before the timeout expires + + // run one consumer in the group consuming from a single-partition topic + addConsumerGroupExecutor(1, groupProtocol); + + // set the group initialization timeout too low for the group to stabilize + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP, "--timeout", "1"}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + Throwable e = assertThrows(ExecutionException.class, () -> service.collectGroupMembers(GROUP, false)); + assertEquals(TimeoutException.class, e.getCause().getClass()); + e = assertThrows(ExecutionException.class, () -> service.collectGroupMembers(GROUP, true)); + assertEquals(TimeoutException.class, e.getCause().getClass()); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeGroupStateWithShortInitializationTimeout(String quorum, String groupProtocol) { + // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't + // complete before the timeout expires + + // run one consumer in the group consuming from a single-partition topic + addConsumerGroupExecutor(1, groupProtocol); + + // set the group initialization timeout too low for the group to stabilize + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP, "--timeout", "1"}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + Throwable e = assertThrows(ExecutionException.class, () -> service.collectGroupState(GROUP)); + assertEquals(TimeoutException.class, e.getCause().getClass()); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribeWithUnrecognizedNewConsumerOption(String quorum) { + String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + assertThrows(joptsimple.OptionException.class, () -> getConsumerGroupService(cgcArgs)); + } + + @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) + @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) + public void testDescribeNonOffsetCommitGroup(String quorum, String groupProtocol) throws Exception { + createOffsetsTopic(listenerName(), new Properties()); + + Properties customProps = new Properties(); + // create a consumer group that never commits offsets + customProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + // run one consumer in the group consuming from a single-partition topic + addConsumerGroupExecutor(1, TOPIC, GROUP, RangeAssignor.class.getName(), Optional.empty(), Optional.of(customProps), false, groupProtocol); + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + + TestUtils.waitForCondition(() -> { + scala.Tuple2, Option>> groupOffsets = service.collectGroupOffsets(GROUP); + + Function1 isGrp = s -> Objects.equals(s.group(), GROUP); + + boolean res = groupOffsets._1.map(s -> s.contains("Stable")).getOrElse(() -> false) && + groupOffsets._2.isDefined() && + groupOffsets._2.get().count(isGrp) == 1; + + if (!res) + return false; + + @SuppressWarnings("cast") + ConsumerGroupCommand.PartitionAssignmentState assignmentState = + (ConsumerGroupCommand.PartitionAssignmentState) groupOffsets._2.get().filter(isGrp).head(); + + return assignmentState.consumerId().map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) && + assignmentState.clientId().map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) && + assignmentState.host().map(h -> !h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false); + }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group " + GROUP + "."); + } + + private Function0 describeGroups(ConsumerGroupCommand.ConsumerGroupService service) { + return () -> { + try { + service.describeGroups(); + return null; + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + } +}