diff --git a/bin/kafka-broker-api-versions.sh b/bin/kafka-broker-api-versions.sh
new file mode 100755
index 00000000000..4f560a0a60c
--- /dev/null
+++ b/bin/kafka-broker-api-versions.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.BrokerApiVersionsCommand "$@"
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index 6acbb631837..b90009bd57d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Utils;
import java.util.Collection;
import java.util.EnumMap;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.TreeMap;
public class NodeApiVersions {
@@ -56,10 +57,21 @@ public class NodeApiVersions {
}
/**
+ * Convert the object to a string with no linebreaks.
+ *
* This toString method is relatively expensive, so avoid calling it unless debug logging is turned on.
*/
@Override
public String toString() {
+ return toString(false);
+ }
+
+ /**
+ * Convert the object to a string.
+ *
+ * @param lineBreaks True if we should add a linebreak after each api.
+ */
+ public String toString(boolean lineBreaks) {
// The apiVersion collection may not be in sorted order. We put it into
// a TreeMap before printing it out to ensure that we always print in
// ascending order.
@@ -73,11 +85,20 @@ public class NodeApiVersions {
if (!apiKeysText.containsKey(apiKey.id)) {
StringBuilder bld = new StringBuilder();
bld.append(apiKey.name).append("(").
- append(apiKey.id).append("): ").append("UNSUPPORTED");
+ append(apiKey.id).append("): ").append("UNSUPPORTED");
apiKeysText.put(apiKey.id, bld.toString());
}
}
- return "{" + Utils.join(apiKeysText.values(), ", ") + "}";
+ String separator = lineBreaks ? ",\n\t" : ", ";
+ StringBuilder bld = new StringBuilder();
+ bld.append("(");
+ if (lineBreaks)
+ bld.append("\n\t");
+ bld.append(Utils.join(apiKeysText.values(), separator));
+ if (lineBreaks)
+ bld.append("\n");
+ bld.append(")");
+ return bld.toString();
}
private String apiVersionToText(ApiVersion apiVersion) {
@@ -106,4 +127,13 @@ public class NodeApiVersions {
}
return bld.toString();
}
+
+ public ApiVersion apiVersion(ApiKeys apiKey) {
+ for (ApiVersion apiVersion : apiVersions) {
+ if (apiVersion.apiKey == apiKey.id) {
+ return apiVersion;
+ }
+ }
+ throw new NoSuchElementException();
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index b39a0aa88fa..861a28f01f9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -36,13 +36,13 @@ public class NodeApiVersionsTest {
NodeApiVersions versions = new NodeApiVersions(
Collections.emptyList());
StringBuilder bld = new StringBuilder();
- String prefix = "{";
+ String prefix = "(";
for (ApiKeys apiKey : ApiKeys.values()) {
bld.append(prefix).append(apiKey.name).
append("(").append(apiKey.id).append("): UNSUPPORTED");
prefix = ", ";
}
- bld.append("}");
+ bld.append(")");
assertEquals(bld.toString(), versions.toString());
}
@@ -59,7 +59,7 @@ public class NodeApiVersionsTest {
}
NodeApiVersions versions = new NodeApiVersions(versionList);
StringBuilder bld = new StringBuilder();
- String prefix = "{";
+ String prefix = "(";
for (ApiKeys apiKey : ApiKeys.values()) {
bld.append(prefix);
if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY) {
@@ -80,7 +80,7 @@ public class NodeApiVersionsTest {
}
prefix = ", ";
}
- bld.append("}");
+ bld.append(")");
assertEquals(bld.toString(), versions.toString());
}
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 33089d107a9..680c5e16f43 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -16,6 +16,7 @@ import java.nio.ByteBuffer
import java.util.{Collections, Properties}
import java.util.concurrent.atomic.AtomicInteger
+import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
import kafka.common.KafkaException
import kafka.coordinator.GroupOverview
import kafka.utils.Logging
@@ -28,11 +29,11 @@ import org.apache.kafka.common.network.Selector
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.OffsetFetchResponse
-import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{Cluster, Node, TopicPartition}
import scala.collection.JavaConverters._
+import scala.util.Try
class AdminClient(val time: Time,
val requestTimeoutMs: Int,
@@ -68,37 +69,42 @@ class AdminClient(val time: Time,
def findCoordinator(groupId: String): Node = {
val requestBuilder = new GroupCoordinatorRequest.Builder(groupId)
val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
- Errors.forCode(response.errorCode()).maybeThrow()
- response.node()
+ Errors.forCode(response.errorCode).maybeThrow()
+ response.node
}
def listGroups(node: Node): List[GroupOverview] = {
val response = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest.Builder()).asInstanceOf[ListGroupsResponse]
- Errors.forCode(response.errorCode()).maybeThrow()
- response.groups().asScala.map(group => GroupOverview(group.groupId(), group.protocolType())).toList
+ Errors.forCode(response.errorCode).maybeThrow()
+ response.groups.asScala.map(group => GroupOverview(group.groupId, group.protocolType)).toList
+ }
+
+ def getApiVersions(node: Node): List[ApiVersion] = {
+ val response = send(node, ApiKeys.API_VERSIONS, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
+ Errors.forCode(response.errorCode).maybeThrow()
+ response.apiVersions.asScala.toList
}
private def findAllBrokers(): List[Node] = {
val request = MetadataRequest.Builder.allTopics()
val response = sendAnyNode(ApiKeys.METADATA, request).asInstanceOf[MetadataResponse]
- val errors = response.errors()
+ val errors = response.errors
if (!errors.isEmpty)
debug(s"Metadata request contained errors: $errors")
- response.cluster().nodes().asScala.toList
+ response.cluster.nodes.asScala.toList
}
def listAllGroups(): Map[Node, List[GroupOverview]] = {
- findAllBrokers.map {
- case broker =>
- broker -> {
- try {
- listGroups(broker)
- } catch {
- case e: Exception =>
- debug(s"Failed to find groups from broker $broker", e)
- List[GroupOverview]()
- }
+ findAllBrokers.map { broker =>
+ broker -> {
+ try {
+ listGroups(broker)
+ } catch {
+ case e: Exception =>
+ debug(s"Failed to find groups from broker $broker", e)
+ List[GroupOverview]()
}
+ }
}.toMap
}
@@ -123,9 +129,14 @@ class AdminClient(val time: Time,
if (response.hasError)
throw response.error.exception
response.maybeThrowFirstPartitionError
- response.responseData().asScala.map { responseData => (responseData._1, responseData._2.offset) }.toMap
+ response.responseData.asScala.map { case (tp, partitionData) => (tp, partitionData.offset) }.toMap
}
+ def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
+ findAllBrokers.map { broker =>
+ broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
+ }.toMap
+
/**
* Case class used to represent a consumer of a consumer group
*/
@@ -252,6 +263,6 @@ object AdminClient {
time,
DefaultRequestTimeoutMs,
highLevelClient,
- bootstrapCluster.nodes().asScala.toList)
+ bootstrapCluster.nodes.asScala.toList)
}
}
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
new file mode 100644
index 00000000000..812bc9ddad7
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.io.PrintStream
+import java.util.Properties
+
+import kafka.utils.CommandLineUtils
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.clients.CommonClientConfigs
+import joptsimple._
+
+import scala.util.{Failure, Success}
+
+/**
+ * A command for retrieving broker version information.
+ */
+object BrokerApiVersionsCommand {
+
+ def main(args: Array[String]): Unit = {
+ execute(args, System.out)
+ }
+
+ def execute(args: Array[String], out: PrintStream): Unit = {
+ val opts = new BrokerVersionCommandOptions(args)
+ val adminClient = createAdminClient(opts)
+ val brokerMap = adminClient.listAllBrokerVersionInfo()
+ brokerMap.foreach { case (broker, versionInfoOrError) =>
+ versionInfoOrError match {
+ case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n")
+ case Failure(v) => out.print(s"${broker} -> ERROR: ${v}\n")
+ }
+ }
+ }
+
+ private def createAdminClient(opts: BrokerVersionCommandOptions): AdminClient = {
+ val props = if (opts.options.has(opts.commandConfigOpt))
+ Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+ else
+ new Properties()
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+ AdminClient.create(props)
+ }
+
+ class BrokerVersionCommandOptions(args: Array[String]) {
+ val BootstrapServerDoc = "REQUIRED: The server to connect to."
+ val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
+
+ val parser = new OptionParser
+ val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
+ .withRequiredArg
+ .describedAs("command config property file")
+ .ofType(classOf[String])
+ val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
+ .withRequiredArg
+ .describedAs("server(s) to use for bootstrapping")
+ .ofType(classOf[String])
+ val options = parser.parse(args : _*)
+ checkArgs()
+
+ def checkArgs() {
+ // check required args
+ CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
+ }
+ }
+}
diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
new file mode 100644
index 00000000000..ff93f22c11f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.NodeApiVersions
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.ApiVersionsResponse
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Test
+
+class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
+
+ def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
+
+ @Test
+ def checkBrokerApiVersionCommandOutput() {
+ val byteArrayOutputStream = new ByteArrayOutputStream
+ val printStream = new PrintStream(byteArrayOutputStream)
+ BrokerApiVersionsCommand.execute(Array("--bootstrap-server", brokerList), printStream)
+ val content = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
+ val lineIter = content.split("\n").iterator
+ assertTrue(lineIter.hasNext)
+ assertEquals(s"$brokerList (id: 0 rack: null) -> (", lineIter.next)
+ val nodeApiVersions = new NodeApiVersions(ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions)
+ for (apiKey <- ApiKeys.values) {
+ val apiVersion = nodeApiVersions.apiVersion(apiKey)
+ val versionRangeStr =
+ if (apiVersion.minVersion == apiVersion.maxVersion) apiVersion.minVersion.toString
+ else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
+ val terminator = if (apiKey == ApiKeys.values.last) "" else ","
+ val usableVersion = nodeApiVersions.usableVersion(apiKey)
+ val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator"
+ assertTrue(lineIter.hasNext)
+ assertEquals(line, lineIter.next)
+ }
+ assertTrue(lineIter.hasNext)
+ assertEquals(")", lineIter.next)
+ assertFalse(lineIter.hasNext)
+ }
+}
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index a62922caaf2..1e2749f6946 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -24,6 +24,7 @@ import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.ApiKeys
import org.junit.{Before, Test}
import org.junit.Assert._
@@ -77,6 +78,24 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
assertEquals("consumer", group.protocolType)
}
+ @Test
+ def testListAllBrokerVersionInfo() {
+ consumers.head.subscribe(Collections.singletonList(topic))
+ TestUtils.waitUntilTrue(() => {
+ consumers.head.poll(0)
+ !consumers.head.assignment.isEmpty
+ }, "Expected non-empty assignment")
+ val brokerVersionInfos = client.listAllBrokerVersionInfo
+ val brokers = brokerList.split(",")
+ assertEquals(brokers.size, brokerVersionInfos.size)
+ for ((node, tryBrokerVersionInfo) <- brokerVersionInfos) {
+ val hostStr = s"${node.host}:${node.port}"
+ assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
+ val brokerVersionInfo = tryBrokerVersionInfo.get
+ assertEquals(0, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+ }
+ }
+
@Test
def testGetConsumerGroupSummary() {
consumers.head.subscribe(Collections.singletonList(topic))