KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java (#15363)

This PR is part of #14471
It contains some of ConsoleGroupCommand tests rewritten in java.
Intention of separate PR is to reduce changes and simplify review.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Nikolay 2024-03-07 02:44:17 +03:00 committed by GitHub
parent ccf4bd5f46
commit 5f4806fd1c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 830 additions and 747 deletions

View File

@ -1,747 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.util.Properties
import kafka.utils.{Exit, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{MethodSource, ValueSource}
import scala.concurrent.ExecutionException
import scala.util.Random
class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
private val describeTypeOffsets = Array(Array(""), Array("--offsets"))
private val describeTypeMembers = Array(Array("--members"), Array("--members", "--verbose"))
private val describeTypeState = Array(Array("--state"))
private val describeTypes = describeTypeOffsets ++ describeTypeMembers ++ describeTypeState
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeNonExistingGroup(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
val missingGroup = "missing.group"
for (describeType <- describeTypes) {
// note the group to be queried is a different (non-existing) group
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", missingGroup) ++ describeType
val service = getConsumerGroupService(cgcArgs)
val output = TestUtils.grabConsoleOutput(service.describeGroups())
assertTrue(output.contains(s"Consumer group '$missingGroup' does not exist."),
s"Expected error was not detected for describe option '${describeType.mkString(" ")}'")
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeWithMultipleSubActions(quorum: String): Unit = {
var exitStatus: Option[Int] = None
var exitMessage: Option[String] = None
Exit.setExitProcedure { (status, err) =>
exitStatus = Some(status)
exitMessage = err
throw new RuntimeException
}
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group, "--members", "--state")
try {
ConsumerGroupCommand.main(cgcArgs)
} catch {
case e: RuntimeException => //expected
} finally {
Exit.resetExitProcedure()
}
assertEquals(Some(1), exitStatus)
assertTrue(exitMessage.get.contains("Option [describe] takes at most one of these options"))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeWithStateValue(quorum: String): Unit = {
var exitStatus: Option[Int] = None
var exitMessage: Option[String] = None
Exit.setExitProcedure { (status, err) =>
exitStatus = Some(status)
exitMessage = err
throw new RuntimeException
}
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--all-groups", "--state", "Stable")
try {
ConsumerGroupCommand.main(cgcArgs)
} catch {
case e: RuntimeException => //expected
} finally {
Exit.resetExitProcedure()
}
assertEquals(Some(1), exitStatus)
assertTrue(exitMessage.get.contains("Option [describe] does not take a value for [state]"))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeOffsetsOfNonExistingGroup(quorum: String, groupProtocol: String): Unit = {
val group = "missing.group"
createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol)
// note the group to be queried is a different (non-existing) group
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
val (state, assignments) = service.collectGroupOffsets(group)
assertTrue(state.contains("Dead") && assignments.contains(List()),
s"Expected the state to be 'Dead', with no members in the group '$group'.")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeMembersOfNonExistingGroup(quorum: String, groupProtocol: String): Unit = {
val group = "missing.group"
createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol)
// note the group to be queried is a different (non-existing) group
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
val (state, assignments) = service.collectGroupMembers(group, false)
assertTrue(state.contains("Dead") && assignments.contains(List()),
s"Expected the state to be 'Dead', with no members in the group '$group'.")
val (state2, assignments2) = service.collectGroupMembers(group, true)
assertTrue(state2.contains("Dead") && assignments2.contains(List()),
s"Expected the state to be 'Dead', with no members in the group '$group' (verbose option).")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeStateOfNonExistingGroup(quorum: String, groupProtocol: String): Unit = {
val group = "missing.group"
createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol)
// note the group to be queried is a different (non-existing) group
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
val state = service.collectGroupState(group)
assertTrue(state.state == "Dead" && state.numMembers == 0 &&
state.coordinator != null && brokers.map(_.config.brokerId).toList.contains(state.coordinator.id),
s"Expected the state to be 'Dead', with no members in the group '$group'."
)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeExistingGroup(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
for (describeType <- describeTypes) {
val group = this.group + describeType.mkString("")
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, group = group, groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) ++ describeType
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
output.trim.split("\n").length == 2 && error.isEmpty
}, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.")
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeExistingGroups(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
// Create N single-threaded consumer groups from a single-partition topic
val groups = (for (describeType <- describeTypes) yield {
val group = this.group + describeType.mkString("")
addConsumerGroupExecutor(numConsumers = 1, group = group, groupProtocol = groupProtocol)
Array("--group", group)
}).flatten
val expectedNumLines = describeTypes.length * 2
for (describeType <- describeTypes) {
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe") ++ groups ++ describeType
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
val numLines = output.trim.split("\n").count(line => line.nonEmpty)
(numLines == expectedNumLines) && error.isEmpty
}, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.")
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeAllExistingGroups(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
// Create N single-threaded consumer groups from a single-partition topic
for (describeType <- describeTypes) {
val group = this.group + describeType.mkString("")
addConsumerGroupExecutor(numConsumers = 1, group = group, groupProtocol = groupProtocol)
}
val expectedNumLines = describeTypes.length * 2
for (describeType <- describeTypes) {
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--all-groups") ++ describeType
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
val numLines = output.trim.split("\n").count(line => line.nonEmpty)
(numLines == expectedNumLines) && error.isEmpty
}, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.")
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeOffsetsOfExistingGroup(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupOffsets(group)
state.contains("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 1 &&
assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
}, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group $group.")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeMembersOfExistingGroup(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupMembers(group, false)
state.contains("Stable") &&
(assignments match {
case Some(memberAssignments) =>
memberAssignments.count(_.group == group) == 1 &&
memberAssignments.filter(_.group == group).head.consumerId != ConsumerGroupCommand.MISSING_COLUMN_VALUE &&
memberAssignments.filter(_.group == group).head.clientId != ConsumerGroupCommand.MISSING_COLUMN_VALUE &&
memberAssignments.filter(_.group == group).head.host != ConsumerGroupCommand.MISSING_COLUMN_VALUE
case None =>
false
})
}, s"Expected a 'Stable' group status, rows and valid member information for group $group.")
val (_, assignments) = service.collectGroupMembers(group, true)
assignments match {
case None =>
fail(s"Expected partition assignments for members of group $group")
case Some(memberAssignments) =>
assertTrue(memberAssignments.size == 1 && memberAssignments.head.assignment.size == 1,
s"Expected a topic partition assigned to the single group member for group $group")
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeStateOfExistingGroup(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(
numConsumers = 1,
groupProtocol = groupProtocol,
// This is only effective when new protocol is used.
remoteAssignor = Some("range")
)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val state = service.collectGroupState(group)
state.state == "Stable" &&
state.numMembers == 1 &&
state.assignmentStrategy == "range" &&
state.coordinator != null &&
brokers.map(_.config.brokerId).toList.contains(state.coordinator.id)
}, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeStateOfExistingGroupWithNonDefaultAssignor(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
val expectedName = if (groupProtocol == "consumer") {
addConsumerGroupExecutor(numConsumers = 1, remoteAssignor = Some("range"), groupProtocol = groupProtocol)
"range"
} else {
addConsumerGroupExecutor(numConsumers = 1, strategy = classOf[RoundRobinAssignor].getName, groupProtocol = groupProtocol)
"roundrobin"
}
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val state = service.collectGroupState(group)
state.state == "Stable" &&
state.numMembers == 1 &&
state.assignmentStrategy == expectedName &&
state.coordinator != null &&
brokers.map(_.config.brokerId).toList.contains(state.coordinator.id)
}, s"Expected a 'Stable' group status, with one member and $expectedName assignment strategy for group $group.")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeExistingGroupWithNoMembers(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
for (describeType <- describeTypes) {
val group = this.group + describeType.mkString("")
// run one consumer in the group consuming from a single-partition topic
val executor = addConsumerGroupExecutor(numConsumers = 1, group = group, groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) ++ describeType
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
output.trim.split("\n").length == 2 && error.isEmpty
}, s"Expected describe group results with one data row for describe type '${describeType.mkString(" ")}'")
// stop the consumer so the group has no active member anymore
executor.shutdown()
TestUtils.waitUntilTrue(() => {
TestUtils.grabConsoleError(service.describeGroups()).contains(s"Consumer group '$group' has no active members.")
}, s"Expected no active member in describe group results with describe type ${describeType.mkString(" ")}")
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeOffsetsOfExistingGroupWithNoMembers(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
val executor = addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol, syncCommit = true)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupOffsets(group)
state.contains("Stable") && assignments.exists(_.exists(assignment => assignment.group == group && assignment.offset.isDefined))
}, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.")
// stop the consumer so the group has no active member anymore
executor.shutdown()
val (result, succeeded) = TestUtils.computeUntilTrue(service.collectGroupOffsets(group)) {
case (state, assignments) =>
val testGroupAssignments = assignments.toSeq.flatMap(_.filter(_.group == group))
def assignment = testGroupAssignments.head
state.contains("Empty") &&
testGroupAssignments.size == 1 &&
assignment.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
assignment.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
assignment.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE)
}
val (state, assignments) = result
assertTrue(succeeded, s"Expected no active member in describe group results, state: $state, assignments: $assignments")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeMembersOfExistingGroupWithNoMembers(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
val executor = addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupMembers(group, false)
state.contains("Stable") && assignments.exists(_.exists(_.group == group))
}, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.")
// stop the consumer so the group has no active member anymore
executor.shutdown()
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupMembers(group, false)
state.contains("Empty") && assignments.isDefined && assignments.get.isEmpty
}, s"Expected no member in describe group members results for group '$group'")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeStateOfExistingGroupWithNoMembers(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
// run one consumer in the group consuming from a single-partition topic
val executor = addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val state = service.collectGroupState(group)
state.state == "Stable" &&
state.numMembers == 1 &&
state.coordinator != null &&
brokers.map(_.config.brokerId).toList.contains(state.coordinator.id)
}, s"Expected the group '$group' to initially become stable, and have a single member.")
// stop the consumer so the group has no active member anymore
executor.shutdown()
TestUtils.waitUntilTrue(() => {
val state = service.collectGroupState(group)
state.state == "Empty" && state.numMembers == 0
}, s"Expected the group '$group' to become empty after the only member leaving.")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeWithConsumersWithoutAssignedPartitions(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
for (describeType <- describeTypes) {
val group = this.group + describeType.mkString("")
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 2, group = group, groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) ++ describeType
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
val expectedNumRows = if (describeTypeMembers.contains(describeType)) 3 else 2
error.isEmpty && output.trim.split("\n").size == expectedNumRows
}, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'")
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeOffsetsWithConsumersWithoutAssignedPartitions(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 2, groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupOffsets(group)
state.contains("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 1 &&
assignments.get.count { x => x.group == group && x.partition.isDefined } == 1
}, "Expected rows for consumers with no assigned partitions in describe group results")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeMembersWithConsumersWithoutAssignedPartitions(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 2, groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupMembers(group, false)
state.contains("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 2 &&
assignments.get.count { x => x.group == group && x.numPartitions == 1 } == 1 &&
assignments.get.count { x => x.group == group && x.numPartitions == 0 } == 1 &&
!assignments.get.exists(_.assignment.nonEmpty)
}, "Expected rows for consumers with no assigned partitions in describe group results")
val (state, assignments) = service.collectGroupMembers(group, true)
assertTrue(state.contains("Stable") && assignments.get.count(_.assignment.nonEmpty) > 0,
"Expected additional columns in verbose version of describe members")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeStateWithConsumersWithoutAssignedPartitions(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 2, groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val state = service.collectGroupState(group)
state.state == "Stable" && state.numMembers == 2
}, "Expected two consumers in describe group results")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeWithMultiPartitionTopicAndMultipleConsumers(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
val topic2 = "foo2"
createTopic(topic2, 2, 1)
for (describeType <- describeTypes) {
val group = this.group + describeType.mkString("")
// run two consumers in the group consuming from a two-partition topic
addConsumerGroupExecutor(2, topic2, group = group, groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group) ++ describeType
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
val expectedNumRows = if (describeTypeState.contains(describeType)) 2 else 3
error.isEmpty && output.trim.split("\n").size == expectedNumRows
}, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'")
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
val topic2 = "foo2"
createTopic(topic2, 2, 1)
// run two consumers in the group consuming from a two-partition topic
addConsumerGroupExecutor(numConsumers = 2, topic2, groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupOffsets(group)
state.contains("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 2 &&
assignments.get.count { x => x.group == group && x.partition.isDefined } == 2 &&
assignments.get.count { x => x.group == group && x.partition.isEmpty } == 0
}, "Expected two rows (one row per consumer) in describe group results.")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
val topic2 = "foo2"
createTopic(topic2, 2, 1)
// run two consumers in the group consuming from a two-partition topic
addConsumerGroupExecutor(numConsumers = 2, topic2, groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupMembers(group, false)
state.contains("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 2 &&
assignments.get.count { x => x.group == group && x.numPartitions == 1 } == 2 &&
assignments.get.count { x => x.group == group && x.numPartitions == 0 } == 0
}, "Expected two rows (one row per consumer) in describe group members results.")
val (state, assignments) = service.collectGroupMembers(group, true)
assertTrue(state.contains("Stable") && assignments.get.count(_.assignment.isEmpty) == 0,
"Expected additional columns in verbose version of describe members")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeStateWithMultiPartitionTopicAndMultipleConsumers(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
val topic2 = "foo2"
createTopic(topic2, 2, 1)
// run two consumers in the group consuming from a two-partition topic
addConsumerGroupExecutor(numConsumers = 2, topic2, groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val state = service.collectGroupState(group)
state.state == "Stable" && state.group == group && state.numMembers == 2
}, "Expected a stable group with two members in describe group state result.")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
def testDescribeSimpleConsumerGroup(quorum: String): Unit = {
// Ensure that the offsets of consumers which don't use group management are still displayed
createOffsetsTopic()
val topic2 = "foo2"
createTopic(topic2, 2, 1)
addSimpleGroupExecutor(Seq(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)))
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupOffsets(group)
state.contains("Empty") && assignments.isDefined && assignments.get.count(_.group == group) == 2
}, "Expected a stable group with two members in describe group state result.")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeGroupWithShortInitializationTimeout(quorum: String, groupProtocol: String): Unit = {
// Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
// complete before the timeout expires
val describeType = describeTypes(Random.nextInt(describeTypes.length))
val group = this.group + describeType.mkString("")
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol)
// set the group initialization timeout too low for the group to stabilize
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--timeout", "1", "--group", group) ++ describeType
val service = getConsumerGroupService(cgcArgs)
val e = assertThrows(classOf[ExecutionException], () => TestUtils.grabConsoleOutputAndError(service.describeGroups()))
assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeGroupOffsetsWithShortInitializationTimeout(quorum: String, groupProtocol: String): Unit = {
// Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
// complete before the timeout expires
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol)
// set the group initialization timeout too low for the group to stabilize
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group, "--timeout", "1")
val service = getConsumerGroupService(cgcArgs)
val e = assertThrows(classOf[ExecutionException], () => service.collectGroupOffsets(group))
assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeGroupMembersWithShortInitializationTimeout(quorum: String, groupProtocol: String): Unit = {
// Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
// complete before the timeout expires
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol)
// set the group initialization timeout too low for the group to stabilize
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group, "--timeout", "1")
val service = getConsumerGroupService(cgcArgs)
var e = assertThrows(classOf[ExecutionException], () => service.collectGroupMembers(group, false))
assertEquals(classOf[TimeoutException], e.getCause.getClass)
e = assertThrows(classOf[ExecutionException], () => service.collectGroupMembers(group, true))
assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeGroupStateWithShortInitializationTimeout(quorum: String, groupProtocol: String): Unit = {
// Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
// complete before the timeout expires
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, groupProtocol = groupProtocol)
// set the group initialization timeout too low for the group to stabilize
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group, "--timeout", "1")
val service = getConsumerGroupService(cgcArgs)
val e = assertThrows(classOf[ExecutionException], () => service.collectGroupState(group))
assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeWithUnrecognizedNewConsumerOption(quorum: String): Unit = {
val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
assertThrows(classOf[joptsimple.OptionException], () => getConsumerGroupService(cgcArgs))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDescribeNonOffsetCommitGroup(quorum: String, groupProtocol: String): Unit = {
createOffsetsTopic()
val customProps = new Properties
// create a consumer group that never commits offsets
customProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1, customPropsOpt = Some(customProps), groupProtocol = groupProtocol)
val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupOffsets(group)
state.contains("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 1 &&
assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
}, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group $group.")
}
}

