KAFKA-14582: Move JmxTool to tools (#13136)

Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
Federico Valeri 2023-02-02 11:23:26 +01:00 committed by GitHub
parent 65bb819313
commit 50e0e3c257
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 741 additions and 295 deletions

View File

@ -1763,6 +1763,7 @@ project(':tools') {
implementation libs.jacksonJDK8Datatypes
implementation libs.slf4jApi
implementation libs.log4j
implementation libs.joptSimple
implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
implementation libs.jacksonJaxrsJsonProvider

View File

@ -408,6 +408,7 @@
<allow pkg="org.apache.log4j" />
<allow pkg="kafka.test" />
<allow pkg="joptsimple" />
<allow pkg="javax.rmi.ssl"/>
</subpackage>
<subpackage name="trogdor">

View File

@ -1,275 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package kafka.tools
import java.util.{Date, Objects}
import java.text.SimpleDateFormat
import javax.management._
import javax.management.remote._
import javax.rmi.ssl.SslRMIClientSocketFactory
import joptsimple.OptionParser
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.math._
import kafka.utils.{Exit, Logging}
import org.apache.kafka.server.util.CommandLineUtils
/**
* A program for reading JMX metrics from a given endpoint.
*
* This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool. See KAFKA-4620 for
* details.
*/
object JmxTool extends Logging {
def main(args: Array[String]): Unit = {
// Parse command line
val parser = new OptionParser(false)
val objectNameOpt =
parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
"can be given multiple times to specify more than one query. If no objects are specified " +
"all objects will be queried.")
.withRequiredArg
.describedAs("name")
.ofType(classOf[String])
val attributesOpt =
parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " +
"attributes are specified all objects will be queried.")
.withRequiredArg
.describedAs("name")
.ofType(classOf[String])
val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " +
"Value of -1 equivalent to setting one-time to true")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(2000)
val oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.")
.withRequiredArg
.describedAs("one-time")
.ofType(classOf[java.lang.Boolean])
.defaultsTo(false)
val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
"See java.text.SimpleDateFormat for options.")
.withRequiredArg
.describedAs("format")
.ofType(classOf[String])
val jmxServiceUrlOpt =
parser.accepts("jmx-url", "The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
.withRequiredArg
.describedAs("service-url")
.ofType(classOf[String])
.defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
val reportFormatOpt = parser.accepts("report-format", "output format name: either 'original', 'properties', 'csv', 'tsv' ")
.withRequiredArg
.describedAs("report-format")
.ofType(classOf[java.lang.String])
.defaultsTo("original")
val jmxAuthPropOpt = parser.accepts("jmx-auth-prop", "A mechanism to pass property in the form 'username=password' " +
"when enabling remote JMX with password authentication.")
.withRequiredArg
.describedAs("jmx-auth-prop")
.ofType(classOf[String])
val jmxSslEnableOpt = parser.accepts("jmx-ssl-enable", "Flag to enable remote JMX with SSL.")
.withRequiredArg
.describedAs("ssl-enable")
.ofType(classOf[java.lang.Boolean])
.defaultsTo(false)
val waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " +
"Only supported when the list of objects is non-empty and contains no object name patterns.")
val helpOpt = parser.accepts("help", "Print usage information.")
if(args.isEmpty)
CommandLineUtils.printUsageAndExit(parser, "Dump JMX values to standard output.")
val options = parser.parse(args : _*)
if(options.has(helpOpt)) {
parser.printHelpOn(System.out)
Exit.exit(0)
}
val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt))
val interval = options.valueOf(reportingIntervalOpt).intValue
val oneTime = interval < 0 || options.has(oneTimeOpt)
val attributesIncludeExists = options.has(attributesOpt)
val attributesInclude = if(attributesIncludeExists) Some(options.valueOf(attributesOpt).split(",").filterNot(_.equals(""))) else None
val dateFormatExists = options.has(dateFormatOpt)
val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None
val wait = options.has(waitOpt)
val reportFormat = parseFormat(options.valueOf(reportFormatOpt).toLowerCase)
val reportFormatOriginal = reportFormat.equals("original")
val enablePasswordAuth = options.has(jmxAuthPropOpt)
val enableSsl = options.has(jmxSslEnableOpt)
var jmxc: JMXConnector = null
var mbsc: MBeanServerConnection = null
var connected = false
val connectTimeoutMs = 10000
val connectTestStarted = System.currentTimeMillis
do {
try {
System.err.println(s"Trying to connect to JMX url: $url.")
val env = new java.util.HashMap[String, AnyRef]
// ssl enable
if (enableSsl) {
val csf = new SslRMIClientSocketFactory
env.put("com.sun.jndi.rmi.factory.socket", csf)
}
// password authentication enable
if (enablePasswordAuth) {
val credentials = options.valueOf(jmxAuthPropOpt).split("=", 2)
env.put(JMXConnector.CREDENTIALS, credentials)
}
jmxc = JMXConnectorFactory.connect(url, env)
mbsc = jmxc.getMBeanServerConnection
connected = true
} catch {
case e : Exception =>
System.err.println(s"Could not connect to JMX url: $url. Exception ${e.getMessage}.")
e.printStackTrace()
Thread.sleep(100)
}
} while (System.currentTimeMillis - connectTestStarted < connectTimeoutMs && !connected)
if (!connected) {
System.err.println(s"Could not connect to JMX url $url after $connectTimeoutMs ms.")
System.err.println("Exiting.")
sys.exit(1)
}
val queries: Iterable[ObjectName] =
if(options.has(objectNameOpt))
options.valuesOf(objectNameOpt).asScala.map(new ObjectName(_))
else
List(null)
val hasPatternQueries = queries.filterNot(Objects.isNull).exists((name: ObjectName) => name.isPattern)
var names: Iterable[ObjectName] = null
def namesSet = Option(names).toSet.flatten
def foundAllObjects = queries.toSet == namesSet
val waitTimeoutMs = 10000
if (!hasPatternQueries) {
val start = System.currentTimeMillis
do {
if (names != null) {
System.err.println("Could not find all object names, retrying")
Thread.sleep(100)
}
names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala)
} while (wait && System.currentTimeMillis - start < waitTimeoutMs && !foundAllObjects)
}
if (wait && !foundAllObjects) {
val missing = (queries.toSet - namesSet).mkString(", ")
System.err.println(s"Could not find all requested object names after $waitTimeoutMs ms. Missing $missing")
System.err.println("Exiting.")
sys.exit(1)
}
val numExpectedAttributes: Map[ObjectName, Int] =
if (!attributesIncludeExists)
names.map{name: ObjectName =>
val mbean = mbsc.getMBeanInfo(name)
(name, mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).size)}.toMap
else {
if (!hasPatternQueries)
names.map{name: ObjectName =>
val mbean = mbsc.getMBeanInfo(name)
val attributes = mbsc.getAttributes(name, mbean.getAttributes.map(_.getName))
val expectedAttributes = attributes.asScala.asInstanceOf[mutable.Buffer[Attribute]]
.filter(attr => attributesInclude.get.contains(attr.getName))
(name, expectedAttributes.size)}.toMap.filter(_._2 > 0)
else
queries.map((_, attributesInclude.get.length)).toMap
}
if(numExpectedAttributes.isEmpty) {
CommandLineUtils.printUsageAndExit(parser, s"No matched attributes for the queried objects $queries.")
}
// print csv header
val keys = List("time") ++ queryAttributes(mbsc, names, attributesInclude).keys.toArray.sorted
if(reportFormatOriginal && keys.size == numExpectedAttributes.values.sum + 1) {
println(keys.map("\"" + _ + "\"").mkString(","))
}
var keepGoing = true
while (keepGoing) {
val start = System.currentTimeMillis
val attributes = queryAttributes(mbsc, names, attributesInclude)
attributes("time") = dateFormat match {
case Some(dFormat) => dFormat.format(new Date)
case None => System.currentTimeMillis().toString
}
if(attributes.keySet.size == numExpectedAttributes.values.sum + 1) {
if(reportFormatOriginal) {
println(keys.map(attributes(_)).mkString(","))
}
else if(reportFormat.equals("properties")) {
keys.foreach( k => { println(k + "=" + attributes(k) ) } )
}
else if(reportFormat.equals("csv")) {
keys.foreach( k => { println(k + ",\"" + attributes(k) + "\"" ) } )
}
else { // tsv
keys.foreach( k => { println(k + "\t" + attributes(k) ) } )
}
}
if (oneTime) {
keepGoing = false
}
else {
val sleep = max(0, interval - (System.currentTimeMillis - start))
Thread.sleep(sleep)
}
}
}
def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesInclude: Option[Array[String]]): mutable.Map[String, Any] = {
val attributes = new mutable.HashMap[String, Any]()
for (name <- names) {
val mbean = mbsc.getMBeanInfo(name)
for (attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).asScala) {
val attr = attrObj.asInstanceOf[Attribute]
attributesInclude match {
case Some(allowedAttributes) =>
if (allowedAttributes.contains(attr.getName))
attributes(name.toString + ":" + attr.getName) = attr.getValue
case None => attributes(name.toString + ":" + attr.getName) = attr.getValue
}
}
}
attributes
}
def parseFormat(reportFormatOpt : String): String = reportFormatOpt match {
case "properties" => "properties"
case "csv" => "csv"
case "tsv" => "tsv"
case _ => "original"
}
}

