mirror of https://github.com/apache/kafka.git
KAFKA-4457; Add BrokerApiVersionsCommand
Author: Colin P. Mccabe <cmccabe@confluent.io> Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, Apurva Mehta <apurva.1618@gmail.com>, Ismael Juma <ismael@juma.me.uk> Closes #2184 from cmccabe/KAFKA-4457
This commit is contained in:
parent
d8a77560c2
commit
4a6f2c6cc0
|
@ -0,0 +1,17 @@
|
||||||
|
#!/bin/bash
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
exec $(dirname $0)/kafka-run-class.sh kafka.admin.BrokerApiVersionsCommand "$@"
|
|
@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Utils;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumMap;
|
import java.util.EnumMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.NoSuchElementException;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
public class NodeApiVersions {
|
public class NodeApiVersions {
|
||||||
|
@ -56,10 +57,21 @@ public class NodeApiVersions {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Convert the object to a string with no linebreaks.<p/>
|
||||||
|
*
|
||||||
* This toString method is relatively expensive, so avoid calling it unless debug logging is turned on.
|
* This toString method is relatively expensive, so avoid calling it unless debug logging is turned on.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
|
return toString(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert the object to a string.
|
||||||
|
*
|
||||||
|
* @param lineBreaks True if we should add a linebreak after each api.
|
||||||
|
*/
|
||||||
|
public String toString(boolean lineBreaks) {
|
||||||
// The apiVersion collection may not be in sorted order. We put it into
|
// The apiVersion collection may not be in sorted order. We put it into
|
||||||
// a TreeMap before printing it out to ensure that we always print in
|
// a TreeMap before printing it out to ensure that we always print in
|
||||||
// ascending order.
|
// ascending order.
|
||||||
|
@ -73,11 +85,20 @@ public class NodeApiVersions {
|
||||||
if (!apiKeysText.containsKey(apiKey.id)) {
|
if (!apiKeysText.containsKey(apiKey.id)) {
|
||||||
StringBuilder bld = new StringBuilder();
|
StringBuilder bld = new StringBuilder();
|
||||||
bld.append(apiKey.name).append("(").
|
bld.append(apiKey.name).append("(").
|
||||||
append(apiKey.id).append("): ").append("UNSUPPORTED");
|
append(apiKey.id).append("): ").append("UNSUPPORTED");
|
||||||
apiKeysText.put(apiKey.id, bld.toString());
|
apiKeysText.put(apiKey.id, bld.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return "{" + Utils.join(apiKeysText.values(), ", ") + "}";
|
String separator = lineBreaks ? ",\n\t" : ", ";
|
||||||
|
StringBuilder bld = new StringBuilder();
|
||||||
|
bld.append("(");
|
||||||
|
if (lineBreaks)
|
||||||
|
bld.append("\n\t");
|
||||||
|
bld.append(Utils.join(apiKeysText.values(), separator));
|
||||||
|
if (lineBreaks)
|
||||||
|
bld.append("\n");
|
||||||
|
bld.append(")");
|
||||||
|
return bld.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
private String apiVersionToText(ApiVersion apiVersion) {
|
private String apiVersionToText(ApiVersion apiVersion) {
|
||||||
|
@ -106,4 +127,13 @@ public class NodeApiVersions {
|
||||||
}
|
}
|
||||||
return bld.toString();
|
return bld.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ApiVersion apiVersion(ApiKeys apiKey) {
|
||||||
|
for (ApiVersion apiVersion : apiVersions) {
|
||||||
|
if (apiVersion.apiKey == apiKey.id) {
|
||||||
|
return apiVersion;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new NoSuchElementException();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,13 +36,13 @@ public class NodeApiVersionsTest {
|
||||||
NodeApiVersions versions = new NodeApiVersions(
|
NodeApiVersions versions = new NodeApiVersions(
|
||||||
Collections.<ApiVersion>emptyList());
|
Collections.<ApiVersion>emptyList());
|
||||||
StringBuilder bld = new StringBuilder();
|
StringBuilder bld = new StringBuilder();
|
||||||
String prefix = "{";
|
String prefix = "(";
|
||||||
for (ApiKeys apiKey : ApiKeys.values()) {
|
for (ApiKeys apiKey : ApiKeys.values()) {
|
||||||
bld.append(prefix).append(apiKey.name).
|
bld.append(prefix).append(apiKey.name).
|
||||||
append("(").append(apiKey.id).append("): UNSUPPORTED");
|
append("(").append(apiKey.id).append("): UNSUPPORTED");
|
||||||
prefix = ", ";
|
prefix = ", ";
|
||||||
}
|
}
|
||||||
bld.append("}");
|
bld.append(")");
|
||||||
assertEquals(bld.toString(), versions.toString());
|
assertEquals(bld.toString(), versions.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,7 +59,7 @@ public class NodeApiVersionsTest {
|
||||||
}
|
}
|
||||||
NodeApiVersions versions = new NodeApiVersions(versionList);
|
NodeApiVersions versions = new NodeApiVersions(versionList);
|
||||||
StringBuilder bld = new StringBuilder();
|
StringBuilder bld = new StringBuilder();
|
||||||
String prefix = "{";
|
String prefix = "(";
|
||||||
for (ApiKeys apiKey : ApiKeys.values()) {
|
for (ApiKeys apiKey : ApiKeys.values()) {
|
||||||
bld.append(prefix);
|
bld.append(prefix);
|
||||||
if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY) {
|
if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY) {
|
||||||
|
@ -80,7 +80,7 @@ public class NodeApiVersionsTest {
|
||||||
}
|
}
|
||||||
prefix = ", ";
|
prefix = ", ";
|
||||||
}
|
}
|
||||||
bld.append("}");
|
bld.append(")");
|
||||||
assertEquals(bld.toString(), versions.toString());
|
assertEquals(bld.toString(), versions.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@ import java.nio.ByteBuffer
|
||||||
import java.util.{Collections, Properties}
|
import java.util.{Collections, Properties}
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
|
import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
|
||||||
import kafka.common.KafkaException
|
import kafka.common.KafkaException
|
||||||
import kafka.coordinator.GroupOverview
|
import kafka.coordinator.GroupOverview
|
||||||
import kafka.utils.Logging
|
import kafka.utils.Logging
|
||||||
|
@ -28,11 +29,11 @@ import org.apache.kafka.common.network.Selector
|
||||||
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
|
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
|
||||||
import org.apache.kafka.common.requests._
|
import org.apache.kafka.common.requests._
|
||||||
import org.apache.kafka.common.requests.OffsetFetchResponse
|
import org.apache.kafka.common.requests.OffsetFetchResponse
|
||||||
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
|
|
||||||
import org.apache.kafka.common.utils.{Time, Utils}
|
import org.apache.kafka.common.utils.{Time, Utils}
|
||||||
import org.apache.kafka.common.{Cluster, Node, TopicPartition}
|
import org.apache.kafka.common.{Cluster, Node, TopicPartition}
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
import scala.util.Try
|
||||||
|
|
||||||
class AdminClient(val time: Time,
|
class AdminClient(val time: Time,
|
||||||
val requestTimeoutMs: Int,
|
val requestTimeoutMs: Int,
|
||||||
|
@ -68,37 +69,42 @@ class AdminClient(val time: Time,
|
||||||
def findCoordinator(groupId: String): Node = {
|
def findCoordinator(groupId: String): Node = {
|
||||||
val requestBuilder = new GroupCoordinatorRequest.Builder(groupId)
|
val requestBuilder = new GroupCoordinatorRequest.Builder(groupId)
|
||||||
val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
|
val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
|
||||||
Errors.forCode(response.errorCode()).maybeThrow()
|
Errors.forCode(response.errorCode).maybeThrow()
|
||||||
response.node()
|
response.node
|
||||||
}
|
}
|
||||||
|
|
||||||
def listGroups(node: Node): List[GroupOverview] = {
|
def listGroups(node: Node): List[GroupOverview] = {
|
||||||
val response = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest.Builder()).asInstanceOf[ListGroupsResponse]
|
val response = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest.Builder()).asInstanceOf[ListGroupsResponse]
|
||||||
Errors.forCode(response.errorCode()).maybeThrow()
|
Errors.forCode(response.errorCode).maybeThrow()
|
||||||
response.groups().asScala.map(group => GroupOverview(group.groupId(), group.protocolType())).toList
|
response.groups.asScala.map(group => GroupOverview(group.groupId, group.protocolType)).toList
|
||||||
|
}
|
||||||
|
|
||||||
|
def getApiVersions(node: Node): List[ApiVersion] = {
|
||||||
|
val response = send(node, ApiKeys.API_VERSIONS, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
|
||||||
|
Errors.forCode(response.errorCode).maybeThrow()
|
||||||
|
response.apiVersions.asScala.toList
|
||||||
}
|
}
|
||||||
|
|
||||||
private def findAllBrokers(): List[Node] = {
|
private def findAllBrokers(): List[Node] = {
|
||||||
val request = MetadataRequest.Builder.allTopics()
|
val request = MetadataRequest.Builder.allTopics()
|
||||||
val response = sendAnyNode(ApiKeys.METADATA, request).asInstanceOf[MetadataResponse]
|
val response = sendAnyNode(ApiKeys.METADATA, request).asInstanceOf[MetadataResponse]
|
||||||
val errors = response.errors()
|
val errors = response.errors
|
||||||
if (!errors.isEmpty)
|
if (!errors.isEmpty)
|
||||||
debug(s"Metadata request contained errors: $errors")
|
debug(s"Metadata request contained errors: $errors")
|
||||||
response.cluster().nodes().asScala.toList
|
response.cluster.nodes.asScala.toList
|
||||||
}
|
}
|
||||||
|
|
||||||
def listAllGroups(): Map[Node, List[GroupOverview]] = {
|
def listAllGroups(): Map[Node, List[GroupOverview]] = {
|
||||||
findAllBrokers.map {
|
findAllBrokers.map { broker =>
|
||||||
case broker =>
|
broker -> {
|
||||||
broker -> {
|
try {
|
||||||
try {
|
listGroups(broker)
|
||||||
listGroups(broker)
|
} catch {
|
||||||
} catch {
|
case e: Exception =>
|
||||||
case e: Exception =>
|
debug(s"Failed to find groups from broker $broker", e)
|
||||||
debug(s"Failed to find groups from broker $broker", e)
|
List[GroupOverview]()
|
||||||
List[GroupOverview]()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}.toMap
|
}.toMap
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,9 +129,14 @@ class AdminClient(val time: Time,
|
||||||
if (response.hasError)
|
if (response.hasError)
|
||||||
throw response.error.exception
|
throw response.error.exception
|
||||||
response.maybeThrowFirstPartitionError
|
response.maybeThrowFirstPartitionError
|
||||||
response.responseData().asScala.map { responseData => (responseData._1, responseData._2.offset) }.toMap
|
response.responseData.asScala.map { case (tp, partitionData) => (tp, partitionData.offset) }.toMap
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
|
||||||
|
findAllBrokers.map { broker =>
|
||||||
|
broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
|
||||||
|
}.toMap
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Case class used to represent a consumer of a consumer group
|
* Case class used to represent a consumer of a consumer group
|
||||||
*/
|
*/
|
||||||
|
@ -252,6 +263,6 @@ object AdminClient {
|
||||||
time,
|
time,
|
||||||
DefaultRequestTimeoutMs,
|
DefaultRequestTimeoutMs,
|
||||||
highLevelClient,
|
highLevelClient,
|
||||||
bootstrapCluster.nodes().asScala.toList)
|
bootstrapCluster.nodes.asScala.toList)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,81 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package kafka.admin
|
||||||
|
|
||||||
|
import java.io.PrintStream
|
||||||
|
import java.util.Properties
|
||||||
|
|
||||||
|
import kafka.utils.CommandLineUtils
|
||||||
|
import org.apache.kafka.common.utils.Utils
|
||||||
|
import org.apache.kafka.clients.CommonClientConfigs
|
||||||
|
import joptsimple._
|
||||||
|
|
||||||
|
import scala.util.{Failure, Success}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A command for retrieving broker version information.
|
||||||
|
*/
|
||||||
|
object BrokerApiVersionsCommand {
|
||||||
|
|
||||||
|
def main(args: Array[String]): Unit = {
|
||||||
|
execute(args, System.out)
|
||||||
|
}
|
||||||
|
|
||||||
|
def execute(args: Array[String], out: PrintStream): Unit = {
|
||||||
|
val opts = new BrokerVersionCommandOptions(args)
|
||||||
|
val adminClient = createAdminClient(opts)
|
||||||
|
val brokerMap = adminClient.listAllBrokerVersionInfo()
|
||||||
|
brokerMap.foreach { case (broker, versionInfoOrError) =>
|
||||||
|
versionInfoOrError match {
|
||||||
|
case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n")
|
||||||
|
case Failure(v) => out.print(s"${broker} -> ERROR: ${v}\n")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def createAdminClient(opts: BrokerVersionCommandOptions): AdminClient = {
|
||||||
|
val props = if (opts.options.has(opts.commandConfigOpt))
|
||||||
|
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
|
||||||
|
else
|
||||||
|
new Properties()
|
||||||
|
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
|
||||||
|
AdminClient.create(props)
|
||||||
|
}
|
||||||
|
|
||||||
|
class BrokerVersionCommandOptions(args: Array[String]) {
|
||||||
|
val BootstrapServerDoc = "REQUIRED: The server to connect to."
|
||||||
|
val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
|
||||||
|
|
||||||
|
val parser = new OptionParser
|
||||||
|
val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
|
||||||
|
.withRequiredArg
|
||||||
|
.describedAs("command config property file")
|
||||||
|
.ofType(classOf[String])
|
||||||
|
val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
|
||||||
|
.withRequiredArg
|
||||||
|
.describedAs("server(s) to use for bootstrapping")
|
||||||
|
.ofType(classOf[String])
|
||||||
|
val options = parser.parse(args : _*)
|
||||||
|
checkArgs()
|
||||||
|
|
||||||
|
def checkArgs() {
|
||||||
|
// check required args
|
||||||
|
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,61 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package kafka.admin
|
||||||
|
|
||||||
|
import java.io.{ByteArrayOutputStream, PrintStream}
|
||||||
|
import java.nio.charset.StandardCharsets
|
||||||
|
|
||||||
|
import kafka.integration.KafkaServerTestHarness
|
||||||
|
import kafka.server.KafkaConfig
|
||||||
|
import kafka.utils.TestUtils
|
||||||
|
import org.apache.kafka.clients.NodeApiVersions
|
||||||
|
import org.apache.kafka.common.protocol.ApiKeys
|
||||||
|
import org.apache.kafka.common.requests.ApiVersionsResponse
|
||||||
|
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
|
||||||
|
import org.junit.Test
|
||||||
|
|
||||||
|
class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
|
||||||
|
|
||||||
|
def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def checkBrokerApiVersionCommandOutput() {
|
||||||
|
val byteArrayOutputStream = new ByteArrayOutputStream
|
||||||
|
val printStream = new PrintStream(byteArrayOutputStream)
|
||||||
|
BrokerApiVersionsCommand.execute(Array("--bootstrap-server", brokerList), printStream)
|
||||||
|
val content = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
|
||||||
|
val lineIter = content.split("\n").iterator
|
||||||
|
assertTrue(lineIter.hasNext)
|
||||||
|
assertEquals(s"$brokerList (id: 0 rack: null) -> (", lineIter.next)
|
||||||
|
val nodeApiVersions = new NodeApiVersions(ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions)
|
||||||
|
for (apiKey <- ApiKeys.values) {
|
||||||
|
val apiVersion = nodeApiVersions.apiVersion(apiKey)
|
||||||
|
val versionRangeStr =
|
||||||
|
if (apiVersion.minVersion == apiVersion.maxVersion) apiVersion.minVersion.toString
|
||||||
|
else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
|
||||||
|
val terminator = if (apiKey == ApiKeys.values.last) "" else ","
|
||||||
|
val usableVersion = nodeApiVersions.usableVersion(apiKey)
|
||||||
|
val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator"
|
||||||
|
assertTrue(lineIter.hasNext)
|
||||||
|
assertEquals(line, lineIter.next)
|
||||||
|
}
|
||||||
|
assertTrue(lineIter.hasNext)
|
||||||
|
assertEquals(")", lineIter.next)
|
||||||
|
assertFalse(lineIter.hasNext)
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,6 +24,7 @@ import kafka.utils.{Logging, TestUtils}
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig
|
import org.apache.kafka.clients.consumer.ConsumerConfig
|
||||||
import org.apache.kafka.clients.producer.ProducerConfig
|
import org.apache.kafka.clients.producer.ProducerConfig
|
||||||
import org.apache.kafka.common.TopicPartition
|
import org.apache.kafka.common.TopicPartition
|
||||||
|
import org.apache.kafka.common.protocol.ApiKeys
|
||||||
import org.junit.{Before, Test}
|
import org.junit.{Before, Test}
|
||||||
import org.junit.Assert._
|
import org.junit.Assert._
|
||||||
|
|
||||||
|
@ -77,6 +78,24 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
|
||||||
assertEquals("consumer", group.protocolType)
|
assertEquals("consumer", group.protocolType)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testListAllBrokerVersionInfo() {
|
||||||
|
consumers.head.subscribe(Collections.singletonList(topic))
|
||||||
|
TestUtils.waitUntilTrue(() => {
|
||||||
|
consumers.head.poll(0)
|
||||||
|
!consumers.head.assignment.isEmpty
|
||||||
|
}, "Expected non-empty assignment")
|
||||||
|
val brokerVersionInfos = client.listAllBrokerVersionInfo
|
||||||
|
val brokers = brokerList.split(",")
|
||||||
|
assertEquals(brokers.size, brokerVersionInfos.size)
|
||||||
|
for ((node, tryBrokerVersionInfo) <- brokerVersionInfos) {
|
||||||
|
val hostStr = s"${node.host}:${node.port}"
|
||||||
|
assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
|
||||||
|
val brokerVersionInfo = tryBrokerVersionInfo.get
|
||||||
|
assertEquals(0, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testGetConsumerGroupSummary() {
|
def testGetConsumerGroupSummary() {
|
||||||
consumers.head.subscribe(Collections.singletonList(topic))
|
consumers.head.subscribe(Collections.singletonList(topic))
|
||||||
|
|
Loading…
Reference in New Issue