KAFKA-2768: AdminClient ignore member list for non-stable groups

…stabilizing

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Ismael Juma, Jason Gustafson, Guozhang Wang

Closes #447 from SinghAsDev/KAFKA-2768
This commit is contained in:
Ashish Singh 2015-11-13 12:56:15 -08:00 committed by Guozhang Wang
parent 002ec9c796
commit a26dbcdf3a
2 changed files with 17 additions and 15 deletions

View File

@ -138,11 +138,10 @@ class AdminClient(val time: Time,
throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}")
Errors.forCode(metadata.errorCode()).maybeThrow()
val members = metadata.members().map {
case member =>
val metadata = Utils.readBytes(member.memberMetadata())
val assignment = Utils.readBytes(member.memberAssignment())
MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment)
val members = metadata.members().map { member =>
val metadata = Utils.readBytes(member.memberMetadata())
val assignment = Utils.readBytes(member.memberAssignment())
MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment)
}.toList
GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
}
@ -161,10 +160,13 @@ class AdminClient(val time: Time,
if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group")
group.members.map {
case member =>
if (group.state == "Stable") {
group.members.map { member =>
val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList)
}
} else {
List.empty
}
}
}

View File

@ -92,9 +92,6 @@ object ConsumerGroupCommand {
val configs = parseConfigs(opts)
val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
def warnNoTopicsForGroupFound: Unit = {
println("No topic available for consumer group provided")
}
println("%s, %s, %s, %s, %s, %s, %s"
.format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER"))
@ -102,15 +99,18 @@ object ConsumerGroupCommand {
if (!useNewConsumer) {
val topics = zkUtils.getTopicsByConsumerGroup(group)
if (topics.isEmpty) {
warnNoTopicsForGroupFound
println("No topic available for consumer group provided")
} else {
topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts))
}
topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts))
} else {
val consumers = createAndGetAdminClient(opts).describeConsumerGroup(group)
if (consumers.isEmpty)
warnNoTopicsForGroupFound
consumers.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x.assignment.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), Option("%s_%s".format(x.clientId, x.clientHost))))
if (consumers.isEmpty) {
println(s"Consumer group, ${group}, does not exist or is rebalancing.")
} else {
consumers.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x.assignment.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), Option("%s_%s".format(x.clientId, x.clientHost))))
}
}
}