View File

@ -141,7 +141,7 @@ class JmxMixin(object):
self.read_jmx_output(self.idx(node), node)
def jmx_class_name(self):
return "kafka.tools.JmxTool"
return "org.apache.kafka.tools.JmxTool"
class JmxTool(JmxMixin, KafkaPathResolverMixin):
"""

View File

@ -0,0 +1,430 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import joptsimple.OptionSpec;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.MBeanFeatureInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import javax.rmi.ssl.SslRMIClientSocketFactory;
import java.io.IOException;
import java.net.MalformedURLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* A program for reading JMX metrics from a given endpoint.
* <p>
* This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
* See KAFKA-4620 for details.
*/
public class JmxTool {
public static void main(String[] args) {
try {
JmxToolOptions options = new JmxToolOptions(args);
if (CommandLineUtils.isPrintHelpNeeded(options)) {
CommandLineUtils.printUsageAndExit(options.parser, "Dump JMX values to standard output.");
return;
}
if (CommandLineUtils.isPrintVersionNeeded(options)) {
CommandLineUtils.printVersionAndExit();
return;
}
Optional<String[]> attributesInclude = options.attributesInclude();
Optional<DateFormat> dateFormat = options.dateFormat();
String reportFormat = options.parseFormat();
boolean keepGoing = true;
MBeanServerConnection conn = connectToBeanServer(options);
List<ObjectName> queries = options.queries();
boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
Set<ObjectName> found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries);
Map<ObjectName, Integer> numExpectedAttributes =
findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found);
List<String> keys = new ArrayList<>();
keys.add("time");
keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet()));
maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes);
while (keepGoing) {
long start = System.currentTimeMillis();
Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
if (options.isOneTime()) {
keepGoing = false;
} else {
TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start)));
}
}
Exit.exit(0);
} catch (TerseException e) {
System.err.println(e.getMessage());
Exit.exit(1);
} catch (Throwable e) {
System.err.println(e.getMessage());
System.err.println(Utils.stackTrace(e));
Exit.exit(1);
}
}
private static String mkString(Stream<Object> stream, String delimeter) {
return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimeter));
}
private static int sumValues(Map<ObjectName, Integer> numExpectedAttributes) {
return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum();
}
private static String[] attributesNames(MBeanInfo mBeanInfo) {
return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new);
}
private static MBeanServerConnection connectToBeanServer(JmxToolOptions options) throws Exception {
JMXConnector connector;
MBeanServerConnection serverConn = null;
boolean connected = false;
long connectTimeoutMs = 10_000;
long connectTestStarted = System.currentTimeMillis();
do {
try {
// printing to stderr because system tests parse the output
System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL());
Map<String, Object> env = new HashMap<>();
// ssl enable
if (options.hasJmxSslEnableOpt()) {
env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory());
}
// password authentication enable
if (options.hasJmxAuthPropOpt()) {
env.put(JMXConnector.CREDENTIALS, options.credentials());
}
connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env);
serverConn = connector.getMBeanServerConnection();
connected = true;
} catch (Exception e) {
System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n",
options.jmxServiceURL(), e.getMessage());
e.printStackTrace();
TimeUnit.MILLISECONDS.sleep(100);
}
} while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected);
if (!connected) {
throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.",
options.jmxServiceURL(), connectTimeoutMs));
}
return serverConn;
}
private static Set<ObjectName> findObjectsIfNoPattern(JmxToolOptions options,
MBeanServerConnection conn,
List<ObjectName> queries,
boolean hasPatternQueries) throws Exception {
long waitTimeoutMs = 10_000;
Set<ObjectName> result = new HashSet<>();
Set<ObjectName> querySet = new HashSet<>(queries);
BiPredicate<Set<ObjectName>, Set<ObjectName>> foundAllObjects = (s1, s2) -> s1.containsAll(s2);
if (!hasPatternQueries) {
long start = System.currentTimeMillis();
do {
if (!result.isEmpty()) {
System.err.println("Could not find all object names, retrying");
TimeUnit.MILLISECONDS.sleep(100);
}
result.addAll(queryObjects(conn, queries));
} while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result));
}
if (options.hasWait() && !foundAllObjects.test(querySet, result)) {
querySet.removeAll(result);
String missing = mkString(querySet.stream().map(Object::toString), ",");
throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing));
}
return result;
}
private static Set<ObjectName> queryObjects(MBeanServerConnection conn,
List<ObjectName> queries) {
Set<ObjectName> result = new HashSet<>();
queries.forEach(name -> {
try {
result.addAll(conn.queryNames(name, null));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return result;
}
private static Map<ObjectName, Integer> findNumExpectedAttributes(MBeanServerConnection conn,
Optional<String[]> attributesInclude,
boolean hasPatternQueries,
List<ObjectName> queries,
Set<ObjectName> found) throws Exception {
Map<ObjectName, Integer> result = new HashMap<>();
if (!attributesInclude.isPresent()) {
found.forEach(objectName -> {
try {
MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} else {
if (!hasPatternQueries) {
found.forEach(objectName -> {
try {
MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
List<ObjectName> expectedAttributes = new ArrayList<>();
attributes.asList().forEach(attribute -> {
if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
expectedAttributes.add(objectName);
}
});
if (expectedAttributes.size() > 0) {
result.put(objectName, expectedAttributes.size());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} else {
queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length));
}
}
if (result.isEmpty()) {
throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries));
}
return result;
}
private static Map<String, Object> queryAttributes(MBeanServerConnection conn,
Set<ObjectName> objectNames,
Optional<String[]> attributesInclude) throws Exception {
Map<String, Object> result = new HashMap<>();
for (ObjectName objectName : objectNames) {
MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
AttributeList attributes = conn.getAttributes(objectName,
Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
for (Attribute attribute : attributes.asList()) {
if (attributesInclude.isPresent()) {
if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
attribute.getValue());
}
} else {
result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
attribute.getValue());
}
}
}
return result;
}
private static void maybePrintCsvHeader(String reportFormat, List<String> keys, Map<ObjectName, Integer> numExpectedAttributes) {
if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) {
System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ","));
}
}
private static void maybePrintDataRows(String reportFormat, Map<ObjectName, Integer> numExpectedAttributes, List<String> keys, Map<String, Object> attributes) {
if (attributes.size() == sumValues(numExpectedAttributes) + 1) {
switch (reportFormat) {
case "properties":
keys.forEach(key -> System.out.printf("%s=%s%n", key, attributes.get(key)));
break;
case "csv":
keys.forEach(key -> System.out.printf("%s,\"%s\"%n", key, attributes.get(key)));
break;
case "tsv":
keys.forEach(key -> System.out.printf("%s\t%s%n", key, attributes.get(key)));
break;
default:
System.out.println(mkString(keys.stream().map(attributes::get), ","));
break;
}
}
}
private static class JmxToolOptions extends CommandDefaultOptions {
private final OptionSpec<String> objectNameOpt;
private final OptionSpec<String> attributesOpt;
private final OptionSpec<Integer> reportingIntervalOpt;
private final OptionSpec<Boolean> oneTimeOpt;
private final OptionSpec<String> dateFormatOpt;
private final OptionSpec<String> jmxServiceUrlOpt;
private final OptionSpec<String> reportFormatOpt;
private final OptionSpec<String> jmxAuthPropOpt;
private final OptionSpec<Boolean> jmxSslEnableOpt;
private final OptionSpec<Void> waitOpt;
public JmxToolOptions(String[] args) {
super(args);
objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " +
"can be given multiple times to specify more than one query. If no objects are specified " +
"all objects will be queried.")
.withRequiredArg()
.describedAs("name")
.ofType(String.class);
attributesOpt = parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " +
"attributes are specified all objects will be queried.")
.withRequiredArg()
.describedAs("name")
.ofType(String.class);
reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " +
"Value of -1 equivalent to setting one-time to true")
.withRequiredArg()
.describedAs("ms")
.ofType(Integer.class)
.defaultsTo(2000);
oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.")
.withOptionalArg()
.describedAs("one-time")
.ofType(Boolean.class)
.defaultsTo(false);
dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
"See java.text.SimpleDateFormat for options.")
.withRequiredArg()
.describedAs("format")
.ofType(String.class);
jmxServiceUrlOpt = parser.accepts("jmx-url", "The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
.withRequiredArg()
.describedAs("service-url")
.ofType(String.class)
.defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi");
reportFormatOpt = parser.accepts("report-format", "output format name: either 'original', 'properties', 'csv', 'tsv' ")
.withRequiredArg()
.describedAs("report-format")
.ofType(String.class)
.defaultsTo("original");
jmxAuthPropOpt = parser.accepts("jmx-auth-prop", "A mechanism to pass property in the form 'username=password' " +
"when enabling remote JMX with password authentication.")
.withRequiredArg()
.describedAs("jmx-auth-prop")
.ofType(String.class);
jmxSslEnableOpt = parser.accepts("jmx-ssl-enable", "Flag to enable remote JMX with SSL.")
.withRequiredArg()
.describedAs("ssl-enable")
.ofType(Boolean.class)
.defaultsTo(false);
waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " +
"Only supported when the list of objects is non-empty and contains no object name patterns.");
options = parser.parse(args);
}
public JMXServiceURL jmxServiceURL() throws MalformedURLException {
return new JMXServiceURL(options.valueOf(jmxServiceUrlOpt));
}
public int interval() {
return options.valueOf(reportingIntervalOpt);
}
public boolean isOneTime() {
return interval() < 0 || options.has(oneTimeOpt);
}
public Optional<String[]> attributesInclude() {
if (options.has(attributesOpt)) {
String[] attributes = Arrays.stream(options.valueOf(attributesOpt).split(","))
.sequential().filter(s -> !s.isEmpty()).toArray(String[]::new);
return Optional.of(attributes);
} else {
return Optional.empty();
}
}
public Optional<DateFormat> dateFormat() {
return options.has(dateFormatOpt)
? Optional.of(new SimpleDateFormat(options.valueOf(dateFormatOpt)))
: Optional.empty();
}
public boolean hasWait() {
return options.has(waitOpt);
}
private String parseFormat() {
String reportFormat = options.valueOf(reportFormatOpt).toLowerCase(Locale.ROOT);
return Arrays.asList("properties", "csv", "tsv").contains(reportFormat) ? reportFormat : "original";
}
public boolean hasJmxAuthPropOpt() {
return options.has(jmxAuthPropOpt);
}
public boolean hasJmxSslEnableOpt() {
return options.has(jmxSslEnableOpt);
}
public String[] credentials() {
return options.valueOf(jmxAuthPropOpt).split("=", 2);
}
public List<ObjectName> queries() {
if (options.has(objectNameOpt)) {
return options.valuesOf(objectNameOpt).stream()
.map(s -> {
try {
return new ObjectName(s);
} catch (MalformedObjectNameException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
} else {
List<ObjectName> listWithNull = new ArrayList<>();
listWithNull.add(null);
return listWithNull;
}
}
}
}

View File

@ -0,0 +1,281 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
import java.lang.management.ManagementFactory;
import java.net.ServerSocket;
import java.rmi.registry.LocateRegistry;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class JmxToolTest {
private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
private static JMXConnectorServer jmxAgent;
private static String jmxUrl;
@BeforeAll
public static void beforeAll() throws Exception {
int port = findRandomOpenPortOnAllLocalInterfaces();
jmxAgent = startJmxAgent(port);
jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://:%d/jmxrmi", port);
}
@AfterAll
public static void afterAll() throws Exception {
jmxAgent.stop();
}
@BeforeEach
public void beforeEach() {
Exit.setExitProcedure(exitProcedure);
}
@AfterEach
public void afterEach() {
Exit.resetExitProcedure();
}
@Test
public void kafkaVersion() {
String out = executeAndGetOut("--version");
assertNormalExit();
assertTrue(out.contains(AppInfoParser.getVersion()));
}
@Test
public void unrecognizedOption() {
String err = executeAndGetErr("--foo");
assertCommandFailure();
assertTrue(err.contains("UnrecognizedOptionException"));
assertTrue(err.contains("foo"));
}
@Test
public void missingRequired() {
String err = executeAndGetErr("--reporting-interval");
assertCommandFailure();
assertTrue(err.contains("OptionMissingRequiredArgumentException"));
assertTrue(err.contains("reporting-interval"));
}
@Test
public void malformedURL() {
String err = executeAndGetErr("--jmx-url", "localhost:9999");
assertCommandFailure();
assertTrue(err.contains("MalformedURLException"));
}
@Test
public void helpOptions() {
String[] expectedOptions = new String[]{
"--attributes", "--date-format", "--help", "--jmx-auth-prop",
"--jmx-ssl-enable", "--jmx-url", "--object-name", "--one-time",
"--report-format", "--reporting-interval", "--version", "--wait"
};
String err = executeAndGetErr("--help");
assertCommandFailure();
for (String option : expectedOptions) {
assertTrue(err.contains(option), option);
}
}
@Test
public void csvFormat() {
String[] args = new String[]{
"--jmx-url", jmxUrl,
"--object-name", "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec",
"--report-format", "csv",
"--one-time"
};
String out = executeAndGetOut(args);
Arrays.stream(out.split("\\r?\\n")).forEach(line -> {
assertTrue(line.matches("([a-zA-Z0-9=:,.]+),\"([ -~]+)\""), line);
});
}
@Test
public void tsvFormat() {
String[] args = new String[]{
"--jmx-url", jmxUrl,
"--object-name", "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec",
"--report-format", "tsv",
"--one-time"
};
String out = executeAndGetOut(args);
Arrays.stream(out.split("\\r?\\n")).forEach(line -> {
assertTrue(line.matches("([a-zA-Z0-9=:,.]+)\\t([ -~]+)"), line);
});
}
@Test
public void allMetrics() {
String[] args = new String[]{
"--jmx-url", jmxUrl,
"--report-format", "csv",
"--reporting-interval", "-1"
};
String out = executeAndGetOut(args);
assertNormalExit();
Map<String, String> csv = parseCsv(out);
assertTrue(csv.size() > 0);
}
@Test
public void filteredMetrics() {
String[] args = new String[]{
"--jmx-url", jmxUrl,
"--object-name", "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec",
"--attributes", "FifteenMinuteRate,FiveMinuteRate",
"--report-format", "csv",
"--one-time"
};
String out = executeAndGetOut(args);
assertNormalExit();
Map<String, String> csv = parseCsv(out);
assertEquals("1.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate"));
assertEquals("3.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate"));
}
@Test
public void dateFormat() {
String dateFormat = "yyyyMMdd-hh:mm:ss";
String[] args = new String[]{
"--jmx-url", jmxUrl,
"--date-format", dateFormat,
"--report-format", "csv",
"--one-time"
};
String out = executeAndGetOut(args);
assertNormalExit();
Map<String, String> csv = parseCsv(out);
assertTrue(validDateFormat(dateFormat, csv.get("time")));
}
private static JMXConnectorServer startJmxAgent(int port) throws Exception {
LocateRegistry.createRegistry(port);
Map<String, Object> env = new HashMap<>();
env.put("com.sun.management.jmxremote.authenticate", "false");
env.put("com.sun.management.jmxremote.ssl", "false");
JMXServiceURL url = new JMXServiceURL(String.format("service:jmx:rmi:///jndi/rmi://:%d/jmxrmi", port));
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
server.registerMBean(new Metrics(),
new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec"));
JMXConnectorServer agent = JMXConnectorServerFactory.newJMXConnectorServer(url, env, server);
agent.start();
return agent;
}
private static int findRandomOpenPortOnAllLocalInterfaces() throws Exception {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
}
}
private String executeAndGetOut(String... args) {
return execute(args, false);
}
private String executeAndGetErr(String... args) {
return execute(args, true);
}
private String execute(String[] args, boolean err) {
Runnable runnable = () -> {
try {
JmxTool.main(args);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
return err ? ToolsTestUtils.captureStandardErr(runnable)
: ToolsTestUtils.captureStandardOut(runnable);
}
private void assertNormalExit() {
assertTrue(exitProcedure.hasExited());
assertEquals(0, exitProcedure.statusCode());
}
private void assertCommandFailure() {
assertTrue(exitProcedure.hasExited());
assertEquals(1, exitProcedure.statusCode());
}
private Map<String, String> parseCsv(String value) {
Map<String, String> result = new HashMap<>();
Arrays.stream(value.split("\\r?\\n")).forEach(line -> {
String[] cells = line.split(",\"");
if (cells.length == 2) {
result.put(cells[0], cells[1].replaceAll("\"", ""));
}
});
return result;
}
private boolean validDateFormat(String format, String value) {
DateFormat formatter = new SimpleDateFormat(format);
formatter.setLenient(false);
try {
formatter.parse(value);
return true;
} catch (ParseException e) {
return false;
}
}
public interface MetricsMBean {
double getFifteenMinuteRate();
double getFiveMinuteRate();
}
public static class Metrics implements MetricsMBean {
@Override
public double getFifteenMinuteRate() {
return 1.0;
}
@Override
public double getFiveMinuteRate() {
return 3.0;
}
}
}

View File

@ -16,11 +16,12 @@
*/
package org.apache.kafka.tools;
import org.apache.kafka.common.utils.Exit;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
public class ToolsTestUtils {
public static String captureStandardOut(Runnable runnable) {
return captureStandardStream(false, runnable);
}
@ -48,4 +49,24 @@ public class ToolsTestUtils {
}
}
public static class MockExitProcedure implements Exit.Procedure {
private boolean hasExited = false;
private int statusCode;
@Override
public void execute(int statusCode, String message) {
if (!this.hasExited) {
this.hasExited = true;
this.statusCode = statusCode;
}
}
public boolean hasExited() {
return hasExited;
}
public int statusCode() {
return statusCode;
}
}
}

View File

@ -79,7 +79,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TransactionsCommandTest {
private final MockExitProcedure exitProcedure = new MockExitProcedure();
private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
private final PrintStream out = new PrintStream(outputStream);
private final MockTime time = new MockTime();
@ -1048,27 +1048,14 @@ public class TransactionsCommandTest {
}
private void assertNormalExit() {
assertTrue(exitProcedure.hasExited);
assertEquals(0, exitProcedure.statusCode);
assertTrue(exitProcedure.hasExited());
assertEquals(0, exitProcedure.statusCode());
}
private void assertCommandFailure(String[] args) throws Exception {
execute(args);
assertTrue(exitProcedure.hasExited);
assertEquals(1, exitProcedure.statusCode);
}
private static class MockExitProcedure implements Exit.Procedure {
private boolean hasExited = false;
private int statusCode;
@Override
public void execute(int statusCode, String message) {
if (!this.hasExited) {
this.hasExited = true;
this.statusCode = statusCode;
}
}
assertTrue(exitProcedure.hasExited());
assertEquals(1, exitProcedure.statusCode());
}
}