From 686c02cf35baf0c0eec15324f0b45b8b6e7c1494 Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava
Date: Mon, 20 Nov 2017 14:47:29 -0800
Subject: [PATCH] MINOR: Add HttpMetricsReporter for system tests

Author: Ewen Cheslack-Postava
Reviewers: Apurva Mehta, Ismael Juma

Closes #4072 from ewencp/http-metrics

Author: Ewen Cheslack-Postava
Reviewers: Apurva Mehta, Ismael Juma

Closes #4207 from ewencp/http-metrics-0.11.0
---
 .travis.yml                                   |   2 +-
 build.gradle                                  |   4 +
 .../kafka/common/metrics/KafkaMetric.java     |   3 +-
 tests/README.md                               |  15 +-
 tests/docker/ducker-ak                        |   3 +-
 .../kafkatest/directory_layout/kafka_path.py  |  16 +-
 tests/kafkatest/services/monitor/http.py      | 226 ++++++++++++
 .../services/performance/performance.py       |   2 +-
 .../performance/producer_performance.py       |  35 +-
 tests/kafkatest/tests/client/quota_test.py    |   8 +-
 tests/kafkatest/tests/core/throttling_test.py |  10 +-
 .../kafka/tools/PushHttpMetricsReporter.java  | 332 +++++++++++++++++
 .../tools/PushHttpMetricsReporterTest.java    | 333 ++++++++++++++++++
 13 files changed, 939 insertions(+), 50 deletions(-)
 create mode 100644 tests/kafkatest/services/monitor/http.py
 create mode 100644 tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
 create mode 100644 tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java

diff --git a/.travis.yml b/.travis.yml
index 9be5c58d9e9..8a22c9bc4cd 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -39,7 +39,7 @@ before_install:
 
 script:
   - ./gradlew rat
-  - ./gradlew releaseTarGz && /bin/bash ./tests/docker/run_tests.sh
+  - ./gradlew systemTestLibs && /bin/bash ./tests/docker/run_tests.sh
 
 services:
   - docker
diff --git a/build.gradle b/build.gradle
index 3d70bcfc92f..ce4b4e44cb2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -807,6 +807,10 @@ project(':tools') {
 
     testCompile project(':clients')
     testCompile libs.junit
+    testCompile project(':clients').sourceSets.test.output
+    testCompile libs.easymock
+    testCompile libs.powermock
+    testCompile libs.powermockEasymock
   }
 
   javadoc {
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index 1cd5b243339..5d2b1a51783 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -28,7 +28,8 @@ public final class KafkaMetric implements Metric {
     private final Measurable measurable;
     private MetricConfig config;
 
-    KafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) {
+    // public for testing
+    public KafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) {
         super();
         this.metricName = metricName;
         this.lock = lock;
diff --git a/tests/README.md b/tests/README.md
index ec6ab31a792..469522fe39e 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -10,9 +10,8 @@ Running tests using docker
 --------------------------
 Docker containers can be used for running kafka system tests locally.
 * Requirements
-  - Docker 1.12.3 is installed and running on the machine.
-  - Test require a single kafka_*SNAPSHOT.tgz to be present in core/build/distributions.
-    This can be done by running ./gradlew clean releaseTarGz
+  - Docker 1.12.3 (or higher) is installed and running on the machine.
+  - Tests require that Kafka, including system test libs, is built. This can be done by running ./gradlew clean systemTestLibs
 * Run all tests
 ```
 bash tests/docker/run_tests.sh
 ```
@@ -80,7 +79,7 @@ This produces a json about the build which looks like:
     ],
     "before_install": null,
     "script": [
-      "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+      "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
     ],
     "services": [
       "docker"
@@ -128,7 +127,7 @@ This produces a json about the build which looks like:
     "jdk": "oraclejdk8",
     "before_install": null,
     "script": [
-      "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+      "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
     ],
     "services": [
       "docker"
@@ -165,7 +164,7 @@ This produces a json about the build which looks like:
     "jdk": "oraclejdk8",
     "before_install": null,
     "script": [
-      "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+      "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
    ],
     "services": [
       "docker"
@@ -215,7 +214,7 @@ The resulting json looks like:
     "jdk": "oraclejdk8",
     "before_install": null,
     "script": [
-      "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+      "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
     ],
     "services": [
       "docker"
@@ -252,7 +251,7 @@ The resulting json looks like:
     "jdk": "oraclejdk8",
     "before_install": null,
     "script": [
-      "./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
+      "./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
     ],
     "services": [
       "docker"
diff --git a/tests/docker/ducker-ak b/tests/docker/ducker-ak
index 1f061891481..7522473f19b 100755
--- a/tests/docker/ducker-ak
+++ b/tests/docker/ducker-ak
@@ -372,8 +372,7 @@ ducker_test() {
         fi
     done
     must_pushd "${kafka_dir}"
-    ls ./core/build/distributions/kafka_*.tgz &> /dev/null
-    [[ $? -eq 0 ]] || die "Failed to find core/build/distributions/kafka_*.tgz. Did you run ./gradlew releaseTarGz?"
+    (test -f ./gradlew || gradle) && ./gradlew systemTestLibs
     must_popd
     cmd="cd /opt/kafka-dev && ducktape --cluster-file /opt/kafka-dev/tests/docker/build/cluster.json $args"
     echo "docker exec -it ducker01 bash -c \"${cmd}\""
diff --git a/tests/kafkatest/directory_layout/kafka_path.py b/tests/kafkatest/directory_layout/kafka_path.py
index ece8be58214..40dda228685 100644
--- a/tests/kafkatest/directory_layout/kafka_path.py
+++ b/tests/kafkatest/directory_layout/kafka_path.py
@@ -106,25 +106,25 @@ class KafkaSystemTestPathResolver(object):
         self.context = context
         self.project = project
 
-    def home(self, node_or_version=DEV_BRANCH):
+    def home(self, node_or_version=DEV_BRANCH, project=None):
         version = self._version(node_or_version)
-        home_dir = self.project
+        home_dir = project or self.project
         if version is not None:
             home_dir += "-%s" % str(version)
         return os.path.join(KAFKA_INSTALL_ROOT, home_dir)
 
-    def bin(self, node_or_version=DEV_BRANCH):
+    def bin(self, node_or_version=DEV_BRANCH, project=None):
         version = self._version(node_or_version)
-        return os.path.join(self.home(version), "bin")
+        return os.path.join(self.home(version, project=project), "bin")
 
-    def script(self, script_name, node_or_version=DEV_BRANCH):
+    def script(self, script_name, node_or_version=DEV_BRANCH, project=None):
         version = self._version(node_or_version)
-        return os.path.join(self.bin(version), script_name)
+        return os.path.join(self.bin(version, project=project), script_name)
 
-    def jar(self, jar_name, node_or_version=DEV_BRANCH):
+    def jar(self, jar_name, node_or_version=DEV_BRANCH, project=None):
         version = self._version(node_or_version)
-        return os.path.join(self.home(version), JARS[str(version)][jar_name])
+        return os.path.join(self.home(version, project=project), JARS[str(version)][jar_name])
 
     def scratch_space(self, service_instance):
         return os.path.join(SCRATCH_ROOT, service_instance.service_id)
diff --git a/tests/kafkatest/services/monitor/http.py b/tests/kafkatest/services/monitor/http.py
new file mode 100644
index 00000000000..83324dfd4a1
--- /dev/null
+++ b/tests/kafkatest/services/monitor/http.py
@@ -0,0 +1,226 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+from collections import defaultdict, namedtuple
+import json
+from threading import Thread
+from select import select
+import socket
+
+MetricKey = namedtuple('MetricKey', ['host', 'client_id', 'name', 'group', 'tags'])
+MetricValue = namedtuple('MetricValue', ['time', 'value'])
+
+# Python's logging library doesn't define anything more detailed than DEBUG, but we'd like a finer-grained setting
+# for highly detailed messages, e.g. logging every single incoming request.
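+# For example (illustrative usage only), a logger can emit records at this level directly via
+# Logger.log, optionally registering a name for it with the standard library:
+#   logging.addLevelName(TRACE, "TRACE")
+#   logger.log(TRACE, "request headers: %s", headers)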
+TRACE = 5
+
+
+class HttpMetricsCollector(object):
+    """
+    HttpMetricsCollector enables collection of metrics from various Kafka clients instrumented with the
+    PushHttpMetricsReporter. It starts a web server locally and provides the necessary configuration for clients
+    to automatically report metrics data to this server. It also provides basic functionality for querying the
+    recorded metrics. This class can be used either as a mixin or standalone object.
+    """
+
+    # The port on the worker node to listen on, which will be forwarded to the port listening on this driver node
+    REMOTE_PORT = 6789
+
+    def __init__(self, **kwargs):
+        """
+        Create a new HttpMetricsCollector
+        :param period: the period, in seconds, between updates that the metrics reporter configuration should define.
+                       defaults to reporting once per second
+        """
+        self._http_metrics_period = kwargs.pop('period', 1)
+
+        super(HttpMetricsCollector, self).__init__(**kwargs)
+
+        # TODO: currently we maintain just a simple map from all key info -> value. However, some key fields are far
+        # more common to filter on, so we'd want to index by them, e.g. host, client.id, metric name.
+        self._http_metrics = defaultdict(list)
+
+        self._httpd = HTTPServer(('', 0), _MetricsReceiver)
+        self._httpd.parent = self
+        self._httpd.metrics = self._http_metrics
+
+        self._http_metrics_thread = Thread(target=self._run_http_metrics_httpd,
+                                           name='http-metrics-thread[%s]' % str(self))
+        self._http_metrics_thread.start()
+
+        self._forwarders = {}
+
+    @property
+    def http_metrics_url(self):
+        """
+        :return: the URL to use when reporting metrics
+        """
+        return "http://%s:%d" % ("localhost", self.REMOTE_PORT)
+
+    @property
+    def http_metrics_client_configs(self):
+        """
+        Get client configurations that can be used to report data to this collector. Put these in a properties file for
+        clients (e.g. console producer or consumer) to have them push metrics to this driver. Note that in some cases
+        (e.g. streams, connect) these settings may need to be prefixed.
+        :return: a dictionary of client configurations that will direct a client to report metrics to this collector
+        """
+        return {
+            "metric.reporters": "org.apache.kafka.tools.PushHttpMetricsReporter",
+            "metrics.url": self.http_metrics_url,
+            "metrics.period": self._http_metrics_period,
+        }
+
+    def start_node(self, node):
+        local_port = self._httpd.socket.getsockname()[1]
+        self.logger.debug('HttpMetricsCollector listening on %s', local_port)
+        self._forwarders[self.idx(node)] = _ReverseForwarder(self.logger, node, self.REMOTE_PORT, local_port)
+
+        super(HttpMetricsCollector, self).start_node(node)
+
+    def stop(self):
+        super(HttpMetricsCollector, self).stop()
+
+        if self._http_metrics_thread:
+            self.logger.debug("Shutting down metrics httpd")
+            self._httpd.shutdown()
+            self._http_metrics_thread.join()
+            self.logger.debug("Finished shutting down metrics httpd")
+
+    def stop_node(self, node):
+        super(HttpMetricsCollector, self).stop_node(node)
+
+        idx = self.idx(node)
+        self._forwarders[idx].stop()
+        del self._forwarders[idx]
+
+    def metrics(self, host=None, client_id=None, name=None, group=None, tags=None):
+        """
+        Get any collected metrics that match the specified parameters, yielding each as a
+        (MetricKey, [MetricValue, ...]) tuple.
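+
+        For example (hypothetical values), metrics(name='outgoing-byte-rate') might yield:
+          (MetricKey(host='worker1', client_id='producer-performance', name='outgoing-byte-rate',
+                     group='producer-metrics', tags=(('client-id', 'producer-performance'),)),
+           [MetricValue(time=1511213997000, value=1234.5)])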
+        """
+        for k, values in self._http_metrics.iteritems():
+            if ((host is None or host == k.host) and
+                    (client_id is None or client_id == k.client_id) and
+                    (name is None or name == k.name) and
+                    (group is None or group == k.group) and
+                    (tags is None or tags == k.tags)):
+                yield (k, values)
+
+    def _run_http_metrics_httpd(self):
+        self._httpd.serve_forever()
+
+
+class _MetricsReceiver(BaseHTTPRequestHandler):
+    """
+    HTTP request handler that accepts requests from the PushHttpMetricsReporter and stores them back into the parent
+    HttpMetricsCollector
+    """
+
+    def log_message(self, format, *args, **kwargs):
+        # Don't do any logging here so we get rid of the mostly useless per-request Apache log-style info that spams
+        # the debug log
+        pass
+
+    def do_POST(self):
+        data = self.rfile.read(int(self.headers['Content-Length']))
+        data = json.loads(data)
+        self.server.parent.logger.log(TRACE, "POST %s\n\n%s\n%s", self.path, self.headers,
+                                      json.dumps(data, indent=4, separators=(',', ': ')))
+        self.send_response(204)
+        self.end_headers()
+
+        client = data['client']
+        host = client['host']
+        client_id = client['client_id']
+        ts = client['time']
+        metrics = data['metrics']
+        for raw_metric in metrics:
+            name = raw_metric['name']
+            group = raw_metric['group']
+            # Convert to tuple of pairs because dicts & lists are unhashable
+            tags = tuple([(k, v) for k, v in raw_metric['tags'].iteritems()])
+            value = raw_metric['value']
+
+            key = MetricKey(host=host, client_id=client_id, name=name, group=group, tags=tags)
+            metric_value = MetricValue(time=ts, value=value)
+
+            self.server.metrics[key].append(metric_value)
+
+
+class _ReverseForwarder(object):
+    """
+    Runs reverse forwarding of a port on a node to a local port. This allows you to set up a server on the test driver
+    while assuming only the basic SSH access that ducktape guarantees is available for worker nodes.
+    """
+
+    def __init__(self, logger, node, remote_port, local_port):
+        self.logger = logger
+        self._node = node
+        self._local_port = local_port
+
+        self.logger.debug('Forwarding %s port %d to driver port %d', node, remote_port, local_port)
+
+        self._stopping = False
+
+        self._transport = node.account.ssh_client.get_transport()
+        self._transport.request_port_forward('', remote_port)
+
+        self._accept_thread = Thread(target=self._accept)
+        self._accept_thread.start()
+
+    def stop(self):
+        self._stopping = True
+        self._accept_thread.join(30)
+        if self._accept_thread.isAlive():
+            raise RuntimeError("Failed to stop reverse forwarder on %s" % self._node)
+
+    def _accept(self):
+        while not self._stopping:
+            chan = self._transport.accept(1)
+            if chan is None:
+                continue
+            thr = Thread(target=self._handler, args=(chan,))
+            thr.setDaemon(True)
+            thr.start()
+
+    def _handler(self, chan):
+        sock = socket.socket()
+        try:
+            sock.connect(("localhost", self._local_port))
+        except Exception as e:
+            self.logger.error('Forwarding request to port %d failed: %r', self._local_port, e)
+            return
+
+        self.logger.log(TRACE, 'Connected! Tunnel open %r -> %r -> %d', chan.origin_addr, chan.getpeername(),
+                        self._local_port)
+        while True:
+            r, w, x = select([sock, chan], [], [])
+            if sock in r:
+                data = sock.recv(1024)
+                if len(data) == 0:
+                    break
+                chan.send(data)
+            if chan in r:
+                data = chan.recv(1024)
+                if len(data) == 0:
+                    break
+                sock.send(data)
+        chan.close()
+        sock.close()
+        self.logger.log(TRACE, 'Tunnel closed from %r', chan.origin_addr)
diff --git a/tests/kafkatest/services/performance/performance.py b/tests/kafkatest/services/performance/performance.py
index d6d4f14d4e0..32b15c6124f 100644
--- a/tests/kafkatest/services/performance/performance.py
+++ b/tests/kafkatest/services/performance/performance.py
@@ -19,7 +19,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 
 class PerformanceService(KafkaPathResolverMixin, BackgroundThreadService):
 
-    def __init__(self, context, num_nodes, stop_timeout_sec=30):
+    def __init__(self, context=None, num_nodes=0, stop_timeout_sec=30):
         super(PerformanceService, self).__init__(context, num_nodes)
         self.results = [None] * self.num_nodes
         self.stats = [[] for x in range(self.num_nodes)]
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index ff92da86bbf..18790a703e3 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -18,14 +18,14 @@ import time
 from ducktape.utils.util import wait_until
 from ducktape.cluster.remoteaccount import RemoteCommandError
 
-from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
-from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
+from kafkatest.services.monitor.http import HttpMetricsCollector
 from kafkatest.services.performance import PerformanceService
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.version import DEV_BRANCH, V_0_9_0_0
 
 
-class ProducerPerformanceService(JmxMixin, PerformanceService):
+class ProducerPerformanceService(HttpMetricsCollector, PerformanceService):
 
     PERSISTENT_ROOT = "/mnt/producer_performance"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "producer_performance.stdout")
@@ -35,10 +35,9 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
     LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
 
     def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, version=DEV_BRANCH, settings=None,
-                 intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=None):
+                 intermediate_stats=False, client_id="producer-performance"):
 
-        JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [])
-        PerformanceService.__init__(self, context, num_nodes)
+        super(ProducerPerformanceService, self).__init__(context=context, num_nodes=num_nodes)
 
         self.logs = {
             "producer_performance_stdout": {
@@ -49,12 +48,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
                 "collect_default": True},
             "producer_performance_log": {
                 "path": ProducerPerformanceService.LOG_FILE,
-                "collect_default": True},
-            "jmx_log": {
-                "path": "/mnt/jmx_tool.log",
-                "collect_default": jmx_object_names is not None
-            }
-
+                "collect_default": True}
         }
 
         self.kafka = kafka
@@ -82,9 +76,9 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
         args = self.args.copy()
         args.update({
             'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
-            'jmx_port': self.jmx_port,
             'client_id': self.client_id,
-            'kafka_run_class': self.path.script("kafka-run-class.sh", node)
+            'kafka_run_class': self.path.script("kafka-run-class.sh", node),
+            'metrics_props': ' '.join(["%s=%s" % (k, v) for k, v in self.http_metrics_client_configs.iteritems()])
         })
 
         cmd = ""
 
@@ -94,14 +88,15 @@
             # tool from the development branch
             tools_jar = self.path.jar(TOOLS_JAR_NAME, DEV_BRANCH)
             tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
 
-            cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
-            cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
+            for jar in (tools_jar, tools_dependant_libs_jar):
+                cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % jar
             cmd += "export CLASSPATH; "
 
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % ProducerPerformanceService.LOG4J_CONFIG
-        cmd += "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s KAFKA_HEAP_OPTS=\"-XX:+HeapDumpOnOutOfMemoryError\" %(kafka_run_class)s org.apache.kafka.tools.ProducerPerformance " \
-               "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
+        cmd += "KAFKA_OPTS=%(kafka_opts)s KAFKA_HEAP_OPTS=\"-XX:+HeapDumpOnOutOfMemoryError\" %(kafka_run_class)s org.apache.kafka.tools.ProducerPerformance " \
+               "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s %(metrics_props)s" % args
 
         self.security_config.setup_node(node)
         if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
@@ -125,7 +120,6 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
         return len(self.pids(node)) > 0
 
     def _worker(self, idx, node):
-
         node.account.ssh("mkdir -p %s" % ProducerPerformanceService.PERSISTENT_ROOT, allow_fail=False)
 
         # Create and upload log properties
@@ -144,13 +138,10 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
         if first_line is None:
             raise Exception("No output from ProducerPerformance")
 
-        self.start_jmx_tool(idx, node)
         wait_until(lambda: not self.alive(node), timeout_sec=1200, backoff_sec=2, err_msg="ProducerPerformance failed to finish")
         elapsed = time.time() - start
         self.logger.debug("ProducerPerformance process ran for %s seconds" % elapsed)
 
-        self.read_jmx_output(idx, node)
-
         # parse producer output from file
         last = None
         producer_output = node.account.ssh_capture("cat %s" % ProducerPerformanceService.STDOUT_CAPTURE)
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
index ae86c2861f6..47a6a96b020 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -136,8 +136,7 @@ class QuotaTest(Test):
         # Produce all messages
         producer = ProducerPerformanceService(
             self.test_context, producer_num, self.kafka,
-            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id,
-            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id],
-            jmx_attributes=['outgoing-byte-rate'])
+            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id)
 
         producer.run()
 
@@ -178,8 +177,9 @@ class QuotaTest(Test):
             msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
 
         # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
-        producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
-        producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
+        producer_maximum_bps = max(
+            metric.value for k, metrics in producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics
+        )
         producer_quota_bps = self.quota_config.producer_quota
         self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
         if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
index 94a40106d0a..586bac9c634 100644
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ b/tests/kafkatest/tests/core/throttling_test.py
@@ -150,9 +150,7 @@ class ThrottlingTest(ProduceConsumeValidateTest):
         bulk_producer = ProducerPerformanceService(
             context=self.test_context, num_nodes=1, kafka=self.kafka,
             topic=self.topic, num_records=self.num_records,
-            record_size=self.record_size, throughput=-1, client_id=producer_id,
-            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id],
-            jmx_attributes=['outgoing-byte-rate'])
+            record_size=self.record_size, throughput=-1, client_id=producer_id)
 
 
         self.producer = VerifiableProducer(context=self.test_context,
@@ -173,3 +171,9 @@ class ThrottlingTest(ProduceConsumeValidateTest):
         bulk_producer.run()
         self.run_produce_consume_validate(core_test_action=
                                           lambda: self.reassign_partitions(bounce_brokers, self.throttle))
+
+        self.logger.debug("Bulk producer outgoing-byte-rates: %s",
+                          [metric.value for k, metrics in
+                           bulk_producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer_id) for
+                           metric in metrics])
\ No newline at end of file
diff --git a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
new file mode 100644
index 00000000000..c5764b49cca
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * MetricsReporter that aggregates metrics data and reports it via HTTP requests to a configurable
+ * webhook endpoint in JSON format.
+ *
+ * This is an internal class used for system tests and does not provide any compatibility guarantees.
+ */
+public class PushHttpMetricsReporter implements MetricsReporter {
+    private static final Logger log = LoggerFactory.getLogger(PushHttpMetricsReporter.class);
+
+    private static final String METRICS_PREFIX = "metrics.";
+    static final String METRICS_URL_CONFIG = METRICS_PREFIX + "url";
+    static final String METRICS_PERIOD_CONFIG = METRICS_PREFIX + "period";
+    static final String METRICS_HOST_CONFIG = METRICS_PREFIX + "host";
+    static final String CLIENT_ID_CONFIG = ProducerConfig.CLIENT_ID_CONFIG;
+
+    private static final Map<String, String> HEADERS = new LinkedHashMap<>();
+    static {
+        HEADERS.put("Content-Type", "application/json");
+    }
+
+    private final Object lock = new Object();
+    private final Time time;
+    private final ScheduledExecutorService executor;
+    // The set of metrics are updated in init/metricChange/metricRemoval
+    private final Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
+    private final ObjectMapper json = new ObjectMapper();
+
+    // Non-final because these are set via configure()
+    private URL url;
+    private String host;
+    private String clientId;
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(METRICS_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The URL to report metrics to")
+            .define(METRICS_PERIOD_CONFIG, ConfigDef.Type.INT, ConfigDef.Importance.HIGH,
+                    "The frequency at which metrics should be reported, in seconds")
+            .define(METRICS_HOST_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW,
+                    "The hostname to report with each metric; if empty, defaults to the FQDN that can be automatically " +
+                            "determined")
+            .define(CLIENT_ID_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW,
+                    "Client ID to identify the application, generally inherited from the " +
+                            "producer/consumer/streams/connect instance");
+
+    public PushHttpMetricsReporter() {
+        time = new SystemTime();
+        executor = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    PushHttpMetricsReporter(Time mockTime, ScheduledExecutorService mockExecutor) {
+        time = mockTime;
+        executor = mockExecutor;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        PushHttpMetricsReporterConfig config = new PushHttpMetricsReporterConfig(CONFIG_DEF, configs);
+        try {
+            url = new URL(config.getString(METRICS_URL_CONFIG));
+        } catch (MalformedURLException e) {
+            throw new ConfigException("Malformed metrics.url", e);
+        }
+        int period = config.getInteger(METRICS_PERIOD_CONFIG);
+        clientId = config.getString(CLIENT_ID_CONFIG);
+
+        host = config.getString(METRICS_HOST_CONFIG);
+        if (host == null || host.isEmpty()) {
+            try {
+                host = InetAddress.getLocalHost().getCanonicalHostName();
+            } catch (UnknownHostException e) {
+                throw new ConfigException("Failed to get canonical hostname", e);
+            }
+        }
+
+        executor.scheduleAtFixedRate(new HttpReporter(), period, period, TimeUnit.SECONDS);
+
+        log.info("Configured PushHttpMetricsReporter for {} to report every {} seconds", url, period);
+    }
+
+    @Override
+    public void init(List<KafkaMetric> initMetrics) {
+        synchronized (lock) {
+            for (KafkaMetric metric : initMetrics) {
+                log.debug("Adding metric {}", metric.metricName());
+                metrics.put(metric.metricName(), metric);
+            }
+        }
+    }
+
+    @Override
+    public void metricChange(KafkaMetric metric) {
+        synchronized (lock) {
+            log.debug("Updating metric {}", metric.metricName());
+            metrics.put(metric.metricName(), metric);
+        }
+    }
+
+    @Override
+    public void metricRemoval(KafkaMetric metric) {
+        synchronized (lock) {
+            log.debug("Removing metric {}", metric.metricName());
+            metrics.remove(metric.metricName());
+        }
+    }
+
+    @Override
+    public void close() {
+        executor.shutdown();
+        try {
+            executor.awaitTermination(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            throw new KafkaException("Interrupted when shutting down PushHttpMetricsReporter", e);
+        }
+    }
+
+    private class HttpReporter implements Runnable {
+        @Override
+        public void run() {
+            long now = time.milliseconds();
+            final List<MetricValue> samples;
+            synchronized (lock) {
+                samples = new ArrayList<>(metrics.size());
+                for (KafkaMetric metric : metrics.values()) {
+                    MetricName name = metric.metricName();
+                    double value = metric.value();
+                    samples.add(new MetricValue(name.name(), name.group(), name.tags(), value));
+                }
+            }
+
+            MetricsReport report = new MetricsReport(new MetricClientInfo(host, clientId, now), samples);
+
+            log.trace("Reporting {} metrics to {}", samples.size(), url);
+            HttpURLConnection connection = null;
+            try {
+                connection = newHttpConnection(url);
+                connection.setRequestMethod("POST");
+                // connection.getResponseCode() implicitly calls getInputStream, so always set to true.
+                // On the other hand, leaving this out breaks nothing.
+                connection.setDoInput(true);
+                connection.setRequestProperty("Content-Type", "application/json");
+                byte[] data = json.writeValueAsBytes(report);
+                connection.setRequestProperty("Content-Length", Integer.toString(data.length));
+                connection.setRequestProperty("Accept", "*/*");
+                connection.setUseCaches(false);
+
+                connection.setDoOutput(true);
+
+                try (OutputStream os = connection.getOutputStream()) {
+                    os.write(data);
+                    os.flush();
+                }
+
+                int responseCode = connection.getResponseCode();
+                if (responseCode >= 400) {
+                    InputStream is = connection.getErrorStream();
+                    String msg = readResponse(is);
+                    log.error("Error reporting metrics, {}: {}", responseCode, msg);
+                } else if (responseCode >= 300) {
+                    log.error("PushHttpMetricsReporter does not currently support redirects, saw {}", responseCode);
+                } else {
+                    log.info("Finished reporting metrics with response code {}", responseCode);
+                }
+            } catch (Exception e) {
+                log.error("Error reporting metrics", e);
+                throw new KafkaException("Failed to report current metrics", e);
+            } finally {
+                if (connection != null) {
+                    connection.disconnect();
+                }
+            }
+        }
+    }
+
+    // Static package-private so unit tests can use a mock connection
+    static HttpURLConnection newHttpConnection(URL url) throws IOException {
+        return (HttpURLConnection) url.openConnection();
+    }
+
+    // Static package-private so unit tests can mock reading response
+    static String readResponse(InputStream is) {
+        try (Scanner s = new Scanner(is, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
+            return s.hasNext() ? s.next() : "";
+        }
+    }
+
+    private static class MetricsReport {
+        private final MetricClientInfo client;
+        private final Collection<MetricValue> metrics;
+
+        MetricsReport(MetricClientInfo client, Collection<MetricValue> metrics) {
+            this.client = client;
+            this.metrics = metrics;
+        }
+
+        @JsonProperty
+        public MetricClientInfo client() {
+            return client;
+        }
+
+        @JsonProperty
+        public Collection<MetricValue> metrics() {
+            return metrics;
+        }
+    }
+
+    private static class MetricClientInfo {
+        private final String host;
+        private final String clientId;
+        private final long time;
+
+        MetricClientInfo(String host, String clientId, long time) {
+            this.host = host;
+            this.clientId = clientId;
+            this.time = time;
+        }
+
+        @JsonProperty
+        public String host() {
+            return host;
+        }
+
+        @JsonProperty("client_id")
+        public String clientId() {
+            return clientId;
+        }
+
+        @JsonProperty
+        public long time() {
+            return time;
+        }
+    }
+
+    private static class MetricValue {
+
+        private final String name;
+        private final String group;
+        private final Map<String, String> tags;
+        private final Object value;
+
+        MetricValue(String name, String group, Map<String, String> tags, Object value) {
+            this.name = name;
+            this.group = group;
+            this.tags = tags;
+            this.value = value;
+        }
+
+        @JsonProperty
+        public String name() {
+            return name;
+        }
+
+        @JsonProperty
+        public String group() {
+            return group;
+        }
+
+        @JsonProperty
+        public Map<String, String> tags() {
+            return tags;
+        }
+
+        @JsonProperty
+        public Object value() {
+            return value;
+        }
+    }
+
+    // The signature for getInt changed from returning int to Integer, so to remain compatible with 0.8.2.2 jars
+    // for system tests we replace it with a custom version that works with all versions.
+    private static class PushHttpMetricsReporterConfig extends AbstractConfig {
+        public PushHttpMetricsReporterConfig(ConfigDef definition, Map<?, ?> originals) {
+            super(definition, originals);
+        }
+
+        public Integer getInteger(String key) {
+            return (Integer) get(key);
+        }
+
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java b/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java
new file mode 100644
index 00000000000..1cd37993b38
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(PushHttpMetricsReporter.class)
+public class PushHttpMetricsReporterTest {
+
+    private static final URL URL;
+    static {
+        try {
+            URL = new URL("http://fake:80");
+        } catch (MalformedURLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private PushHttpMetricsReporter reporter;
+    private Time time = new MockTime();
+    @MockStrict
+    private ScheduledExecutorService executor;
+    private Capture<Runnable> reportRunnable = EasyMock.newCapture();
+    @MockStrict
+    private HttpURLConnection httpReq;
+    @MockStrict
+    private OutputStream httpOut;
+    private Capture<byte[]> httpPayload = EasyMock.newCapture();
+    @MockStrict
+    private InputStream httpErr;
+
+    @Before
+    public void setUp() {
+        reporter = new PushHttpMetricsReporter(time, executor);
+        PowerMock.mockStatic(PushHttpMetricsReporter.class);
+    }
+
+    @Test
+    public void testConfigureClose() throws Exception {
+        expectConfigure();
+        expectClose();
+
+        replayAll();
+
+        configure();
+        reporter.close();
+
+        verifyAll();
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testConfigureBadUrl() throws Exception {
+        Map<String, String> config = new HashMap<>();
+        config.put(PushHttpMetricsReporter.METRICS_URL_CONFIG, "malformed;url");
+        config.put(PushHttpMetricsReporter.METRICS_PERIOD_CONFIG, "5");
+        reporter.configure(config);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testConfigureMissingPeriod() throws Exception {
+        Map<String, String> config = new HashMap<>();
+        config.put(PushHttpMetricsReporter.METRICS_URL_CONFIG, URL.toString());
+        reporter.configure(config);
+    }
+
+    @Test
+    public void testNoMetrics() throws Exception {
+        expectConfigure();
+        expectRequest(200);
+        expectClose();
+
+        replayAll();
+
+        configure();
+        reportRunnable.getValue().run();
+        JsonNode payload = new ObjectMapper().readTree(httpPayload.getValue());
+        assertTrue(payload.isObject());
+
+        assertPayloadHasClientInfo(payload);
+
+        // Should contain an empty list of metrics, i.e. we report updates even if there are no metrics to report, to
+        // indicate liveness
+        JsonNode metrics = payload.get("metrics");
+        assertTrue(metrics.isArray());
+        assertEquals(0, metrics.size());
+
+        reporter.close();
+
+        verifyAll();
+    }
+
+    // For error conditions, we expect them to come with a response body that we can read & log
+    @Test
+    public void testClientError() throws Exception {
+        expectConfigure();
+        expectRequest(400, true);
+        expectClose();
+
+        replayAll();
+
+        configure();
+        reportRunnable.getValue().run();
+
+        reporter.close();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testServerError() throws Exception {
+        expectConfigure();
+        expectRequest(500, true);
+        expectClose();
+
+        replayAll();
+
+        configure();
+        reportRunnable.getValue().run();
+
+        reporter.close();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testMetricValues() throws Exception {
+        expectConfigure();
+        expectRequest(200);
+        expectClose();
+
+        replayAll();
+
+        configure();
+        KafkaMetric metric1 = new KafkaMetric(
+                new Object(),
+                new MetricName("name1", "group1", "desc1", Collections.singletonMap("key1", "value1")),
+                new ImmutableValue(1.0),
+                null,
+                time
+        );
+        KafkaMetric newMetric1 = new KafkaMetric(
+                new Object(),
+                new MetricName("name1", "group1", "desc1", Collections.singletonMap("key1", "value1")),
+                new ImmutableValue(-1.0),
+                null,
+                time
+        );
+        KafkaMetric metric2 = new KafkaMetric(
+                new Object(),
+                new MetricName("name2", "group2", "desc2", Collections.singletonMap("key2", "value2")),
+                new ImmutableValue(2.0),
+                null,
+                time
+        );
+        KafkaMetric metric3 = new KafkaMetric(
+                new Object(),
+                new MetricName("name3", "group3", "desc3", Collections.singletonMap("key3", "value3")),
+                new ImmutableValue(3.0),
+                null,
+                time
+        );
+        reporter.init(Arrays.asList(metric1, metric2));
+        reporter.metricChange(newMetric1); // added in init, modified
+        reporter.metricChange(metric3); // added by change
+        reporter.metricRemoval(metric2); // added in init, deleted by removal
+
+        reportRunnable.getValue().run();
+        JsonNode payload = new ObjectMapper().readTree(httpPayload.getValue());
+        assertTrue(payload.isObject());
+        assertPayloadHasClientInfo(payload);
+
+        // We should be left with the modified version of metric1 and metric3
+        JsonNode metrics = payload.get("metrics");
+        assertTrue(metrics.isArray());
+        assertEquals(2, metrics.size());
+
+        JsonNode m1 = metrics.get(0);
+        assertEquals("name1", m1.get("name").textValue());
+        assertEquals("group1", m1.get("group").textValue());
+        JsonNode m1Tags = m1.get("tags");
+        assertTrue(m1Tags.isObject());
+        assertEquals(1, m1Tags.size());
+        assertEquals("value1", m1Tags.get("key1").textValue());
+        assertEquals(-1.0, m1.get("value").doubleValue(), 0.0);
+
+        JsonNode m3 = metrics.get(1);
+        assertEquals("name3", m3.get("name").textValue());
+        assertEquals("group3", m3.get("group").textValue());
+        JsonNode m3Tags = m3.get("tags");
+        assertTrue(m3Tags.isObject());
+        assertEquals(1, m3Tags.size());
+        assertEquals("value3", m3Tags.get("key3").textValue());
+        assertEquals(3.0, m3.get("value").doubleValue(), 0.0);
+
+        reporter.close();
+
+        verifyAll();
+    }
+
+    private void expectConfigure() {
+        EasyMock.expect(
+                executor.scheduleAtFixedRate(EasyMock.capture(reportRunnable), EasyMock.eq(5L), EasyMock.eq(5L), EasyMock.eq(TimeUnit.SECONDS))
+        ).andReturn(null); // return value not expected to be used
+    }
+
+    private void configure() {
+        Map<String, String> config = new HashMap<>();
+        config.put(PushHttpMetricsReporter.METRICS_URL_CONFIG, URL.toString());
+        config.put(PushHttpMetricsReporter.METRICS_PERIOD_CONFIG, "5");
+        reporter.configure(config);
+    }
+
+    private void expectRequest(int returnStatus) throws Exception {
+        expectRequest(returnStatus, false);
+    }
+
+    // Expect that a request is made with the given response code
+    private void expectRequest(int returnStatus, boolean readResponse) throws Exception {
+        EasyMock.expect(PushHttpMetricsReporter.newHttpConnection(URL)).andReturn(httpReq);
+        httpReq.setRequestMethod("POST");
+        EasyMock.expectLastCall();
+        httpReq.setDoInput(true);
+        EasyMock.expectLastCall();
+        httpReq.setRequestProperty("Content-Type", "application/json");
+        EasyMock.expectLastCall();
+        httpReq.setRequestProperty(EasyMock.eq("Content-Length"), EasyMock.anyString());
+        EasyMock.expectLastCall();
+        httpReq.setRequestProperty("Accept", "*/*");
+        EasyMock.expectLastCall();
+        httpReq.setUseCaches(false);
+        EasyMock.expectLastCall();
+        httpReq.setDoOutput(true);
+        EasyMock.expectLastCall();
+        EasyMock.expect(httpReq.getOutputStream()).andReturn(httpOut);
+        httpOut.write(EasyMock.capture(httpPayload));
+        EasyMock.expectLastCall();
+        httpOut.flush();
+        EasyMock.expectLastCall();
+        httpOut.close();
+        EasyMock.expectLastCall();
+
+        EasyMock.expect(httpReq.getResponseCode()).andReturn(returnStatus);
+
+        if (readResponse)
+            expectReadResponse();
+
+        httpReq.disconnect();
+        EasyMock.expectLastCall();
+    }
+
+    private void assertPayloadHasClientInfo(JsonNode payload) throws UnknownHostException {
+        // Should contain client info...
+        JsonNode client = payload.get("client");
+        assertTrue(client.isObject());
+        assertEquals(InetAddress.getLocalHost().getCanonicalHostName(), client.get("host").textValue());
+        assertEquals("", client.get("client_id").textValue());
+        assertEquals(time.milliseconds(), client.get("time").longValue());
+    }
+
+    private void expectReadResponse() throws Exception {
+        EasyMock.expect(httpReq.getErrorStream()).andReturn(httpErr);
+        EasyMock.expect(PushHttpMetricsReporter.readResponse(httpErr)).andReturn("error response message");
+        EasyMock.expectLastCall();
+    }
+
+    private void expectClose() throws Exception {
+        executor.shutdown();
+        EasyMock.expect(executor.awaitTermination(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class))).andReturn(true);
+    }
+
+    private static class ImmutableValue implements Measurable {
+        private final double value;
+
+        public ImmutableValue(double value) {
+            this.value = value;
+        }
+
+        @Override
+        public double measure(MetricConfig config, long now) {
+            return value;
+        }
+    }
+}
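
For reference, a client opts into this reporter with the same settings that
HttpMetricsCollector.http_metrics_client_configs generates above. A sketch of the resulting
client properties (the port matches HttpMetricsCollector.REMOTE_PORT and the period value is
illustrative):

    metric.reporters=org.apache.kafka.tools.PushHttpMetricsReporter
    metrics.url=http://localhost:6789
    metrics.period=1

metrics.host is optional; when empty, PushHttpMetricsReporter falls back to the FQDN of the
client machine, as implemented in configure() above.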