View File

@ -0,0 +1,830 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer.group;
import kafka.admin.ConsumerGroupCommand;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.collection.Seq;
import scala.runtime.BoxedUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.test.TestUtils.RANDOM;
import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
private static final List<List<String>> DESCRIBE_TYPE_OFFSETS = Arrays.asList(Collections.singletonList(""), Collections.singletonList("--offsets"));
private static final List<List<String>> DESCRIBE_TYPE_MEMBERS = Arrays.asList(Collections.singletonList("--members"), Arrays.asList("--members", "--verbose"));
private static final List<List<String>> DESCRIBE_TYPE_STATE = Collections.singletonList(Collections.singletonList("--state"));
private static final List<List<String>> DESCRIBE_TYPES;
static {
List<List<String>> describeTypes = new ArrayList<>();
describeTypes.addAll(DESCRIBE_TYPE_OFFSETS);
describeTypes.addAll(DESCRIBE_TYPE_MEMBERS);
describeTypes.addAll(DESCRIBE_TYPE_STATE);
DESCRIBE_TYPES = describeTypes;
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeNonExistingGroup(String quorum, String groupProtocol) {
createOffsetsTopic(listenerName(), new Properties());
String missingGroup = "missing.group";
for (List<String> describeType : DESCRIBE_TYPES) {
// note the group to be queried is a different (non-existing) group
List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", missingGroup));
cgcArgs.addAll(describeType);
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
String output = kafka.utils.TestUtils.grabConsoleOutput(describeGroups(service));
assertTrue(output.contains("Consumer group '" + missingGroup + "' does not exist."),
"Expected error was not detected for describe option '" + String.join(" ", describeType) + "'");
}
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@ValueSource(strings = {"zk", "kraft"})
public void testDescribeWithMultipleSubActions(String quorum) {
AtomicInteger exitStatus = new AtomicInteger(0);
AtomicReference<String> exitMessage = new AtomicReference<>("");
Exit.setExitProcedure((status, err) -> {
exitStatus.set(status);
exitMessage.set(err);
throw new RuntimeException();
});
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP, "--members", "--state"};
try {
assertThrows(RuntimeException.class, () -> ConsumerGroupCommand.main(cgcArgs));
} finally {
Exit.resetExitProcedure();
}
assertEquals(1, exitStatus.get());
assertTrue(exitMessage.get().contains("Option [describe] takes at most one of these options"));
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@ValueSource(strings = {"zk", "kraft"})
public void testDescribeWithStateValue(String quorum) {
AtomicInteger exitStatus = new AtomicInteger(0);
AtomicReference<String> exitMessage = new AtomicReference<>("");
Exit.setExitProcedure((status, err) -> {
exitStatus.set(status);
exitMessage.set(err);
throw new RuntimeException();
});
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--all-groups", "--state", "Stable"};
try {
assertThrows(RuntimeException.class, () -> ConsumerGroupCommand.main(cgcArgs));
} finally {
Exit.resetExitProcedure();
}
assertEquals(1, exitStatus.get());
assertTrue(exitMessage.get().contains("Option [describe] does not take a value for [state]"));
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeOffsetsOfNonExistingGroup(String quorum, String groupProtocol) {
String group = "missing.group";
createOffsetsTopic(listenerName(), new Properties());
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(1, groupProtocol);
// note the group to be queried is a different (non-existing) group
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
assertTrue(res._1.map(s -> s.contains("Dead")).getOrElse(() -> false) && res._2.map(Seq::isEmpty).getOrElse(() -> false),
"Expected the state to be 'Dead', with no members in the group '" + group + "'.");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeMembersOfNonExistingGroup(String quorum, String groupProtocol) {
String group = "missing.group";
createOffsetsTopic(listenerName(), new Properties());
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(1, groupProtocol);
// note the group to be queried is a different (non-existing) group
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
assertTrue(res._1.map(s -> s.contains("Dead")).getOrElse(() -> false) && res._2.map(Seq::isEmpty).getOrElse(() -> false),
"Expected the state to be 'Dead', with no members in the group '" + group + "'.");
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res2 = service.collectGroupMembers(group, true);
assertTrue(res2._1.map(s -> s.contains("Dead")).getOrElse(() -> false) && res2._2.map(Seq::isEmpty).getOrElse(() -> false),
"Expected the state to be 'Dead', with no members in the group '" + group + "' (verbose option).");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeStateOfNonExistingGroup(String quorum, String groupProtocol) {
String group = "missing.group";
createOffsetsTopic(listenerName(), new Properties());
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(1, groupProtocol);
// note the group to be queried is a different (non-existing) group
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
ConsumerGroupCommand.GroupState state = service.collectGroupState(group);
assertTrue(Objects.equals(state.state(), "Dead") && state.numMembers() == 0 &&
state.coordinator() != null && !brokers().filter(s -> s.config().brokerId() == state.coordinator().id()).isEmpty(),
"Expected the state to be 'Dead', with no members in the group '" + group + "'."
);
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeExistingGroup(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
for (List<String> describeType : DESCRIBE_TYPES) {
String group = GROUP + String.join("", describeType);
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(1, TOPIC, group, groupProtocol);
List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group));
cgcArgs.addAll(describeType);
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
TestUtils.waitForCondition(() -> {
scala.Tuple2<String, String> res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service));
return res._1.trim().split("\n").length == 2 && res._2.isEmpty();
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
}
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeExistingGroups(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
// Create N single-threaded consumer groups from a single-partition topic
List<String> groups = new ArrayList<>();
for (List<String> describeType : DESCRIBE_TYPES) {
String group = GROUP + String.join("", describeType);
addConsumerGroupExecutor(1, TOPIC, group, groupProtocol);
groups.addAll(Arrays.asList("--group", group));
}
int expectedNumLines = DESCRIBE_TYPES.size() * 2;
for (List<String> describeType : DESCRIBE_TYPES) {
List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe"));
cgcArgs.addAll(groups);
cgcArgs.addAll(describeType);
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
TestUtils.waitForCondition(() -> {
scala.Tuple2<String, String> res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service));
long numLines = Arrays.stream(res._1.trim().split("\n")).filter(line -> !line.isEmpty()).count();
return (numLines == expectedNumLines) && res._2.isEmpty();
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
}
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeAllExistingGroups(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
// Create N single-threaded consumer groups from a single-partition topic
for (List<String> describeType : DESCRIBE_TYPES) {
String group = GROUP + String.join("", describeType);
addConsumerGroupExecutor(1, TOPIC, group, groupProtocol);
}
int expectedNumLines = DESCRIBE_TYPES.size() * 2;
for (List<String> describeType : DESCRIBE_TYPES) {
List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--all-groups"));
cgcArgs.addAll(describeType);
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
TestUtils.waitForCondition(() -> {
scala.Tuple2<String, String> res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service));
long numLines = Arrays.stream(res._1.trim().split("\n")).filter(s -> !s.isEmpty()).count();
return (numLines == expectedNumLines) && res._2.isEmpty();
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
}
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeOffsetsOfExistingGroup(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(1, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(GROUP);
Option<String> state = groupOffsets._1;
Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>> assignments = groupOffsets._2;
Function1<ConsumerGroupCommand.PartitionAssignmentState, Object> isGrp = s -> Objects.equals(s.group(), GROUP);
boolean res = state.map(s -> s.contains("Stable")).getOrElse(() -> false) &&
assignments.isDefined() &&
assignments.get().count(isGrp) == 1;
if (!res)
return false;
@SuppressWarnings("cast")
ConsumerGroupCommand.PartitionAssignmentState partitionState =
(ConsumerGroupCommand.PartitionAssignmentState) assignments.get().filter(isGrp).head();
return !partitionState.consumerId().map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) &&
!partitionState.clientId().map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) &&
!partitionState.host().map(h -> h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false);
}, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group " + GROUP + ".");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeMembersOfExistingGroup(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(1, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> groupMembers = service.collectGroupMembers(GROUP, false);
Option<String> state = groupMembers._1;
Option<Seq<ConsumerGroupCommand.MemberAssignmentState>> assignments = groupMembers._2;
Function1<ConsumerGroupCommand.MemberAssignmentState, Object> isGrp = s -> Objects.equals(s.group(), GROUP);
boolean res = state.map(s -> s.contains("Stable")).getOrElse(() -> false) &&
assignments.isDefined() &&
assignments.get().count(s -> Objects.equals(s.group(), GROUP)) == 1;
if (!res)
return false;
@SuppressWarnings("cast")
ConsumerGroupCommand.MemberAssignmentState assignmentState =
(ConsumerGroupCommand.MemberAssignmentState) assignments.get().filter(isGrp).head();
return !Objects.equals(assignmentState.consumerId(), ConsumerGroupCommand.MISSING_COLUMN_VALUE()) &&
!Objects.equals(assignmentState.clientId(), ConsumerGroupCommand.MISSING_COLUMN_VALUE()) &&
!Objects.equals(assignmentState.host(), ConsumerGroupCommand.MISSING_COLUMN_VALUE());
}, "Expected a 'Stable' group status, rows and valid member information for group " + GROUP + ".");
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
if (res._2.isDefined()) {
assertTrue(res._2.get().size() == 1 && res._2.get().iterator().next().assignment().size() == 1,
"Expected a topic partition assigned to the single group member for group " + GROUP);
} else {
fail("Expected partition assignments for members of group " + GROUP);
}
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeStateOfExistingGroup(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(
1,
groupProtocol,
// This is only effective when new protocol is used.
Optional.of("range")
);
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state(), "Stable") &&
state.numMembers() == 1 &&
Objects.equals(state.assignmentStrategy(), "range") &&
state.coordinator() != null &&
brokers().count(s -> s.config().brokerId() == state.coordinator().id()) > 0;
}, "Expected a 'Stable' group status, with one member and round robin assignment strategy for group " + GROUP + ".");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeStateOfExistingGroupWithNonDefaultAssignor(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
// run one consumer in the group consuming from a single-partition topic
String expectedName;
if (groupProtocol.equals("consumer")) {
addConsumerGroupExecutor(1, groupProtocol, Optional.of("range"));
expectedName = "range";
} else {
addConsumerGroupExecutor(1, TOPIC, GROUP, RoundRobinAssignor.class.getName(), Optional.empty(), Optional.empty(), false, groupProtocol);
expectedName = "roundrobin";
}
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state(), "Stable") &&
state.numMembers() == 1 &&
Objects.equals(state.assignmentStrategy(), expectedName) &&
state.coordinator() != null &&
brokers().count(s -> s.config().brokerId() == state.coordinator().id()) > 0;
}, "Expected a 'Stable' group status, with one member and " + expectedName + " assignment strategy for group " + GROUP + ".");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeExistingGroupWithNoMembers(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
for (List<String> describeType : DESCRIBE_TYPES) {
String group = GROUP + String.join("", describeType);
// run one consumer in the group consuming from a single-partition topic
ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, group, groupProtocol);
List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group));
cgcArgs.addAll(describeType);
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
TestUtils.waitForCondition(() -> {
scala.Tuple2<String, String> res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service));
return res._1.trim().split("\n").length == 2 && res._2.isEmpty();
}, "Expected describe group results with one data row for describe type '" + String.join(" ", describeType) + "'");
// stop the consumer so the group has no active member anymore
executor.shutdown();
TestUtils.waitForCondition(
() -> kafka.utils.TestUtils.grabConsoleError(describeGroups(service)).contains("Consumer group '" + group + "' has no active members."),
"Expected no active member in describe group results with describe type " + String.join(" ", describeType));
}
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeOffsetsOfExistingGroupWithNoMembers(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
// run one consumer in the group consuming from a single-partition topic
ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, GROUP, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), true, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false)
&& res._2.map(c -> c.exists(assignment -> Objects.equals(assignment.group(), GROUP) && assignment.offset().isDefined())).getOrElse(() -> false);
}, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.");
// stop the consumer so the group has no active member anymore
executor.shutdown();
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> offsets = service.collectGroupOffsets(GROUP);
Option<String> state = offsets._1;
Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>> assignments = offsets._2;
@SuppressWarnings("unchecked")
Seq<ConsumerGroupCommand.PartitionAssignmentState> testGroupAssignments = assignments.get().filter(a -> Objects.equals(a.group(), GROUP)).toSeq();
ConsumerGroupCommand.PartitionAssignmentState assignment = testGroupAssignments.head();
return state.map(s -> s.contains("Empty")).getOrElse(() -> false) &&
testGroupAssignments.size() == 1 &&
assignment.consumerId().map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) && // the member should be gone
assignment.clientId().map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) &&
assignment.host().map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false);
}, "failed to collect group offsets");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeMembersOfExistingGroupWithNoMembers(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
// run one consumer in the group consuming from a single-partition topic
ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false)
&& res._2.map(c -> c.exists(m -> Objects.equals(m.group(), GROUP))).getOrElse(() -> false);
}, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.");
// stop the consumer so the group has no active member anymore
executor.shutdown();
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
return res._1.map(s -> s.contains("Empty")).getOrElse(() -> false) && res._2.isDefined() && res._2.get().isEmpty();
}, "Expected no member in describe group members results for group '" + GROUP + "'");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeStateOfExistingGroupWithNoMembers(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
// run one consumer in the group consuming from a single-partition topic
ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state(), "Stable") &&
state.numMembers() == 1 &&
state.coordinator() != null &&
brokers().count(s -> s.config().brokerId() == state.coordinator().id()) > 0;
}, "Expected the group '" + GROUP + "' to initially become stable, and have a single member.");
// stop the consumer so the group has no active member anymore
executor.shutdown();
TestUtils.waitForCondition(() -> {
ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state(), "Empty") && state.numMembers() == 0;
}, "Expected the group '" + GROUP + "' to become empty after the only member leaving.");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeWithConsumersWithoutAssignedPartitions(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
for (List<String> describeType : DESCRIBE_TYPES) {
String group = GROUP + String.join("", describeType);
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(2, TOPIC, group, groupProtocol);
List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group));
cgcArgs.addAll(describeType);
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
TestUtils.waitForCondition(() -> {
scala.Tuple2<String, String> res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service));
int expectedNumRows = DESCRIBE_TYPE_MEMBERS.contains(describeType) ? 3 : 2;
return res._2.isEmpty() && res._1.trim().split("\n").length == expectedNumRows;
}, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'");
}
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeOffsetsWithConsumersWithoutAssignedPartitions(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(2, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) &&
res._2.isDefined() &&
res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 1 &&
res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.partition().isDefined()) == 1;
}, "Expected rows for consumers with no assigned partitions in describe group results");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeMembersWithConsumersWithoutAssignedPartitions(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(2, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) &&
res._2.isDefined() &&
res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 2 &&
res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.numPartitions() == 1) == 1 &&
res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.numPartitions() == 0) == 1 &&
res._2.get().forall(s -> s.assignment().isEmpty());
}, "Expected rows for consumers with no assigned partitions in describe group results");
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
assertTrue(res._1.map(s -> s.contains("Stable")).getOrElse(() -> false)
&& res._2.map(c -> c.exists(s -> !s.assignment().isEmpty())).getOrElse(() -> false),
"Expected additional columns in verbose version of describe members");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeStateWithConsumersWithoutAssignedPartitions(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
// run two consumers in the group consuming from a single-partition topic
addConsumerGroupExecutor(2, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state(), "Stable") && state.numMembers() == 2;
}, "Expected two consumers in describe group results");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeWithMultiPartitionTopicAndMultipleConsumers(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
String topic2 = "foo2";
createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties());
for (List<String> describeType : DESCRIBE_TYPES) {
String group = GROUP + String.join("", describeType);
// run two consumers in the group consuming from a two-partition topic
addConsumerGroupExecutor(2, topic2, group, groupProtocol);
List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group));
cgcArgs.addAll(describeType);
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
TestUtils.waitForCondition(() -> {
scala.Tuple2<String, String> res = kafka.utils.TestUtils.grabConsoleOutputAndError(describeGroups(service));
int expectedNumRows = DESCRIBE_TYPE_STATE.contains(describeType) ? 2 : 3;
return res._2.isEmpty() && res._1.trim().split("\n").length == expectedNumRows;
}, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'");
}
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
String topic2 = "foo2";
createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties());
// run two consumers in the group consuming from a two-partition topic
addConsumerGroupExecutor(2, topic2, GROUP, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) &&
res._2.isDefined() &&
res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 2 &&
res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.partition().isDefined()) == 2 &&
res._2.get().count(x -> Objects.equals(x.group(), GROUP) && !x.partition().isDefined()) == 0;
}, "Expected two rows (one row per consumer) in describe group results.");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
String topic2 = "foo2";
createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties());
// run two consumers in the group consuming from a two-partition topic
addConsumerGroupExecutor(2, topic2, GROUP, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
return res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) &&
res._2.isDefined() &&
res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 2 &&
res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.numPartitions() == 1) == 2 &&
res._2.get().count(x -> Objects.equals(x.group(), GROUP) && x.numPartitions() == 0) == 0;
}, "Expected two rows (one row per consumer) in describe group members results.");
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
assertTrue(res._1.map(s -> s.contains("Stable")).getOrElse(() -> false) && res._2.map(s -> s.count(x -> x.assignment().isEmpty())).getOrElse(() -> 0) == 0,
"Expected additional columns in verbose version of describe members");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeStateWithMultiPartitionTopicAndMultipleConsumers(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
String topic2 = "foo2";
createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties());
// run two consumers in the group consuming from a two-partition topic
addConsumerGroupExecutor(2, topic2, GROUP, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
ConsumerGroupCommand.GroupState state = service.collectGroupState(GROUP);
return Objects.equals(state.state(), "Stable") && Objects.equals(state.group(), GROUP) && state.numMembers() == 2;
}, "Expected a stable group with two members in describe group state result.");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@ValueSource(strings = {"zk", "kraft", "kraft+kip848"})
public void testDescribeSimpleConsumerGroup(String quorum) throws Exception {
// Ensure that the offsets of consumers which don't use group management are still displayed
createOffsetsTopic(listenerName(), new Properties());
String topic2 = "foo2";
createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties());
addSimpleGroupExecutor(Arrays.asList(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)), GROUP);
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
return res._1.map(s -> s.contains("Empty")).getOrElse(() -> false)
&& res._2.isDefined() && res._2.get().count(s -> Objects.equals(s.group(), GROUP)) == 2;
}, "Expected a stable group with two members in describe group state result.");
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeGroupWithShortInitializationTimeout(String quorum, String groupProtocol) {
// Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
// complete before the timeout expires
List<String> describeType = DESCRIBE_TYPES.get(RANDOM.nextInt(DESCRIBE_TYPES.size()));
String group = GROUP + String.join("", describeType);
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(1, groupProtocol);
// set the group initialization timeout too low for the group to stabilize
List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--timeout", "1", "--group", group));
cgcArgs.addAll(describeType);
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0]));
ExecutionException e = assertThrows(ExecutionException.class, service::describeGroups);
assertInstanceOf(TimeoutException.class, e.getCause());
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeGroupOffsetsWithShortInitializationTimeout(String quorum, String groupProtocol) {
// Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
// complete before the timeout expires
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(1, groupProtocol);
// set the group initialization timeout too low for the group to stabilize
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP, "--timeout", "1"};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
Throwable e = assertThrows(ExecutionException.class, () -> service.collectGroupOffsets(GROUP));
assertEquals(TimeoutException.class, e.getCause().getClass());
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeGroupMembersWithShortInitializationTimeout(String quorum, String groupProtocol) {
// Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
// complete before the timeout expires
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(1, groupProtocol);
// set the group initialization timeout too low for the group to stabilize
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP, "--timeout", "1"};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
Throwable e = assertThrows(ExecutionException.class, () -> service.collectGroupMembers(GROUP, false));
assertEquals(TimeoutException.class, e.getCause().getClass());
e = assertThrows(ExecutionException.class, () -> service.collectGroupMembers(GROUP, true));
assertEquals(TimeoutException.class, e.getCause().getClass());
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeGroupStateWithShortInitializationTimeout(String quorum, String groupProtocol) {
// Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't
// complete before the timeout expires
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(1, groupProtocol);
// set the group initialization timeout too low for the group to stabilize
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP, "--timeout", "1"};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
Throwable e = assertThrows(ExecutionException.class, () -> service.collectGroupState(GROUP));
assertEquals(TimeoutException.class, e.getCause().getClass());
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@ValueSource(strings = {"zk", "kraft"})
public void testDescribeWithUnrecognizedNewConsumerOption(String quorum) {
String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
assertThrows(joptsimple.OptionException.class, () -> getConsumerGroupService(cgcArgs));
}
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
@MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
public void testDescribeNonOffsetCommitGroup(String quorum, String groupProtocol) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
Properties customProps = new Properties();
// create a consumer group that never commits offsets
customProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(1, TOPIC, GROUP, RangeAssignor.class.getName(), Optional.empty(), Optional.of(customProps), false, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
scala.Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(GROUP);
Function1<ConsumerGroupCommand.PartitionAssignmentState, Object> isGrp = s -> Objects.equals(s.group(), GROUP);
boolean res = groupOffsets._1.map(s -> s.contains("Stable")).getOrElse(() -> false) &&
groupOffsets._2.isDefined() &&
groupOffsets._2.get().count(isGrp) == 1;
if (!res)
return false;
@SuppressWarnings("cast")
ConsumerGroupCommand.PartitionAssignmentState assignmentState =
(ConsumerGroupCommand.PartitionAssignmentState) groupOffsets._2.get().filter(isGrp).head();
return assignmentState.consumerId().map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) &&
assignmentState.clientId().map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false) &&
assignmentState.host().map(h -> !h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE())).getOrElse(() -> false);
}, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group " + GROUP + ".");
}
private Function0<BoxedUnit> describeGroups(ConsumerGroupCommand.ConsumerGroupService service) {
return () -> {
try {
service.describeGroups();
return null;
} catch (Exception e) {
throw new RuntimeException(e);
}
};
}
}