mirror of https://github.com/apache/kafka.git
KAFKA-5608: Add --wait option for JmxTool and use in system tests to avoid race between JmxTool and monitored services
Author: Ewen Cheslack-Postava <me@ewencp.org>
Author: Ewen Cheslack-Postava <ewen@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Closes #3547 from ewencp/wait-jmx-metrics
(cherry picked from commit f50af9c31d)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
This commit is contained in:
parent
63a06c853e
commit
acfcc1f915
|
|
@ -72,7 +72,9 @@ object JmxTool extends Logging {
|
|||
.describedAs("service-url")
|
||||
.ofType(classOf[String])
|
||||
.defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi")
|
||||
|
||||
val waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " +
|
||||
"Only supported when the list of objects is non-empty and contains no object name patterns.")
|
||||
|
||||
if(args.length == 0)
|
||||
CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard output.")
|
||||
|
||||
|
|
@ -89,13 +91,14 @@ object JmxTool extends Logging {
|
|||
val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",")) else None
|
||||
val dateFormatExists = options.has(dateFormatOpt)
|
||||
val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None
|
||||
val wait = options.has(waitOpt)
|
||||
|
||||
var jmxc: JMXConnector = null
|
||||
var mbsc: MBeanServerConnection = null
|
||||
var retries = 0
|
||||
val maxNumRetries = 10
|
||||
var connected = false
|
||||
while (retries < maxNumRetries && !connected) {
|
||||
val connectTimeoutMs = 10000
|
||||
val connectTestStarted = System.currentTimeMillis
|
||||
do {
|
||||
try {
|
||||
System.err.println(s"Trying to connect to JMX url: $url.")
|
||||
jmxc = JMXConnectorFactory.connect(url, null)
|
||||
|
|
@ -105,13 +108,12 @@ object JmxTool extends Logging {
|
|||
case e : Exception =>
|
||||
System.err.println(s"Could not connect to JMX url: $url. Exception ${e.getMessage}.")
|
||||
e.printStackTrace()
|
||||
retries += 1
|
||||
Thread.sleep(500)
|
||||
Thread.sleep(100)
|
||||
}
|
||||
}
|
||||
} while (System.currentTimeMillis - connectTestStarted < connectTimeoutMs && !connected)
|
||||
|
||||
if (!connected) {
|
||||
System.err.println(s"Could not connect to JMX url $url after $maxNumRetries retries.")
|
||||
System.err.println(s"Could not connect to JMX url $url after $connectTimeoutMs ms.")
|
||||
System.err.println("Exiting.")
|
||||
sys.exit(1)
|
||||
}
|
||||
|
|
@ -122,7 +124,29 @@ object JmxTool extends Logging {
|
|||
else
|
||||
List(null)
|
||||
|
||||
val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala)
|
||||
val hasPatternQueries = queries.exists((name: ObjectName) => name.isPattern)
|
||||
|
||||
var names: Iterable[ObjectName] = null
|
||||
def namesSet = if (names == null) null else names.toSet
|
||||
def foundAllObjects() = !queries.toSet.equals(namesSet)
|
||||
val waitTimeoutMs = 10000
|
||||
if (!hasPatternQueries) {
|
||||
val start = System.currentTimeMillis
|
||||
do {
|
||||
if (names != null) {
|
||||
System.err.println("Could not find all object names, retrying")
|
||||
Thread.sleep(100)
|
||||
}
|
||||
names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala)
|
||||
} while (wait && System.currentTimeMillis - start < waitTimeoutMs && foundAllObjects)
|
||||
}
|
||||
|
||||
if (wait && foundAllObjects) {
|
||||
val missing = (queries.toSet - namesSet).mkString(", ")
|
||||
System.err.println(s"Could not find all requested object names after $waitTimeoutMs ms. Missing $missing")
|
||||
System.err.println("Exiting.")
|
||||
sys.exit(1)
|
||||
}
|
||||
|
||||
val numExpectedAttributes: Map[ObjectName, Int] =
|
||||
if (attributesWhitelistExists)
|
||||
|
|
|
|||
|
|
@ -256,9 +256,10 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
|
|||
|
||||
consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
|
||||
|
||||
self.init_jmx_attributes()
|
||||
self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
|
||||
self.start_jmx_tool(idx, node)
|
||||
with self.lock:
|
||||
self._init_jmx_attributes()
|
||||
self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
|
||||
self.start_jmx_tool(idx, node)
|
||||
|
||||
for line in consumer_output:
|
||||
msg = line.strip()
|
||||
|
|
@ -273,7 +274,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
|
|||
if msg is not None:
|
||||
self.messages_consumed[idx].append(msg)
|
||||
|
||||
self.read_jmx_output(idx, node)
|
||||
with self.lock:
|
||||
self.read_jmx_output(idx, node)
|
||||
|
||||
def start_node(self, node):
|
||||
BackgroundThreadService.start_node(self, node)
|
||||
|
|
@ -295,20 +297,22 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
|
|||
self.security_config.clean_node(node)
|
||||
|
||||
def has_partitions_assigned(self, node):
|
||||
if self.new_consumer is False:
|
||||
return False
|
||||
idx = self.idx(node)
|
||||
self.init_jmx_attributes()
|
||||
self.start_jmx_tool(idx, node)
|
||||
self.read_jmx_output(idx, node)
|
||||
if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value:
|
||||
return False
|
||||
self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr])
|
||||
return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0
|
||||
if self.new_consumer is False:
|
||||
return False
|
||||
idx = self.idx(node)
|
||||
with self.lock:
|
||||
self._init_jmx_attributes()
|
||||
self.start_jmx_tool(idx, node)
|
||||
self.read_jmx_output(idx, node)
|
||||
if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value:
|
||||
return False
|
||||
self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr])
|
||||
return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0
|
||||
|
||||
def init_jmx_attributes(self):
|
||||
if self.new_consumer is True:
|
||||
if self.jmx_object_names is None:
|
||||
def _init_jmx_attributes(self):
|
||||
# Must hold lock
|
||||
if self.new_consumer:
|
||||
if not self.jmx_object_names:
|
||||
self.jmx_object_names = []
|
||||
self.jmx_object_names += ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id]
|
||||
self.jmx_attributes += ["assigned-partitions"]
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
from ducktape.cluster.remoteaccount import RemoteCommandError
|
||||
from ducktape.utils.util import wait_until
|
||||
|
||||
from kafkatest.version import get_version, V_0_11_0_0, DEV_BRANCH
|
||||
|
||||
class JmxMixin(object):
|
||||
"""This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats.
|
||||
|
|
@ -23,6 +23,7 @@ class JmxMixin(object):
|
|||
A couple things worth noting:
|
||||
- this is not a service in its own right.
|
||||
- we assume the service using JmxMixin also uses KafkaPathResolverMixin
|
||||
- this uses the --wait option for JmxTool, so the list of object names must be explicit; no patterns are permitted
|
||||
"""
|
||||
def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None):
|
||||
self.jmx_object_names = jmx_object_names
|
||||
|
|
@ -59,8 +60,14 @@ class JmxMixin(object):
|
|||
wait_until(check_jmx_port_listening, timeout_sec=30, backoff_sec=.1,
|
||||
err_msg="%s: Never saw JMX port for %s start listening" % (node.account, self))
|
||||
|
||||
cmd = "%s kafka.tools.JmxTool " % self.path.script("kafka-run-class.sh", node)
|
||||
# To correctly wait for requested JMX metrics to be added we need the --wait option for JmxTool. This option was
|
||||
# not added until 0.11.0.1, so any earlier versions need to use JmxTool from a newer version.
|
||||
use_jmxtool_version = get_version(node)
|
||||
if use_jmxtool_version <= V_0_11_0_0:
|
||||
use_jmxtool_version = DEV_BRANCH
|
||||
cmd = "%s kafka.tools.JmxTool " % self.path.script("kafka-run-class.sh", use_jmxtool_version)
|
||||
cmd += "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
|
||||
cmd += " --wait"
|
||||
for jmx_object_name in self.jmx_object_names:
|
||||
cmd += " --object-name %s" % jmx_object_name
|
||||
for jmx_attribute in self.jmx_attributes:
|
||||
|
|
|
|||
Loading…
Reference in New Issue