KAFKA-7471: Multiple Consumer Group Management Feature (#5726)

* Describe/Delete/Reset offsets on multiple consumer groups at a time (including each group by repeating `--group` parameter)
* Describe/Delete/Reset offsets on ALL consumer groups at a time (add new `--all-groups` option similar to `--all-topics`)
* Reset plan CSV file generation reworked: structure updated to support multiple consumer groups and make sure that CSV file generation is done properly since there are no restrictions on consumer group names and symbols like commas and quotes are allowed.
* Extending data output table format by adding `GROUP` column for all `--describe` queries
This commit is contained in:
Alex Dunayevsky 2019-04-16 02:53:28 +03:00 committed by Vahid Hashemian
parent b7d7f7590d
commit 47a9871ef6
7 changed files with 727 additions and 370 deletions

View File

@ -638,6 +638,8 @@ project(':core') {
dependencies {
compile project(':clients')
compile libs.jacksonDatabind
compile libs.jacksonModuleScala
compile libs.jacksonDataformatCsv
compile libs.jacksonJDK8Datatypes
compile libs.joptSimple
compile libs.metrics

View File

@ -19,10 +19,11 @@ package kafka.admin
import java.text.{ParseException, SimpleDateFormat}
import java.time.{Duration, Instant}
import java.util
import java.util.Properties
import joptsimple.OptionSpec
import com.fasterxml.jackson.dataformat.csv.CsvMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import kafka.utils._
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
@ -33,8 +34,11 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, Set}
import scala.collection.{Seq, Set, mutable}
import scala.util.{Failure, Success, Try}
import joptsimple.OptionSpec
import scala.collection.immutable.TreeMap
import scala.reflect.ClassTag
object ConsumerGroupCommand extends Logging {
@ -56,13 +60,13 @@ object ConsumerGroupCommand extends Logging {
if (opts.options.has(opts.listOpt))
consumerGroupService.listGroups().foreach(println(_))
else if (opts.options.has(opts.describeOpt))
consumerGroupService.describeGroup()
consumerGroupService.describeGroups()
else if (opts.options.has(opts.deleteOpt))
consumerGroupService.deleteGroups()
else if (opts.options.has(opts.resetOffsetsOpt)) {
val offsetsToReset = consumerGroupService.resetOffsets()
if (opts.options.has(opts.exportOpt)) {
val exported = consumerGroupService.exportOffsetsToReset(offsetsToReset)
val exported = consumerGroupService.exportOffsetsToCsv(offsetsToReset)
println(exported)
} else
printOffsetsToReset(offsetsToReset)
@ -78,7 +82,7 @@ object ConsumerGroupCommand extends Logging {
val MISSING_COLUMN_VALUE = "-"
def printError(msg: String, e: Option[Throwable] = None): Unit = {
println(s"Error: $msg")
println(s"\nError: $msg")
e.foreach(_.printStackTrace())
}
@ -95,15 +99,18 @@ object ConsumerGroupCommand extends Logging {
date.getTime
}
def printOffsetsToReset(groupAssignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): Unit = {
println("\n%-30s %-10s %-15s".format("TOPIC", "PARTITION", "NEW-OFFSET"))
groupAssignmentsToReset.foreach {
case (consumerAssignment, offsetAndMetadata) =>
println("%-30s %-10s %-15s".format(
consumerAssignment.topic,
consumerAssignment.partition,
offsetAndMetadata.offset))
def printOffsetsToReset(groupAssignmentsToReset: Map[String, Map[TopicPartition, OffsetAndMetadata]]): Unit = {
if (groupAssignmentsToReset.nonEmpty)
println("\n%-30s %-30s %-10s %-15s".format("GROUP", "TOPIC", "PARTITION", "NEW-OFFSET"))
for {
(groupId, assignment) <- groupAssignmentsToReset
(consumerAssignment, offsetAndMetadata) <- assignment
} {
println("%-30s %-30s %-10s %-15s".format(
groupId,
consumerAssignment.topic,
consumerAssignment.partition,
offsetAndMetadata.offset))
}
}
@ -117,19 +124,61 @@ object ConsumerGroupCommand extends Logging {
private[admin] case class GroupState(group: String, coordinator: Node, assignmentStrategy: String, state: String, numMembers: Int)
private[admin] sealed trait CsvRecord
private[admin] case class CsvRecordWithGroup(group: String, topic: String, partition: Int, offset: Long) extends CsvRecord
private[admin] case class CsvRecordNoGroup(topic: String, partition: Int, offset: Long) extends CsvRecord
private[admin] object CsvRecordWithGroup {
val fields = Array("group", "topic", "partition", "offset")
}
private[admin] object CsvRecordNoGroup {
val fields = Array("topic", "partition", "offset")
}
// Example: CsvUtils().readerFor[CsvRecordWithoutGroup]
private[admin] case class CsvUtils() {
val mapper = new CsvMapper with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
def readerFor[T <: CsvRecord: ClassTag] = {
val schema = getSchema[T]
val clazz = implicitly[ClassTag[T]].runtimeClass
mapper.readerFor(clazz).`with`(schema)
}
def writerFor[T <: CsvRecord: ClassTag] = {
val schema = getSchema[T]
val clazz = implicitly[ClassTag[T]].runtimeClass
mapper.writerFor(clazz).`with`(schema)
}
private def getSchema[T <: CsvRecord: ClassTag] = {
val clazz = implicitly[ClassTag[T]].runtimeClass
val fields = clazz match {
case _ if classOf[CsvRecordWithGroup] == clazz => CsvRecordWithGroup.fields
case _ if classOf[CsvRecordNoGroup] == clazz => CsvRecordNoGroup.fields
}
val schema = mapper.schemaFor(clazz).sortedBy(fields: _*)
schema
}
}
class ConsumerGroupService(val opts: ConsumerGroupCommandOptions) {
private val adminClient = createAdminClient()
// `consumer` is only needed for `describe`, so we instantiate it lazily
private var consumer: KafkaConsumer[String, String] = _
// `consumers` are only needed for `describe`, so we instantiate them lazily
private lazy val consumers: mutable.Map[String, KafkaConsumer[String, String]] = mutable.Map.empty
// We have to make sure it is evaluated once and available
private lazy val resetPlanFromFile: Option[Map[String, Map[TopicPartition, OffsetAndMetadata]]] = {
if (opts.options.has(opts.resetFromFileOpt)) {
val resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt)
val resetPlanCsv = Utils.readFileAsString(resetPlanPath)
val resetPlan = parseResetPlan(resetPlanCsv)
Some(resetPlan)
} else None
}
def listGroups(): List[String] = {
val result = adminClient.listConsumerGroups(
withTimeoutMs(new ListConsumerGroupsOptions))
val listings = result.all.get.asScala
listings.map(_.groupId).toList
val result = adminClient.listConsumerGroups(withTimeoutMs(new ListConsumerGroupsOptions))
val listings = result.all.get.asScala
listings.map(_.groupId).toList
}
private def shouldPrintMemberState(group: String, state: Option[String], numRows: Option[Int]): Boolean = {
@ -143,9 +192,9 @@ object ConsumerGroupCommand extends Logging {
case Some("Dead") =>
printError(s"Consumer group '$group' does not exist.")
case Some("Empty") =>
Console.err.println(s"Consumer group '$group' has no active members.")
Console.err.println(s"\nConsumer group '$group' has no active members.")
case Some("PreparingRebalance") | Some("CompletingRebalance") =>
Console.err.println(s"Warning: Consumer group '$group' is rebalancing.")
Console.err.println(s"\nWarning: Consumer group '$group' is rebalancing.")
case Some("Stable") =>
case other =>
// the control should never reach here
@ -157,113 +206,127 @@ object ConsumerGroupCommand extends Logging {
private def size(colOpt: Option[Seq[Object]]): Option[Int] = colOpt.map(_.size)
private def printOffsets(group: String, state: Option[String], assignments: Option[Seq[PartitionAssignmentState]]): Unit = {
if (shouldPrintMemberState(group, state, size(assignments))) {
// find proper columns width
var (maxTopicLen, maxConsumerIdLen, maxHostLen) = (15, 15, 15)
assignments match {
case None => // do nothing
case Some(consumerAssignments) =>
consumerAssignments.foreach { consumerAssignment =>
maxTopicLen = Math.max(maxTopicLen, consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE).length)
maxConsumerIdLen = Math.max(maxConsumerIdLen, consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE).length)
maxHostLen = Math.max(maxHostLen, consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE).length)
}
}
println(s"\n%${-maxTopicLen}s %-10s %-15s %-15s %-15s %${-maxConsumerIdLen}s %${-maxHostLen}s %s"
.format("TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "CONSUMER-ID", "HOST", "CLIENT-ID"))
assignments match {
case None => // do nothing
case Some(consumerAssignments) =>
consumerAssignments.foreach { consumerAssignment =>
println(s"%-${maxTopicLen}s %-10s %-15s %-15s %-15s %-${maxConsumerIdLen}s %-${maxHostLen}s %s".format(
consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.partition.getOrElse(MISSING_COLUMN_VALUE),
consumerAssignment.offset.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.logEndOffset.getOrElse(MISSING_COLUMN_VALUE),
consumerAssignment.lag.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE),
consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.clientId.getOrElse(MISSING_COLUMN_VALUE)))
}
}
}
}
private def printMembers(group: String, state: Option[String], assignments: Option[Seq[MemberAssignmentState]], verbose: Boolean): Unit = {
if (shouldPrintMemberState(group, state, size(assignments))) {
// find proper columns width
var (maxConsumerIdLen, maxHostLen, maxClientIdLen) = (15, 15, 15)
assignments match {
case None => // do nothing
case Some(memberAssignments) =>
memberAssignments.foreach { memberAssignment =>
maxConsumerIdLen = Math.max(maxConsumerIdLen, memberAssignment.consumerId.length)
maxHostLen = Math.max(maxHostLen, memberAssignment.host.length)
maxClientIdLen = Math.max(maxClientIdLen, memberAssignment.clientId.length)
}
}
print(s"\n%${-maxConsumerIdLen}s %${-maxHostLen}s %${-maxClientIdLen}s %-15s "
.format("CONSUMER-ID", "HOST", "CLIENT-ID", "#PARTITIONS"))
if (verbose)
print("%s".format("ASSIGNMENT"))
println()
assignments match {
case None => // do nothing
case Some(memberAssignments) =>
memberAssignments.foreach { memberAssignment =>
print(s"%${-maxConsumerIdLen}s %${-maxHostLen}s %${-maxClientIdLen}s %-15s ".format(
memberAssignment.consumerId, memberAssignment.host, memberAssignment.clientId, memberAssignment.numPartitions))
if (verbose) {
val partitions = memberAssignment.assignment match {
case List() => MISSING_COLUMN_VALUE
case assignment =>
assignment.groupBy(_.topic).map {
case (topic, partitionList) => topic + partitionList.map(_.partition).sorted.mkString("(", ",", ")")
}.toList.sorted.mkString(", ")
}
print("%s".format(partitions))
private def printOffsets(offsets: Map[String, (Option[String], Option[Seq[PartitionAssignmentState]])]): Unit = {
for ((groupId, (state, assignments)) <- offsets) {
if (shouldPrintMemberState(groupId, state, size(assignments))) {
// find proper columns width
var (maxGroupLen, maxTopicLen, maxConsumerIdLen, maxHostLen) = (15, 15, 15, 15)
assignments match {
case None => // do nothing
case Some(consumerAssignments) =>
consumerAssignments.foreach { consumerAssignment =>
maxGroupLen = Math.max(maxGroupLen, consumerAssignment.group.length)
maxTopicLen = Math.max(maxTopicLen, consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE).length)
maxConsumerIdLen = Math.max(maxConsumerIdLen, consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE).length)
maxHostLen = Math.max(maxHostLen, consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE).length)
}
println()
}
}
println(s"\n%${-maxGroupLen}s %${-maxTopicLen}s %-10s %-15s %-15s %-15s %${-maxConsumerIdLen}s %${-maxHostLen}s %s"
.format("GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "CONSUMER-ID", "HOST", "CLIENT-ID"))
assignments match {
case None => // do nothing
case Some(consumerAssignments) =>
consumerAssignments.foreach { consumerAssignment =>
println(s"%${-maxGroupLen}s %${-maxTopicLen}s %-10s %-15s %-15s %-15s %${-maxConsumerIdLen}s %${-maxHostLen}s %s".format(
consumerAssignment.group,
consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.partition.getOrElse(MISSING_COLUMN_VALUE),
consumerAssignment.offset.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.logEndOffset.getOrElse(MISSING_COLUMN_VALUE),
consumerAssignment.lag.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE),
consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE), consumerAssignment.clientId.getOrElse(MISSING_COLUMN_VALUE))
)
}
}
}
}
}
private def printState(group: String, state: GroupState): Unit = {
if (shouldPrintMemberState(group, Some(state.state), Some(1))) {
val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
val coordinatorColLen = Math.max(25, coordinator.length)
print(s"\n%${-coordinatorColLen}s %-25s %-20s %s".format("COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
print(s"\n%${-coordinatorColLen}s %-25s %-20s %s".format(coordinator, state.assignmentStrategy, state.state, state.numMembers))
println()
private def printMembers(members: Map[String, (Option[String], Option[Seq[MemberAssignmentState]])], verbose: Boolean): Unit = {
for ((groupId, (state, assignments)) <- members) {
if (shouldPrintMemberState(groupId, state, size(assignments))) {
// find proper columns width
var (maxGroupLen, maxConsumerIdLen, maxHostLen, maxClientIdLen) = (15, 15, 15, 15)
assignments match {
case None => // do nothing
case Some(memberAssignments) =>
memberAssignments.foreach { memberAssignment =>
maxGroupLen = Math.max(maxGroupLen, memberAssignment.group.length)
maxConsumerIdLen = Math.max(maxConsumerIdLen, memberAssignment.consumerId.length)
maxHostLen = Math.max(maxHostLen, memberAssignment.host.length)
maxClientIdLen = Math.max(maxClientIdLen, memberAssignment.clientId.length)
}
}
print(s"\n%${-maxGroupLen}s %${-maxConsumerIdLen}s %${-maxHostLen}s %${-maxClientIdLen}s %-15s "
.format("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "#PARTITIONS"))
if (verbose)
print(s"%s".format("ASSIGNMENT"))
println()
assignments match {
case None => // do nothing
case Some(memberAssignments) =>
memberAssignments.foreach { memberAssignment =>
print(s"%${-maxGroupLen}s %${-maxConsumerIdLen}s %${-maxHostLen}s %${-maxClientIdLen}s %-15s ".format(
memberAssignment.group, memberAssignment.consumerId, memberAssignment.host, memberAssignment.clientId, memberAssignment.numPartitions))
if (verbose) {
val partitions = memberAssignment.assignment match {
case List() => MISSING_COLUMN_VALUE
case assignment =>
assignment.groupBy(_.topic).map {
case (topic, partitionList) => topic + partitionList.map(_.partition).sorted.mkString("(", ",", ")")
}.toList.sorted.mkString(", ")
}
print(s"%s".format(partitions))
}
println()
}
}
}
}
}
def describeGroup(): Unit = {
val group = opts.options.valuesOf(opts.groupOpt).asScala.head
private def printStates(states: Map[String, GroupState]): Unit = {
for ((groupId, state) <- states) {
if (shouldPrintMemberState(groupId, Some(state.state), Some(1))) {
val coordinator = s"${state.coordinator.host}:${state.coordinator.port} (${state.coordinator.idString})"
val coordinatorColLen = Math.max(25, coordinator.length)
print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format("GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
print(s"\n%${-coordinatorColLen}s %-25s %-20s %-15s %s".format(state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers))
println()
}
}
}
def describeGroups(): Unit = {
val groupIds =
if (opts.options.has(opts.allGroupsOpt)) listGroups()
else opts.options.valuesOf(opts.groupOpt).asScala
val membersOptPresent = opts.options.has(opts.membersOpt)
val stateOptPresent = opts.options.has(opts.stateOpt)
val offsetsOptPresent = opts.options.has(opts.offsetsOpt)
val subActions = Seq(membersOptPresent, offsetsOptPresent, stateOptPresent).count(_ == true)
if (subActions == 0 || offsetsOptPresent) {
val offsets = collectGroupOffsets()
printOffsets(group, offsets._1, offsets._2)
val offsets = collectGroupsOffsets(groupIds)
printOffsets(offsets)
} else if (membersOptPresent) {
val members = collectGroupMembers(opts.options.has(opts.verboseOpt))
printMembers(group, members._1, members._2, opts.options.has(opts.verboseOpt))
} else
printState(group, collectGroupState())
val members = collectGroupsMembers(groupIds, opts.options.has(opts.verboseOpt))
printMembers(members, opts.options.has(opts.verboseOpt))
} else {
val states = collectGroupsState(groupIds)
printStates(states)
}
}
private def collectConsumerAssignment(group: String,
coordinator: Option[Node],
topicPartitions: Seq[TopicPartition],
getPartitionOffset: TopicPartition => Option[Long],
consumerIdOpt: Option[String],
hostOpt: Option[String],
clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {
coordinator: Option[Node],
topicPartitions: Seq[TopicPartition],
getPartitionOffset: TopicPartition => Option[Long],
consumerIdOpt: Option[String],
hostOpt: Option[String],
clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {
if (topicPartitions.isEmpty) {
Array[PartitionAssignmentState](
PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None), consumerIdOpt, hostOpt, clientIdOpt, None)
@ -290,109 +353,142 @@ object ConsumerGroupCommand extends Logging {
getLag(offset, logEndOffsetOpt), consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt)
}
getLogEndOffsets(topicPartitions).map {
case (topicPartition, LogOffsetResult.LogOffset(offset)) => getDescribePartitionResult(topicPartition, Some(offset))
case (topicPartition, _) => getDescribePartitionResult(topicPartition, None)
getLogEndOffsets(group, topicPartitions).map {
logEndOffsetResult =>
logEndOffsetResult._2 match {
case LogOffsetResult.LogOffset(logEndOffset) => getDescribePartitionResult(logEndOffsetResult._1, Some(logEndOffset))
case LogOffsetResult.Unknown => getDescribePartitionResult(logEndOffsetResult._1, None)
case LogOffsetResult.Ignore => null
}
}.toArray
}
def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = {
val groupId = opts.options.valueOf(opts.groupOpt)
def resetOffsets(): Map[String, Map[TopicPartition, OffsetAndMetadata]] = {
val groupIds =
if (opts.options.has(opts.allGroupsOpt)) listGroups()
else opts.options.valuesOf(opts.groupOpt).asScala
val consumerGroups = adminClient.describeConsumerGroups(
util.Arrays.asList(groupId),
groupIds.asJava,
withTimeoutMs(new DescribeConsumerGroupsOptions)
).describedGroups()
val group = consumerGroups.get(groupId).get
group.state.toString match {
case "Empty" | "Dead" =>
val partitionsToReset = getPartitionsToReset(groupId)
val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset)
val result =
consumerGroups.asScala.foldLeft(Map[String, Map[TopicPartition, OffsetAndMetadata]]()) {
case (acc, (groupId, groupDescription)) =>
groupDescription.get.state().toString match {
case "Empty" | "Dead" =>
val partitionsToReset = getPartitionsToReset(groupId)
val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset)
// Dry-run is the default behavior if --execute is not specified
val dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt)
if (!dryRun)
getConsumer.commitSync(preparedOffsets.asJava)
preparedOffsets
case currentState =>
printError(s"Assignments can only be reset if the group '$groupId' is inactive, but the current state is $currentState.")
Map.empty
// Dry-run is the default behavior if --execute is not specified
val dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt)
if (!dryRun)
getConsumer(groupId).commitSync(preparedOffsets.asJava)
acc.updated(groupId, preparedOffsets)
case currentState =>
printError(s"Assignments can only be reset if the group '$groupId' is inactive, but the current state is $currentState.")
acc.updated(groupId, Map.empty)
}
}
result
}
private[admin] def describeConsumerGroups(groupIds: Seq[String]): mutable.Map[String, ConsumerGroupDescription] = {
adminClient.describeConsumerGroups(
groupIds.asJava,
withTimeoutMs(new DescribeConsumerGroupsOptions)
).describedGroups().asScala.map {
case (groupId, groupDescriptionFuture) => (groupId, groupDescriptionFuture.get())
}
}
/**
* Returns the state of the specified consumer group and partition assignment states
*/
def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
val groupId = opts.options.valueOf(opts.groupOpt)
val consumerGroup = adminClient.describeConsumerGroups(
List(groupId).asJava,
withTimeoutMs(new DescribeConsumerGroupsOptions())
).describedGroups.get(groupId).get
def collectGroupOffsets(groupId: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
collectGroupsOffsets(List(groupId)).getOrElse(groupId, (None, None))
}
val state = consumerGroup.state
val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
var assignedTopicPartitions = ListBuffer[TopicPartition]()
val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
.sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary =>
val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
.map { topicPartition =>
topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
}.toMap
/**
* Returns states of the specified consumer groups and partition assignment states
*/
def collectGroupsOffsets(groupIds: Seq[String]): TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])] = {
val consumerGroups = describeConsumerGroups(groupIds)
collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
Some(s"${consumerSummary.clientId}"))
}
val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
val state = consumerGroup.state
val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
var assignedTopicPartitions = ListBuffer[TopicPartition]()
val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
.sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary =>
val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
.map { topicPartition =>
topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
}.toMap
collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
Some(s"${consumerSummary.clientId}"))
}
val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
case (topicPartition, offset) =>
collectConsumerAssignment(
groupId,
Option(consumerGroup.coordinator),
Seq(topicPartition),
Map(topicPartition -> Some(offset.offset)),
Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE))
}
groupId -> (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer))
}).toMap
val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
case (topicPartition, offset) =>
collectConsumerAssignment(
groupOffsets
}
private[admin] def collectGroupMembers(groupId: String, verbose: Boolean): (Option[String], Option[Seq[MemberAssignmentState]]) = {
collectGroupsMembers(Seq(groupId), verbose)(groupId)
}
private[admin] def collectGroupsMembers(groupIds: Seq[String], verbose: Boolean): TreeMap[String, (Option[String], Option[Seq[MemberAssignmentState]])] = {
val consumerGroups = describeConsumerGroups(groupIds)
TreeMap[String, (Option[String], Option[Seq[MemberAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
val state = consumerGroup.state.toString
val memberAssignmentStates = consumerGroup.members().asScala.map(consumer =>
MemberAssignmentState(
groupId,
Option(consumerGroup.coordinator),
Seq(topicPartition),
Map(topicPartition -> Some(offset.offset)),
Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE))
}
(Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer))
consumer.consumerId,
consumer.host,
consumer.clientId,
consumer.assignment.topicPartitions.size(),
if (verbose) consumer.assignment.topicPartitions.asScala.toList else List()
)).toList
groupId -> (Some(state), Option(memberAssignmentStates))
}).toMap
}
private[admin] def collectGroupMembers(verbose: Boolean): (Option[String], Option[Seq[MemberAssignmentState]]) = {
val groupId = opts.options.valueOf(opts.groupOpt)
val consumerGroups = adminClient.describeConsumerGroups(
List(groupId).asJava,
withTimeoutMs(new DescribeConsumerGroupsOptions)
).describedGroups()
val group = consumerGroups.get(groupId).get
val state = group.state
(Some(state.toString),
Option(group.members().asScala.map {
consumer => MemberAssignmentState(groupId, consumer.consumerId, consumer.host, consumer.clientId, consumer.assignment.topicPartitions.size(),
if (verbose) consumer.assignment.topicPartitions.asScala.toList else List())
}.toList))
private[admin] def collectGroupState(groupId: String): GroupState = {
collectGroupsState(Seq(groupId))(groupId)
}
private[admin] def collectGroupState(): GroupState = {
val groupId = opts.options.valueOf(opts.groupOpt)
val consumerGroups = adminClient.describeConsumerGroups(
util.Arrays.asList(groupId),
withTimeoutMs(new DescribeConsumerGroupsOptions)
).describedGroups()
val group = consumerGroups.get(groupId).get
GroupState(groupId, group.coordinator, group.partitionAssignor(),
group.state.toString, group.members().size)
private[admin] def collectGroupsState(groupIds: Seq[String]): TreeMap[String, GroupState] = {
val consumerGroups = describeConsumerGroups(groupIds)
TreeMap[String, GroupState]() ++ (for ((groupId, groupDescription) <- consumerGroups) yield {
groupId -> GroupState(
groupId,
groupDescription.coordinator,
groupDescription.partitionAssignor(),
groupDescription.state.toString,
groupDescription.members().size
)
}).toMap
}
private def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
val offsets = getConsumer.endOffsets(topicPartitions.asJava)
private def getLogEndOffsets(groupId: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
val offsets = getConsumer(groupId).endOffsets(topicPartitions.asJava)
topicPartitions.map { topicPartition =>
Option(offsets.get(topicPartition)) match {
case Some(logEndOffset) => topicPartition -> LogOffsetResult.LogOffset(logEndOffset)
@ -401,8 +497,8 @@ object ConsumerGroupCommand extends Logging {
}.toMap
}
private def getLogStartOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
val offsets = getConsumer.beginningOffsets(topicPartitions.asJava)
private def getLogStartOffsets(groupId: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
val offsets = getConsumer(groupId).beginningOffsets(topicPartitions.asJava)
topicPartitions.map { topicPartition =>
Option(offsets.get(topicPartition)) match {
case Some(logStartOffset) => topicPartition -> LogOffsetResult.LogOffset(logStartOffset)
@ -411,8 +507,8 @@ object ConsumerGroupCommand extends Logging {
}.toMap
}
private def getLogTimestampOffsets(topicPartitions: Seq[TopicPartition], timestamp: java.lang.Long): Map[TopicPartition, LogOffsetResult] = {
val consumer = getConsumer
private def getLogTimestampOffsets(groupId: String, topicPartitions: Seq[TopicPartition], timestamp: java.lang.Long): Map[TopicPartition, LogOffsetResult] = {
val consumer = getConsumer(groupId)
consumer.assign(topicPartitions.asJava)
val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =
@ -422,12 +518,14 @@ object ConsumerGroupCommand extends Logging {
case (topicPartition, offsetAndTimestamp) => topicPartition -> LogOffsetResult.LogOffset(offsetAndTimestamp.offset)
}.toMap
successfulLogTimestampOffsets ++ getLogEndOffsets(unsuccessfulOffsetsForTimes.keySet.toSeq)
successfulLogTimestampOffsets ++ getLogEndOffsets(groupId, unsuccessfulOffsetsForTimes.keySet.toSeq)
}
def close() {
adminClient.close()
if (consumer != null) consumer.close()
consumers.values.foreach(consumer =>
Option(consumer).foreach(_.close())
)
}
private def createAdminClient(): admin.AdminClient = {
@ -436,18 +534,18 @@ object ConsumerGroupCommand extends Logging {
admin.AdminClient.create(props)
}
private def getConsumer = {
if (consumer == null)
consumer = createConsumer()
consumer
private def getConsumer(groupId: String) = {
if (consumers.get(groupId).isEmpty)
consumers.update(groupId, createConsumer(groupId))
consumers(groupId)
}
private def createConsumer(): KafkaConsumer[String, String] = {
private def createConsumer(groupId: String): KafkaConsumer[String, String] = {
val properties = new Properties()
val deserializer = (new StringDeserializer).getClass.getName
val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, opts.options.valueOf(opts.groupOpt))
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
@ -467,12 +565,12 @@ object ConsumerGroupCommand extends Logging {
options.timeoutMs(t)
}
private def parseTopicPartitionsToReset(topicArgs: Seq[String]): Seq[TopicPartition] = topicArgs.flatMap {
private def parseTopicPartitionsToReset(groupId: String, topicArgs: Seq[String]): Seq[TopicPartition] = topicArgs.flatMap {
case topicArg if topicArg.contains(":") =>
val topicPartitions = topicArg.split(":")
val topic = topicPartitions(0)
topicPartitions(1).split(",").map(partition => new TopicPartition(topic, partition.toInt))
case topic => getConsumer.partitionsFor(topic).asScala
case topic => getConsumer(groupId).partitionsFor(topic).asScala
.map(partitionInfo => new TopicPartition(topic, partitionInfo.partition))
}
@ -482,7 +580,7 @@ object ConsumerGroupCommand extends Logging {
allTopicPartitions
} else if (opts.options.has(opts.topicOpt)) {
val topics = opts.options.valuesOf(opts.topicOpt).asScala
parseTopicPartitionsToReset(topics)
parseTopicPartitionsToReset(groupId, topics)
} else {
if (opts.options.has(opts.resetFromFileOpt))
Nil
@ -498,24 +596,46 @@ object ConsumerGroupCommand extends Logging {
).partitionsToOffsetAndMetadata.get
}
private def parseResetPlan(resetPlanCsv: String): Map[TopicPartition, OffsetAndMetadata] = {
resetPlanCsv.split("\n")
.map { line =>
val Array(topic, partition, offset) = line.split(",").map(_.trim)
val topicPartition = new TopicPartition(topic, partition.toInt)
val offsetAndMetadata = new OffsetAndMetadata(offset.toLong)
(topicPartition, offsetAndMetadata)
}.toMap
type GroupMetadata = Map[String, Map[TopicPartition, OffsetAndMetadata]]
private def parseResetPlan(resetPlanCsv: String): GroupMetadata = {
def updateGroupMetadata(group: String, topic: String, partition: Int, offset: Long, acc: GroupMetadata) = {
val topicPartition = new TopicPartition(topic, partition)
val offsetAndMetadata = new OffsetAndMetadata(offset)
val dataMap = acc.getOrElse(group, Map()).updated(topicPartition, offsetAndMetadata)
acc.updated(group, dataMap)
}
val csvReader = CsvUtils().readerFor[CsvRecordNoGroup]
val lines = resetPlanCsv.split("\n")
val isSingleGroupQuery = opts.options.valuesOf(opts.groupOpt).size() == 1
val isOldCsvFormat = lines.headOption.flatMap(line =>
Try(csvReader.readValue[CsvRecordNoGroup](line)).toOption).nonEmpty
// Single group CSV format: "topic,partition,offset"
val dataMap = if (isSingleGroupQuery && isOldCsvFormat) {
val group = opts.options.valueOf(opts.groupOpt)
lines.foldLeft(Map[String, Map[TopicPartition, OffsetAndMetadata]]()) { (acc, line) =>
val CsvRecordNoGroup(topic, partition, offset) = csvReader.readValue[CsvRecordNoGroup](line)
updateGroupMetadata(group, topic, partition, offset, acc)
}
// Multiple group CSV format: "group,topic,partition,offset"
} else {
val csvReader = CsvUtils().readerFor[CsvRecordWithGroup]
lines.foldLeft(Map[String, Map[TopicPartition, OffsetAndMetadata]]()) { (acc, line) =>
val CsvRecordWithGroup(group, topic, partition, offset) = csvReader.readValue[CsvRecordWithGroup](line)
updateGroupMetadata(group, topic, partition, offset, acc)
}
}
dataMap
}
private def prepareOffsetsToReset(groupId: String, partitionsToReset: Seq[TopicPartition]): Map[TopicPartition, OffsetAndMetadata] = {
private def prepareOffsetsToReset(groupId: String,
partitionsToReset: Seq[TopicPartition]): Map[TopicPartition, OffsetAndMetadata] = {
if (opts.options.has(opts.resetToOffsetOpt)) {
val offset = opts.options.valueOf(opts.resetToOffsetOpt)
checkOffsetsRange(partitionsToReset.map((_, offset)).toMap).map {
checkOffsetsRange(groupId, partitionsToReset.map((_, offset)).toMap).map {
case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
}
} else if (opts.options.has(opts.resetToEarliestOpt)) {
val logStartOffsets = getLogStartOffsets(partitionsToReset)
val logStartOffsets = getLogStartOffsets(groupId, partitionsToReset)
partitionsToReset.map { topicPartition =>
logStartOffsets.get(topicPartition) match {
case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
@ -523,7 +643,7 @@ object ConsumerGroupCommand extends Logging {
}
}.toMap
} else if (opts.options.has(opts.resetToLatestOpt)) {
val logEndOffsets = getLogEndOffsets(partitionsToReset)
val logEndOffsets = getLogEndOffsets(groupId, partitionsToReset)
partitionsToReset.map { topicPartition =>
logEndOffsets.get(topicPartition) match {
case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
@ -538,12 +658,12 @@ object ConsumerGroupCommand extends Logging {
throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset")).offset
(topicPartition, currentOffset + shiftBy)
}.toMap
checkOffsetsRange(requestedOffsets).map {
checkOffsetsRange(groupId, requestedOffsets).map {
case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
}
} else if (opts.options.has(opts.resetToDatetimeOpt)) {
val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
val logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp)
val logTimestampOffsets = getLogTimestampOffsets(groupId, partitionsToReset, timestamp)
partitionsToReset.map { topicPartition =>
val logTimestampOffset = logTimestampOffsets.get(topicPartition)
logTimestampOffset match {
@ -555,8 +675,9 @@ object ConsumerGroupCommand extends Logging {
val duration = opts.options.valueOf(opts.resetByDurationOpt)
val durationParsed = Duration.parse(duration)
val now = Instant.now()
durationParsed.negated().addTo(now)
val timestamp = now.minus(durationParsed).toEpochMilli
val logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp)
val logTimestampOffsets = getLogTimestampOffsets(groupId, partitionsToReset, timestamp)
partitionsToReset.map { topicPartition =>
val logTimestampOffset = logTimestampOffsets.get(topicPartition)
logTimestampOffset match {
@ -564,16 +685,20 @@ object ConsumerGroupCommand extends Logging {
case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetFromFileOpt)) {
val resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt)
val resetPlanCsv = Utils.readFileAsString(resetPlanPath)
val resetPlan = parseResetPlan(resetPlanCsv)
val requestedOffsets = resetPlan.keySet.map { topicPartition =>
(topicPartition, resetPlan(topicPartition).offset)
}.toMap
checkOffsetsRange(requestedOffsets).map {
case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
}
} else if (resetPlanFromFile.isDefined) {
resetPlanFromFile.map(resetPlan => resetPlan.get(groupId).map { resetPlanForGroup =>
val requestedOffsets = resetPlanForGroup.keySet.map { topicPartition =>
topicPartition -> resetPlanForGroup(topicPartition).offset
}.toMap
checkOffsetsRange(groupId, requestedOffsets).map {
case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
}
} match {
case Some(resetPlanForGroup) => resetPlanForGroup
case None =>
printError(s"No reset plan for group $groupId found")
Map[TopicPartition, OffsetAndMetadata]()
}).getOrElse(Map.empty)
} else if (opts.options.has(opts.resetToCurrentOpt)) {
val currentCommittedOffsets = getCommittedOffsets(groupId)
val (partitionsToResetWithCommittedOffset, partitionsToResetWithoutCommittedOffset) =
@ -586,7 +711,7 @@ object ConsumerGroupCommand extends Logging {
}))
}.toMap
val preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(partitionsToResetWithoutCommittedOffset).map {
val preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(groupId, partitionsToResetWithoutCommittedOffset).map {
case (topicPartition, LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
case (topicPartition, _) => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition")
}
@ -597,9 +722,9 @@ object ConsumerGroupCommand extends Logging {
}
}
private def checkOffsetsRange(requestedOffsets: Map[TopicPartition, Long]) = {
val logStartOffsets = getLogStartOffsets(requestedOffsets.keySet.toSeq)
val logEndOffsets = getLogEndOffsets(requestedOffsets.keySet.toSeq)
private def checkOffsetsRange(groupId: String, requestedOffsets: Map[TopicPartition, Long]) = {
val logStartOffsets = getLogStartOffsets(groupId, requestedOffsets.keySet.toSeq)
val logEndOffsets = getLogEndOffsets(groupId, requestedOffsets.keySet.toSeq)
requestedOffsets.map { case (topicPartition, offset) => (topicPartition,
logEndOffsets.get(topicPartition) match {
case Some(LogOffsetResult.LogOffset(endOffset)) if offset > endOffset =>
@ -620,19 +745,33 @@ object ConsumerGroupCommand extends Logging {
}
}
def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = {
val rows = assignmentsToReset.map { case (k,v) => s"${k.topic},${k.partition},${v.offset}" }(collection.breakOut): List[String]
rows.foldRight("")(_ + "\n" + _)
def exportOffsetsToCsv(assignments: Map[String, Map[TopicPartition, OffsetAndMetadata]]): String = {
val isSingleGroupQuery = opts.options.valuesOf(opts.groupOpt).size() == 1
val csvWriter =
if (isSingleGroupQuery) CsvUtils().writerFor[CsvRecordNoGroup]
else CsvUtils().writerFor[CsvRecordWithGroup]
val rows = assignments.flatMap { case (groupId, partitionInfo) =>
partitionInfo.map { case (k: TopicPartition, v: OffsetAndMetadata) =>
val csvRecord =
if (isSingleGroupQuery) CsvRecordNoGroup(k.topic, k.partition, v.offset)
else CsvRecordWithGroup(groupId, k.topic, k.partition, v.offset)
csvWriter.writeValueAsString(csvRecord)
}(collection.breakOut): List[String]
}
rows.mkString("")
}
def deleteGroups(): Map[String, Throwable] = {
val groupsToDelete = opts.options.valuesOf(opts.groupOpt).asScala.toList
val deletedGroups = adminClient.deleteConsumerGroups(
groupsToDelete.asJava,
val groupIds =
if (opts.options.has(opts.allGroupsOpt)) listGroups()
else opts.options.valuesOf(opts.groupOpt).asScala
val groupsToDelete = adminClient.deleteConsumerGroups(
groupIds.asJava,
withTimeoutMs(new DeleteConsumerGroupsOptions)
).deletedGroups().asScala
val result = deletedGroups.mapValues { f =>
val result = groupsToDelete.mapValues { f =>
Try(f.get) match {
case _: Success[_] => null
case Failure(e) => e
@ -676,6 +815,7 @@ object ConsumerGroupCommand extends Logging {
val AllTopicsDoc = "Consider all topics assigned to a group in the `reset-offsets` process."
val ListDoc = "List all consumer groups."
val DescribeDoc = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group."
val AllGroupsDoc = "Apply to all consumer groups."
val nl = System.getProperty("line.separator")
val DeleteDoc = "Pass in groups to delete topic partition offsets and ownership information " +
"over the entire consumer group. For instance --group g1 --group g2"
@ -725,6 +865,7 @@ object ConsumerGroupCommand extends Logging {
val allTopicsOpt = parser.accepts("all-topics", AllTopicsDoc)
val listOpt = parser.accepts("list", ListDoc)
val describeOpt = parser.accepts("describe", DescribeDoc)
val allGroupsOpt = parser.accepts("all-groups", AllGroupsDoc)
val deleteOpt = parser.accepts("delete", DeleteDoc)
val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc)
.withRequiredArg
@ -775,28 +916,32 @@ object ConsumerGroupCommand extends Logging {
options = parser.parse(args : _*)
val describeOptPresent = options.has(describeOpt)
val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)
val allGroupSelectionScopeOpts: Set[OptionSpec[_]] = Set(groupOpt, allGroupsOpt)
val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)
val allResetOffsetScenarioOpts: Set[OptionSpec[_]] = Set(resetToOffsetOpt, resetShiftByOpt,
resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt)
def checkArgs() {
// check required args
if (options.has(timeoutMsOpt) && !describeOptPresent)
debug(s"Option $timeoutMsOpt is applicable only when $describeOpt is used.")
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
if (options.has(deleteOpt) && options.has(topicOpt))
if (options.has(describeOpt)) {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndDie(parser,
s"Option $describeOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}")
} else {
if (options.has(timeoutMsOpt))
debug(s"Option $timeoutMsOpt is applicable only when $describeOpt is used.")
}
if (options.has(deleteOpt)) {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndDie(parser,
s"Option $deleteOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}")
if (options.has(topicOpt))
CommandLineUtils.printUsageAndDie(parser, s"The consumer does not support topic-specific offset " +
"deletion from a consumer group.")
if (describeOptPresent)
CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
if (options.has(deleteOpt) && !options.has(groupOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt takes $groupOpt")
}
if (options.has(resetOffsetsOpt)) {
if (options.has(dryRunOpt) && options.has(executeOpt))
@ -809,18 +954,20 @@ object ConsumerGroupCommand extends Logging {
"if you are scripting this command and want to keep the current default behavior without prompting.")
}
CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, allResetOffsetScenarioOpts - resetToOffsetOpt)
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndDie(parser,
s"Option $resetOffsetsOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}")
CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, allResetOffsetScenarioOpts - resetToOffsetOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, allResetOffsetScenarioOpts - resetToDatetimeOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, allResetOffsetScenarioOpts - resetByDurationOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, allResetOffsetScenarioOpts - resetToEarliestOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, allResetOffsetScenarioOpts - resetToLatestOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt, allResetOffsetScenarioOpts - resetToCurrentOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, allResetOffsetScenarioOpts - resetShiftByOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, allResetOffsetScenarioOpts - resetFromFileOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, allResetOffsetScenarioOpts - resetToLatestOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt, allResetOffsetScenarioOpts - resetToCurrentOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, allResetOffsetScenarioOpts - resetShiftByOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, allResetOffsetScenarioOpts - resetFromFileOpt)
}
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allGroupSelectionScopeOpts - groupOpt)
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt)
CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt)
}

View File

@ -1110,7 +1110,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupService = new ConsumerGroupService(opts)
consumerGroupService.describeGroup()
consumerGroupService.describeGroups()
consumerGroupService.close()
}

View File

@ -42,7 +42,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
val output = TestUtils.grabConsoleOutput(service.deleteGroups())
assertTrue(s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected while deleting consumer group",
output.contains(s"Group '$missingGroup' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message))
output.contains(s"Group '$missingGroup' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message))
}
@Test
@ -56,7 +56,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
val result = service.deleteGroups()
assertTrue(s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected while deleting consumer group",
result.size == 1 && result.keySet.contains(missingGroup) && result.get(missingGroup).get.getCause
result.size == 1 && result.keySet.contains(missingGroup) && result(missingGroup).getCause
.isInstanceOf[GroupIdNotFoundException])
}
@ -70,7 +70,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
service.collectGroupMembers(false)._2.get.size == 1
service.collectGroupMembers(group, false)._2.get.size == 1
}, "The group did not initialize as expected.", maxRetries = 3)
val output = TestUtils.grabConsoleOutput(service.deleteGroups())
@ -88,13 +88,13 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
service.collectGroupMembers(false)._2.get.size == 1
service.collectGroupMembers(group, false)._2.get.size == 1
}, "The group did not initialize as expected.", maxRetries = 3)
val result = service.deleteGroups()
assertNotNull(s"Group was deleted successfully, but it shouldn't have been. Result was:(${result})", result.get(group).get)
assertNotNull(s"Group was deleted successfully, but it shouldn't have been. Result was:(${result})", result(group))
assertTrue(s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while deleting consumer group. Result was:(${result})",
result.size == 1 && result.keySet.contains(group) && result.get(group).get.getCause.isInstanceOf[GroupNotEmptyException])
result.size == 1 && result.keySet.contains(group) && result(group).getCause.isInstanceOf[GroupNotEmptyException])
}
@Test
@ -113,7 +113,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
executor.shutdown()
TestUtils.waitUntilTrue(() => {
service.collectGroupState().state == "Empty"
service.collectGroupState(group).state == "Empty"
}, "The group did become empty as expected.", maxRetries = 3)
val output = TestUtils.grabConsoleOutput(service.deleteGroups())
@ -121,6 +121,43 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
output.contains(s"Deletion of requested consumer groups ('$group') was successful."))
}
@Test
def testDeleteCmdAllGroups() {
TestUtils.createOffsetsTopic(zkClient, servers)
// Create 3 groups with 1 consumer per each
val groups =
(for (i <- 1 to 3) yield {
val group = this.group + i
val executor = addConsumerGroupExecutor(numConsumers = 1, group = group)
group -> executor
}).toMap
val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--all-groups")
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
service.listGroups().forall(groupId => groups.keySet.contains(groupId))
}, "The group did not initialize as expected.", maxRetries = 3)
// Shutdown consumers to empty out groups
groups.values.foreach(executor => executor.shutdown())
TestUtils.waitUntilTrue(() => {
groups.keySet.forall(groupId => service.collectGroupState(groupId).state == "Empty")
}, "The group did become empty as expected.", maxRetries = 3)
val output = TestUtils.grabConsoleOutput(service.deleteGroups()).trim
val expectedGroupsForDeletion = groups.keySet
val deletedGroupsGrepped = output.substring(output.indexOf('(') + 1, output.indexOf(')')).split(',')
.map(_.replaceAll("'", "").trim).toSet
assertTrue(s"The consumer group(s) could not be deleted as expected",
output.matches(s"Deletion of requested consumer groups (.*) was successful.")
&& deletedGroupsGrepped == expectedGroupsForDeletion
)
}
@Test
def testDeleteEmptyGroup() {
TestUtils.createOffsetsTopic(zkClient, servers)
@ -137,12 +174,12 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
executor.shutdown()
TestUtils.waitUntilTrue(() => {
service.collectGroupState().state == "Empty"
service.collectGroupState(group).state == "Empty"
}, "The group did become empty as expected.", maxRetries = 3)
val result = service.deleteGroups()
assertTrue(s"The consumer group could not be deleted as expected",
result.size == 1 && result.keySet.contains(group) && result.get(group).get == null)
result.size == 1 && result.keySet.contains(group) && result(group) == null)
}
@Test
@ -162,14 +199,14 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
executor.shutdown()
TestUtils.waitUntilTrue(() => {
service.collectGroupState().state == "Empty"
service.collectGroupState(group).state == "Empty"
}, "The group did become empty as expected.", maxRetries = 3)
val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup))
val output = TestUtils.grabConsoleOutput(service2.deleteGroups())
assertTrue(s"The consumer group deletion did not work as expected",
output.contains(s"Group '$missingGroup' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message) &&
output.contains(s"These consumer groups were deleted successfully: '$group'"))
output.contains(s"These consumer groups were deleted successfully: '$group'"))
}
@Test
@ -189,15 +226,16 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
executor.shutdown()
TestUtils.waitUntilTrue(() => {
service.collectGroupState().state == "Empty"
service.collectGroupState(group).state == "Empty"
}, "The group did become empty as expected.", maxRetries = 3)
val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup))
val result = service2.deleteGroups()
assertTrue(s"The consumer group deletion did not work as expected",
result.size == 2 &&
result.keySet.contains(group) && result.get(group).get == null &&
result.keySet.contains(missingGroup) && result.get(missingGroup).get.getMessage.contains(Errors.GROUP_ID_NOT_FOUND.message))
result.keySet.contains(group) && result(group) == null &&
result.keySet.contains(missingGroup) &&
result(missingGroup).getMessage.contains(Errors.GROUP_ID_NOT_FOUND.message))
}

View File

@ -22,7 +22,7 @@ import joptsimple.OptionException
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{TimeoutException}
import org.apache.kafka.common.errors.TimeoutException
import org.junit.Assert._
import org.junit.Test
@ -46,7 +46,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", missingGroup) ++ describeType
val service = getConsumerGroupService(cgcArgs)
val output = TestUtils.grabConsoleOutput(service.describeGroup())
val output = TestUtils.grabConsoleOutput(service.describeGroups())
assertTrue(s"Expected error was not detected for describe option '${describeType.mkString(" ")}'",
output.contains(s"Consumer group '$missingGroup' does not exist."))
}
@ -61,49 +61,52 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
@Test
def testDescribeOffsetsOfNonExistingGroup() {
val group = "missing.group"
TestUtils.createOffsetsTopic(zkClient, servers)
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
// note the group to be queried is a different (non-existing) group
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group")
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
val (state, assignments) = service.collectGroupOffsets()
val (state, assignments) = service.collectGroupOffsets(group)
assertTrue(s"Expected the state to be 'Dead', with no members in the group '$group'.",
state.contains("Dead") && assignments.contains(List()))
}
@Test
def testDescribeMembersOfNonExistingGroup() {
val group = "missing.group"
TestUtils.createOffsetsTopic(zkClient, servers)
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
// note the group to be queried is a different (non-existing) group
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group")
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
val (state, assignments) = service.collectGroupMembers(false)
val (state, assignments) = service.collectGroupMembers(group, false)
assertTrue(s"Expected the state to be 'Dead', with no members in the group '$group'.",
state.contains("Dead") && assignments.contains(List()))
val (state2, assignments2) = service.collectGroupMembers(true)
val (state2, assignments2) = service.collectGroupMembers(group, true)
assertTrue(s"Expected the state to be 'Dead', with no members in the group '$group' (verbose option).",
state2.contains("Dead") && assignments2.contains(List()))
}
@Test
def testDescribeStateOfNonExistingGroup() {
val group = "missing.group"
TestUtils.createOffsetsTopic(zkClient, servers)
// run one consumer in the group consuming from a single-partition topic
addConsumerGroupExecutor(numConsumers = 1)
// note the group to be queried is a different (non-existing) group
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group")
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val service = getConsumerGroupService(cgcArgs)
val state = service.collectGroupState()
val state = service.collectGroupState(group)
assertTrue(s"Expected the state to be 'Dead', with no members in the group '$group'.",
state.state == "Dead" && state.numMembers == 0 &&
state.coordinator != null && servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
@ -122,12 +125,61 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
output.trim.split("\n").length == 2 && error.isEmpty
}, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.", maxRetries = 3)
}
}
@Test
def testDescribeExistingGroups() {
TestUtils.createOffsetsTopic(zkClient, servers)
// Create N single-threaded consumer groups from a single-partition topic
val groups = (for (describeType <- describeTypes) yield {
val group = this.group + describeType.mkString("")
addConsumerGroupExecutor(numConsumers = 1, group = group)
Array("--group", group)
}).flatten
val expectedNumLines = describeTypes.length * 2
for (describeType <- describeTypes) {
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe") ++ groups ++ describeType
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
val numLines = output.trim.split("\n").filterNot(line => line.isEmpty).length
(numLines == expectedNumLines) && error.isEmpty
}, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.", maxRetries = 3)
}
}
@Test
def testDescribeAllExistingGroups() {
TestUtils.createOffsetsTopic(zkClient, servers)
// Create N single-threaded consumer groups from a single-partition topic
for (describeType <- describeTypes) {
val group = this.group + describeType.mkString("")
addConsumerGroupExecutor(numConsumers = 1, group = group)
}
val expectedNumLines = describeTypes.length * 2
for (describeType <- describeTypes) {
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--all-groups") ++ describeType
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
val numLines = output.trim.split("\n").filterNot(line => line.isEmpty).length
(numLines == expectedNumLines) && error.isEmpty
}, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.", maxRetries = 3)
}
}
@Test
def testDescribeOffsetsOfExistingGroup() {
TestUtils.createOffsetsTopic(zkClient, servers)
@ -139,7 +191,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupOffsets()
val (state, assignments) = service.collectGroupOffsets(group)
state.contains("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 1 &&
@ -159,7 +211,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupMembers(false)
val (state, assignments) = service.collectGroupMembers(group, false)
state.contains("Stable") &&
(assignments match {
case Some(memberAssignments) =>
@ -172,7 +224,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
})
}, s"Expected a 'Stable' group status, rows and valid member information for group $group.", maxRetries = 3)
val (_, assignments) = service.collectGroupMembers(true)
val (_, assignments) = service.collectGroupMembers(group, true)
assignments match {
case None =>
fail(s"Expected partition assignments for members of group $group")
@ -193,7 +245,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val state = service.collectGroupState()
val state = service.collectGroupState(group)
state.state == "Stable" &&
state.numMembers == 1 &&
state.assignmentStrategy == "range" &&
@ -212,7 +264,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val state = service.collectGroupState()
val state = service.collectGroupState(group)
state.state == "Stable" &&
state.numMembers == 1 &&
state.assignmentStrategy == "roundrobin" &&
@ -233,14 +285,14 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
output.trim.split("\n").length == 2 && error.isEmpty
}, s"Expected describe group results with one data row for describe type '${describeType.mkString(" ")}'", maxRetries = 3)
// stop the consumer so the group has no active member anymore
executor.shutdown()
TestUtils.waitUntilTrue(() => {
TestUtils.grabConsoleError(service.describeGroup()).contains(s"Consumer group '$group' has no active members.")
TestUtils.grabConsoleError(service.describeGroups()).contains(s"Consumer group '$group' has no active members.")
}, s"Expected no active member in describe group results with describe type ${describeType.mkString(" ")}", maxRetries = 3)
}
}
@ -256,14 +308,14 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupOffsets()
val (state, assignments) = service.collectGroupOffsets(group)
state.contains("Stable") && assignments.exists(_.exists(_.group == group))
}, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.", maxRetries = 3)
// stop the consumer so the group has no active member anymore
executor.shutdown()
val (result, succeeded) = TestUtils.computeUntilTrue(service.collectGroupOffsets()) {
val (result, succeeded) = TestUtils.computeUntilTrue(service.collectGroupOffsets(group)) {
case (state, assignments) =>
val testGroupAssignments = assignments.toSeq.flatMap(_.filter(_.group == group))
def assignment = testGroupAssignments.head
@ -289,7 +341,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupMembers(false)
val (state, assignments) = service.collectGroupMembers(group, false)
state.contains("Stable") && assignments.exists(_.exists(_.group == group))
}, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.", maxRetries = 3)
@ -297,7 +349,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
executor.shutdown()
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupMembers(false)
val (state, assignments) = service.collectGroupMembers(group, false)
state.contains("Empty") && assignments.isDefined && assignments.get.isEmpty
}, s"Expected no member in describe group members results for group '$group'", maxRetries = 3)
}
@ -313,7 +365,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val state = service.collectGroupState()
val state = service.collectGroupState(group)
state.state == "Stable" &&
state.numMembers == 1 &&
state.coordinator != null &&
@ -324,7 +376,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
executor.shutdown()
TestUtils.waitUntilTrue(() => {
val state = service.collectGroupState()
val state = service.collectGroupState(group)
state.state == "Empty" && state.numMembers == 0 && state.assignmentStrategy == ""
}, s"Expected the group '$group' to become empty after the only member leaving.", maxRetries = 3)
}
@ -341,7 +393,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
val expectedNumRows = if (describeTypeMembers.contains(describeType)) 3 else 2
error.isEmpty && output.trim.split("\n").size == expectedNumRows
}, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'", maxRetries = 3)
@ -359,7 +411,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupOffsets()
val (state, assignments) = service.collectGroupOffsets(group)
state.contains("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 1 &&
@ -378,7 +430,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupMembers(false)
val (state, assignments) = service.collectGroupMembers(group, false)
state.contains("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 2 &&
@ -387,7 +439,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
assignments.get.count(_.assignment.nonEmpty) == 0
}, "Expected rows for consumers with no assigned partitions in describe group results", maxRetries = 3)
val (state, assignments) = service.collectGroupMembers(true)
val (state, assignments) = service.collectGroupMembers(group, true)
assertTrue("Expected additional columns in verbose version of describe members",
state.contains("Stable") && assignments.get.count(_.assignment.nonEmpty) > 0)
}
@ -403,7 +455,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val state = service.collectGroupState()
val state = service.collectGroupState(group)
state.state == "Stable" && state.numMembers == 2
}, "Expected two consumers in describe group results", maxRetries = 3)
}
@ -422,7 +474,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroup())
val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
val expectedNumRows = if (describeTypeState.contains(describeType)) 2 else 3
error.isEmpty && output.trim.split("\n").size == expectedNumRows
}, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'", maxRetries = 3)
@ -442,7 +494,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupOffsets()
val (state, assignments) = service.collectGroupOffsets(group)
state.contains("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 2 &&
@ -464,7 +516,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupMembers(false)
val (state, assignments) = service.collectGroupMembers(group, false)
state.contains("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 2 &&
@ -472,7 +524,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
assignments.get.count{ x => x.group == group && x.numPartitions == 0 } == 0
}, "Expected two rows (one row per consumer) in describe group members results.", maxRetries = 3)
val (state, assignments) = service.collectGroupMembers(true)
val (state, assignments) = service.collectGroupMembers(group, true)
assertTrue("Expected additional columns in verbose version of describe members",
state.contains("Stable") && assignments.get.count(_.assignment.isEmpty) == 0)
}
@ -490,7 +542,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val state = service.collectGroupState()
val state = service.collectGroupState(group)
state.state == "Stable" && state.group == group && state.numMembers == 2
}, "Expected a stable group with two members in describe group state result.", maxRetries = 3)
}
@ -508,7 +560,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupOffsets()
val (state, assignments) = service.collectGroupOffsets(group)
state.contains("Empty") && assignments.isDefined && assignments.get.count(_.group == group) == 2
}, "Expected a stable group with two members in describe group state result.", maxRetries = 3)
}
@ -527,7 +579,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
try {
TestUtils.grabConsoleOutputAndError(service.describeGroup())
TestUtils.grabConsoleOutputAndError(service.describeGroups())
fail(s"The consumer group command should have failed due to low initialization timeout (describe type: ${describeType.mkString(" ")})")
} catch {
case e: ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException]) // OK
@ -547,7 +599,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
try {
service.collectGroupOffsets()
service.collectGroupOffsets(group)
fail("The consumer group command should fail due to low initialization timeout")
} catch {
case e : ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException]) // OK
@ -567,12 +619,12 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
try {
service.collectGroupMembers(false)
service.collectGroupMembers(group, false)
fail("The consumer group command should fail due to low initialization timeout")
} catch {
case e: ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException])// OK
try {
service.collectGroupMembers(true)
service.collectGroupMembers(group, true)
fail("The consumer group command should fail due to low initialization timeout (verbose)")
} catch {
case e: ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException]) // OK
@ -593,7 +645,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
try {
service.collectGroupState()
service.collectGroupState(group)
fail("The consumer group command should fail due to low initialization timeout")
} catch {
case e: ExecutionException => assert(e.getCause.isInstanceOf[TimeoutException]) // OK
@ -621,7 +673,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
TestUtils.waitUntilTrue(() => {
val (state, assignments) = service.collectGroupOffsets()
val (state, assignments) = service.collectGroupOffsets(group)
state.contains("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 1 &&

View File

@ -71,38 +71,93 @@ class TimeConversionTests {
* - export/import
*/
class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
val overridingProps = new Properties()
val topic1 = "foo1"
val topic2 = "foo2"
override def generateConfigs: Seq[KafkaConfig] = {
TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false)
.map(KafkaConfig.fromProps(_, overridingProps))
}
.map(KafkaConfig.fromProps(_, overridingProps))
}
@Test
def testResetOffsetsNotExistingGroup() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group", "--all-topics",
val group = "missing.group"
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--to-current", "--execute")
val consumerGroupCommand = getConsumerGroupService(args)
// Make sure we got a coordinator
TestUtils.waitUntilTrue(() => {
consumerGroupCommand.collectGroupState().coordinator.host() == "localhost"
}, "Can't find a coordinator.", maxRetries = 3)
val resetOffsets = consumerGroupCommand.resetOffsets()
consumerGroupCommand.collectGroupState(group).coordinator.host() == "localhost"
}, "Can't find a coordinator")
val resetOffsets = consumerGroupCommand.resetOffsets()(group)
assertEquals(Map.empty, resetOffsets)
assertEquals(resetOffsets, committedOffsets(group = "missing.group"))
assertEquals(resetOffsets, committedOffsets(group = group))
}
@Test
def testResetOffsetsExistingTopic(): Unit = {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "new.group", "--topic", topic,
val group = "new.group"
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic,
"--to-offset", "50")
produceMessages(topic, 100)
resetAndAssertOffsets(args, expectedOffset = 50, dryRun = true)
resetAndAssertOffsets(args ++ Array("--dry-run"), expectedOffset = 50, dryRun = true)
resetAndAssertOffsets(args ++ Array("--execute"), expectedOffset = 50, group = "new.group")
resetAndAssertOffsets(args ++ Array("--execute"), expectedOffset = 50)
}
@Test
def testResetOffsetsExistingTopicSelectedGroups(): Unit = {
produceMessages(topic, 100)
val groups = (
for (id <- 1 to 3) yield {
val group = this.group + id
val executor = addConsumerGroupExecutor(numConsumers = 1, topic = topic, group = group)
awaitConsumerProgress(count = 100L, group = group)
executor.shutdown()
Array("--group", group)
}).toArray.flatten
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--topic", topic,
"--to-offset", "50") ++ groups
resetAndAssertOffsets(args, expectedOffset = 50, dryRun = true)
resetAndAssertOffsets(args ++ Array("--dry-run"), expectedOffset = 50, dryRun = true)
resetAndAssertOffsets(args ++ Array("--execute"), expectedOffset = 50)
}
@Test
def testResetOffsetsExistingTopicAllGroups(): Unit = {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--all-groups", "--topic", topic,
"--to-offset", "50")
produceMessages(topic, 100)
for (group <- 1 to 3 map (group + _)) {
val executor = addConsumerGroupExecutor(numConsumers = 1, topic = topic, group = group)
awaitConsumerProgress(count = 100L, group = group)
executor.shutdown()
}
resetAndAssertOffsets(args, expectedOffset = 50, dryRun = true)
resetAndAssertOffsets(args ++ Array("--dry-run"), expectedOffset = 50, dryRun = true)
resetAndAssertOffsets(args ++ Array("--execute"), expectedOffset = 50)
}
@Test
def testResetOffsetsAllTopicsAllGroups(): Unit = {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--all-groups", "--all-topics",
"--to-offset", "50")
val topics = 1 to 3 map (topic + _)
val groups = 1 to 3 map (group + _)
topics foreach (topic => produceMessages(topic, 100))
for {
topic <- topics
group <- groups
} {
val executor = addConsumerGroupExecutor(numConsumers = 3, topic = topic, group = group)
awaitConsumerProgress(topic = topic, count = 100L, group = group)
executor.shutdown()
}
resetAndAssertOffsets(args, expectedOffset = 50, dryRun = true, topics = topics)
resetAndAssertOffsets(args ++ Array("--dry-run"), expectedOffset = 50, dryRun = true, topics = topics)
resetAndAssertOffsets(args ++ Array("--execute"), expectedOffset = 50, topics = topics)
}
@Test
@ -143,7 +198,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsByDuration() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--by-duration", "PT1M", "--execute")
produceConsumeAndShutdown(topic, totalMessages = 100)
produceConsumeAndShutdown(topic, group, totalMessages = 100)
resetAndAssertOffsets(args, expectedOffset = 0)
}
@ -151,7 +206,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsByDurationToEarliest() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--by-duration", "PT0.1S", "--execute")
produceConsumeAndShutdown(topic, totalMessages = 100)
produceConsumeAndShutdown(topic, group, totalMessages = 100)
resetAndAssertOffsets(args, expectedOffset = 100)
}
@ -159,7 +214,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsToEarliest() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--to-earliest", "--execute")
produceConsumeAndShutdown(topic, totalMessages = 100)
produceConsumeAndShutdown(topic, group, totalMessages = 100)
resetAndAssertOffsets(args, expectedOffset = 0)
}
@ -167,7 +222,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsToLatest() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--to-latest", "--execute")
produceConsumeAndShutdown(topic, totalMessages = 100)
produceConsumeAndShutdown(topic, group, totalMessages = 100)
produceMessages(topic, 100)
resetAndAssertOffsets(args, expectedOffset = 200)
}
@ -176,7 +231,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsToCurrentOffset() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--to-current", "--execute")
produceConsumeAndShutdown(topic, totalMessages = 100)
produceConsumeAndShutdown(topic, group, totalMessages = 100)
produceMessages(topic, 100)
resetAndAssertOffsets(args, expectedOffset = 100)
}
@ -185,7 +240,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsToSpecificOffset() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--to-offset", "1", "--execute")
produceConsumeAndShutdown(topic, totalMessages = 100)
produceConsumeAndShutdown(topic, group, totalMessages = 100)
resetAndAssertOffsets(args, expectedOffset = 1)
}
@ -193,7 +248,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsShiftPlus() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--shift-by", "50", "--execute")
produceConsumeAndShutdown(topic, totalMessages = 100)
produceConsumeAndShutdown(topic, group, totalMessages = 100)
produceMessages(topic, 100)
resetAndAssertOffsets(args, expectedOffset = 150)
}
@ -202,7 +257,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsShiftMinus() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--shift-by", "-50", "--execute")
produceConsumeAndShutdown(topic, totalMessages = 100)
produceConsumeAndShutdown(topic, group, totalMessages = 100)
produceMessages(topic, 100)
resetAndAssertOffsets(args, expectedOffset = 50)
}
@ -211,7 +266,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsShiftByLowerThanEarliest() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--shift-by", "-150", "--execute")
produceConsumeAndShutdown(topic, totalMessages = 100)
produceConsumeAndShutdown(topic, group, totalMessages = 100)
produceMessages(topic, 100)
resetAndAssertOffsets(args, expectedOffset = 0)
}
@ -220,7 +275,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsShiftByHigherThanLatest() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--shift-by", "150", "--execute")
produceConsumeAndShutdown(topic, totalMessages = 100)
produceConsumeAndShutdown(topic, group, totalMessages = 100)
produceMessages(topic, 100)
resetAndAssertOffsets(args, expectedOffset = 200)
}
@ -229,7 +284,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
def testResetOffsetsToEarliestOnOneTopic() {
val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic,
"--to-earliest", "--execute")
produceConsumeAndShutdown(topic, totalMessages = 100)
produceConsumeAndShutdown(topic, group, totalMessages = 100)
resetAndAssertOffsets(args, expectedOffset = 0)
}
@ -242,7 +297,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
s"$topic:1", "--to-earliest", "--execute")
val consumerGroupCommand = getConsumerGroupService(args)
produceConsumeAndShutdown(topic, totalMessages = 100, numConsumers = 2)
produceConsumeAndShutdown(topic, group, totalMessages = 100, numConsumers = 2)
val priorCommittedOffsets = committedOffsets(topic = topic)
val tp0 = new TopicPartition(topic, 0)
@ -264,13 +319,13 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
"--topic", topic2, "--to-earliest", "--execute")
val consumerGroupCommand = getConsumerGroupService(args)
produceConsumeAndShutdown(topic1, 100, 1)
produceConsumeAndShutdown(topic2, 100, 1)
produceConsumeAndShutdown(topic1, group, 100, 1)
produceConsumeAndShutdown(topic2, group, 100, 1)
val tp1 = new TopicPartition(topic1, 0)
val tp2 = new TopicPartition(topic2, 0)
val allResetOffsets = resetOffsets(consumerGroupCommand)
val allResetOffsets = resetOffsets(consumerGroupCommand)(group).mapValues(_.offset())
assertEquals(Map(tp1 -> 0L, tp2 -> 0L), allResetOffsets)
assertEquals(Map(tp1 -> 0L), committedOffsets(topic1))
assertEquals(Map(tp2 -> 0L), committedOffsets(topic2))
@ -291,15 +346,15 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
s"$topic1:1", "--topic", s"$topic2:1", "--to-earliest", "--execute")
val consumerGroupCommand = getConsumerGroupService(args)
produceConsumeAndShutdown(topic1, 100, 2)
produceConsumeAndShutdown(topic2, 100, 2)
produceConsumeAndShutdown(topic1, group, 100, 2)
produceConsumeAndShutdown(topic2, group, 100, 2)
val priorCommittedOffsets1 = committedOffsets(topic1)
val priorCommittedOffsets2 = committedOffsets(topic2)
val tp1 = new TopicPartition(topic1, 1)
val tp2 = new TopicPartition(topic2, 1)
val allResetOffsets = resetOffsets(consumerGroupCommand)
val allResetOffsets = resetOffsets(consumerGroupCommand)(group).mapValues(_.offset())
assertEquals(Map(tp1 -> 0, tp2 -> 0), allResetOffsets)
assertEquals(priorCommittedOffsets1 + (tp1 -> 0L), committedOffsets(topic1))
@ -310,7 +365,8 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
}
@Test
def testResetOffsetsExportImportPlan() {
// This one deals with old CSV export/import format for a single --group arg: "topic,partition,offset" to support old behavior
def testResetOffsetsExportImportPlanSingleGroupArg() {
val topic = "bar"
val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1)
@ -320,22 +376,72 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
"--to-offset", "2", "--export")
val consumerGroupCommand = getConsumerGroupService(cgcArgs)
produceConsumeAndShutdown(topic, 100, 2)
produceConsumeAndShutdown(topic = topic, group = group, totalMessages = 100, numConsumers = 2)
val file = File.createTempFile("reset", ".csv")
file.deleteOnExit()
val exportedOffsets = consumerGroupCommand.resetOffsets()
val bw = new BufferedWriter(new FileWriter(file))
bw.write(consumerGroupCommand.exportOffsetsToReset(exportedOffsets))
bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets))
bw.close()
assertEquals(Map(tp0 -> 2L, tp1 -> 2L), exportedOffsets.mapValues(_.offset))
assertEquals(Map(tp0 -> 2L, tp1 -> 2L), exportedOffsets(group).mapValues(_.offset))
val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--from-file", file.getCanonicalPath, "--dry-run")
val consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec)
val importedOffsets = consumerGroupCommandExec.resetOffsets()
assertEquals(Map(tp0 -> 2L, tp1 -> 2L), importedOffsets.mapValues(_.offset))
assertEquals(Map(tp0 -> 2L, tp1 -> 2L), importedOffsets(group).mapValues(_.offset))
adminZkClient.deleteTopic(topic)
}
@Test
// This one deals with universal CSV export/import file format "group,topic,partition,offset",
// supporting multiple --group args or --all-groups arg
def testResetOffsetsExportImportPlan() {
val group1 = group + "1"
val group2 = group + "2"
val topic1 = "bar1"
val topic2 = "bar2"
val t1p0 = new TopicPartition(topic1, 0)
val t1p1 = new TopicPartition(topic1, 1)
val t2p0 = new TopicPartition(topic2, 0)
val t2p1 = new TopicPartition(topic2, 1)
createTopic(topic1, 2, 1)
createTopic(topic2, 2, 1)
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group1, "--group", group2, "--all-topics",
"--to-offset", "2", "--export")
val consumerGroupCommand = getConsumerGroupService(cgcArgs)
produceConsumeAndShutdown(topic = topic1, group = group1, totalMessages = 100, numConsumers = 2)
produceConsumeAndShutdown(topic = topic2, group = group2, totalMessages = 100, numConsumers = 5)
val file = File.createTempFile("reset", ".csv")
file.deleteOnExit()
val exportedOffsets = consumerGroupCommand.resetOffsets()
val bw = new BufferedWriter(new FileWriter(file))
bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets))
bw.close()
assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), exportedOffsets(group1).mapValues(_.offset))
assertEquals(Map(t2p0 -> 2L, t2p1 -> 2L), exportedOffsets(group2).mapValues(_.offset))
// Multiple --group's offset import
val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group1, "--group", group2, "--all-topics",
"--from-file", file.getCanonicalPath, "--dry-run")
val consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec)
val importedOffsets = consumerGroupCommandExec.resetOffsets()
assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), importedOffsets(group1).mapValues(_.offset))
assertEquals(Map(t2p0 -> 2L, t2p1 -> 2L), importedOffsets(group2).mapValues(_.offset))
// Single --group offset import using "group,topic,partition,offset" csv format
val cgcArgsExec2 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group1, "--all-topics",
"--from-file", file.getCanonicalPath, "--dry-run")
val consumerGroupCommandExec2 = getConsumerGroupService(cgcArgsExec2)
val importedOffsets2 = consumerGroupCommandExec2.resetOffsets()
assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), importedOffsets2(group1).mapValues(_.offset))
adminZkClient.deleteTopic(topic)
}
@ -353,30 +459,38 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
TestUtils.produceMessages(servers, records, acks = 1)
}
private def produceConsumeAndShutdown(topic: String, totalMessages: Int, numConsumers: Int = 1) {
private def produceConsumeAndShutdown(topic: String, group: String = group, totalMessages: Int, numConsumers: Int = 1) {
produceMessages(topic, totalMessages)
val executor = addConsumerGroupExecutor(numConsumers, topic)
awaitConsumerProgress(topic, totalMessages)
val executor = addConsumerGroupExecutor(numConsumers = numConsumers, topic = topic, group = group)
awaitConsumerProgress(topic, group, totalMessages)
executor.shutdown()
}
private def awaitConsumerProgress(topic: String = topic, count: Long): Unit = {
private def awaitConsumerProgress(topic: String = topic, group: String = group, count: Long): Unit = {
TestUtils.waitUntilTrue(() => {
val offsets = committedOffsets(topic).values
val offsets = committedOffsets(topic = topic, group = group).values
count == offsets.sum
}, "Expected that consumer group has consumed all messages from topic/partition.")
}, "Expected that consumer group has consumed all messages from topic/partition. " +
s"Expected offset: $count. Actual offset: ${committedOffsets(topic, group).values.sum}")
}
private def resetAndAssertOffsets(args: Array[String],
expectedOffset: Long,
group: String = group,
dryRun: Boolean = false): Unit = {
expectedOffset: Long,
dryRun: Boolean = false,
topics: Seq[String] = Seq(topic)): Unit = {
val consumerGroupCommand = getConsumerGroupService(args)
val expectedOffsets = topics.map(topic => topic -> Map(new TopicPartition(topic, 0) -> expectedOffset)).toMap
val resetOffsetsResultByGroup = resetOffsets(consumerGroupCommand)
try {
val priorOffsets = committedOffsets(group = group)
val expectedOffsets = Map(new TopicPartition(topic, 0) -> expectedOffset)
assertEquals(expectedOffsets, resetOffsets(consumerGroupCommand))
assertEquals(if (dryRun) priorOffsets else expectedOffsets, committedOffsets(group = group))
for {
topic <- topics
(group, partitionInfo) <- resetOffsetsResultByGroup
} {
val priorOffsets = committedOffsets(topic = topic, group = group)
assertEquals(expectedOffsets(topic), partitionInfo.filter(partitionInfo => partitionInfo._1.topic() == topic).mapValues(_.offset))
assertEquals(if (dryRun) priorOffsets else expectedOffsets(topic), committedOffsets(topic = topic, group = group))
}
} finally {
consumerGroupCommand.close()
}
@ -386,14 +500,16 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
expectedOffsets: Map[TopicPartition, Long],
topic: String): Unit = {
val allResetOffsets = resetOffsets(consumerGroupService)
allResetOffsets.foreach { case (tp, offset) =>
assertEquals(offset, expectedOffsets(tp))
for {
(group, offsetsInfo) <- allResetOffsets
(tp, offsetMetadata) <- offsetsInfo
} {
assertEquals(offsetMetadata.offset(), expectedOffsets(tp))
assertEquals(expectedOffsets, committedOffsets(topic, group))
}
assertEquals(expectedOffsets, committedOffsets(topic))
}
private def resetOffsets(consumerGroupService: ConsumerGroupService): Map[TopicPartition, Long] = {
consumerGroupService.resetOffsets().mapValues(_.offset)
private def resetOffsets(consumerGroupService: ConsumerGroupService) = {
consumerGroupService.resetOffsets()
}
}

View File

@ -105,6 +105,8 @@ libs += [
bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
easymock: "org.easymock:easymock:$versions.easymock",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
jacksonDataformatCsv: "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:$versions.jackson",
jacksonModuleScala: "com.fasterxml.jackson.module:jackson-module-scala_$versions.baseScala:$versions.jackson",
jacksonJDK8Datatypes: "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson",
jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",
jaxbApi: "javax.xml.bind:jaxb-api:$versions.jaxb",