mirror of https://github.com/apache/kafka.git
MINOR: Disable JmxTool in kafkatest console-consumer by default (#7785)
Do not initialize `JmxTool` by default when running console consumer. In order to support this, we remove `has_partitions_assigned` and its only usage in an assertion inside `ProduceConsumeValidateTest`, which did not seem to contribute much to the validation. Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
parent
b13a7ff707
commit
179d0d73d6
|
@ -249,7 +249,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
|
||||||
consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
|
consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
|
||||||
|
|
||||||
with self.lock:
|
with self.lock:
|
||||||
self._init_jmx_attributes()
|
|
||||||
self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
|
self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
|
||||||
self.start_jmx_tool(idx, node)
|
self.start_jmx_tool(idx, node)
|
||||||
|
|
||||||
|
@ -292,28 +291,3 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
|
||||||
|
|
||||||
def java_class_name(self):
|
def java_class_name(self):
|
||||||
return "ConsoleConsumer"
|
return "ConsoleConsumer"
|
||||||
|
|
||||||
def has_partitions_assigned(self, node):
|
|
||||||
if self.new_consumer is False:
|
|
||||||
return False
|
|
||||||
idx = self.idx(node)
|
|
||||||
with self.lock:
|
|
||||||
self._init_jmx_attributes()
|
|
||||||
self.start_jmx_tool(idx, node)
|
|
||||||
self.read_jmx_output(idx, node)
|
|
||||||
if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value:
|
|
||||||
return False
|
|
||||||
self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr])
|
|
||||||
return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0
|
|
||||||
|
|
||||||
def _init_jmx_attributes(self):
|
|
||||||
# Must hold lock
|
|
||||||
if self.new_consumer:
|
|
||||||
# We use a flag to track whether we're using this automatically generated ID because the service could be
|
|
||||||
# restarted multiple times and the client ID may be changed.
|
|
||||||
if getattr(self, '_automatic_metrics', False) or not self.jmx_object_names:
|
|
||||||
self._automatic_metrics = True
|
|
||||||
self.jmx_object_names = ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id]
|
|
||||||
self.jmx_attributes = ["assigned-partitions"]
|
|
||||||
self.assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id
|
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,6 @@ from ducktape.utils.util import wait_until
|
||||||
|
|
||||||
from kafkatest.utils import validate_delivery
|
from kafkatest.utils import validate_delivery
|
||||||
|
|
||||||
import time
|
|
||||||
|
|
||||||
class ProduceConsumeValidateTest(Test):
|
class ProduceConsumeValidateTest(Test):
|
||||||
"""This class provides a shared template for tests which follow the common pattern of:
|
"""This class provides a shared template for tests which follow the common pattern of:
|
||||||
|
|
||||||
|
@ -56,20 +54,10 @@ class ProduceConsumeValidateTest(Test):
|
||||||
if (self.consumer_init_timeout_sec > 0):
|
if (self.consumer_init_timeout_sec > 0):
|
||||||
self.logger.debug("Waiting %ds for the consumer to initialize.",
|
self.logger.debug("Waiting %ds for the consumer to initialize.",
|
||||||
self.consumer_init_timeout_sec)
|
self.consumer_init_timeout_sec)
|
||||||
start = int(time.time())
|
|
||||||
wait_until(lambda: self.consumer.alive(self.consumer.nodes[0]) is True,
|
wait_until(lambda: self.consumer.alive(self.consumer.nodes[0]) is True,
|
||||||
timeout_sec=self.consumer_init_timeout_sec,
|
timeout_sec=self.consumer_init_timeout_sec,
|
||||||
err_msg="Consumer process took more than %d s to fork" %\
|
err_msg="Consumer process took more than %d s to fork" %\
|
||||||
self.consumer_init_timeout_sec)
|
self.consumer_init_timeout_sec)
|
||||||
end = int(time.time())
|
|
||||||
remaining_time = self.consumer_init_timeout_sec - (end - start)
|
|
||||||
if remaining_time < 0 :
|
|
||||||
remaining_time = 0
|
|
||||||
if self.consumer.new_consumer:
|
|
||||||
wait_until(lambda: self.consumer.has_partitions_assigned(self.consumer.nodes[0]) is True,
|
|
||||||
timeout_sec=remaining_time,
|
|
||||||
err_msg="Consumer process took more than %d s to have partitions assigned" %\
|
|
||||||
remaining_time)
|
|
||||||
|
|
||||||
self.producer.start()
|
self.producer.start()
|
||||||
wait_until(lambda: self.producer.num_acked > 5,
|
wait_until(lambda: self.producer.num_acked > 5,
|
||||||
|
|
Loading…
Reference in New Issue