MINOR: Add HttpMetricsReporter for system tests

Author: Ewen Cheslack-Postava <meewencp.org>

Reviewers: Apurva Mehta <apurvaconfluent.io>, Ismael Juma <ismaeljuma.me.uk>

Closes #4072 from ewencp/http-metrics

*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*

*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #4207 from ewencp/http-metrics-0.11.0
This commit is contained in:
Ewen Cheslack-Postava 2017-11-20 14:47:29 -08:00
parent 4b5c82a8ee
commit 686c02cf35
13 changed files with 939 additions and 50 deletions

View File

@ -39,7 +39,7 @@ before_install:
script:
- ./gradlew rat
- ./gradlew releaseTarGz && /bin/bash ./tests/docker/run_tests.sh
- ./gradlew systemTestLibs && /bin/bash ./tests/docker/run_tests.sh
services:
- docker

View File

@ -807,6 +807,10 @@ project(':tools') {
testCompile project(':clients')
testCompile libs.junit
testCompile project(':clients').sourceSets.test.output
testCompile libs.easymock
testCompile libs.powermock
testCompile libs.powermockEasymock
}
javadoc {

View File

@ -28,7 +28,8 @@ public final class KafkaMetric implements Metric {
private final Measurable measurable;
private MetricConfig config;
KafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) {
// public for testing
public KafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) {
super();
this.metricName = metricName;
this.lock = lock;

View File

@ -10,9 +10,8 @@ Running tests using docker
--------------------------
Docker containers can be used for running kafka system tests locally.
* Requirements
- Docker 1.12.3 is installed and running on the machine.
- Test require a single kafka_*SNAPSHOT.tgz to be present in core/build/distributions.
This can be done by running ./gradlew clean releaseTarGz
- Docker 1.12.3 (or higher) is installed and running on the machine.
- Test require that Kafka, including system test libs, is built. This can be done by running ./gradlew clean systemTestLibs
* Run all tests
```
bash tests/docker/run_tests.sh
@ -80,7 +79,7 @@ This produces a json about the build which looks like:
],
"before_install": null,
"script": [
"./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
"./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
],
"services": [
"docker"
@ -128,7 +127,7 @@ This produces a json about the build which looks like:
"jdk": "oraclejdk8",
"before_install": null,
"script": [
"./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
"./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
],
"services": [
"docker"
@ -165,7 +164,7 @@ This produces a json about the build which looks like:
"jdk": "oraclejdk8",
"before_install": null,
"script": [
"./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
"./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
],
"services": [
"docker"
@ -215,7 +214,7 @@ The resulting json looks like:
"jdk": "oraclejdk8",
"before_install": null,
"script": [
"./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
"./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
],
"services": [
"docker"
@ -252,7 +251,7 @@ The resulting json looks like:
"jdk": "oraclejdk8",
"before_install": null,
"script": [
"./gradlew releaseTarGz && /bin/bash ./tests/travis/run_tests.sh"
"./gradlew systemTestLibs && /bin/bash ./tests/travis/run_tests.sh"
],
"services": [
"docker"

View File

@ -372,8 +372,7 @@ ducker_test() {
fi
done
must_pushd "${kafka_dir}"
ls ./core/build/distributions/kafka_*.tgz &> /dev/null
[[ $? -eq 0 ]] || die "Failed to find core/build/distributions/kafka_*.tgz. Did you run ./gradlew releaseTarGz?"
(test -f ./gradlew || gradle) && ./gradlew systemTestLibs
must_popd
cmd="cd /opt/kafka-dev && ducktape --cluster-file /opt/kafka-dev/tests/docker/build/cluster.json $args"
echo "docker exec -it ducker01 bash -c \"${cmd}\""

View File

@ -106,25 +106,25 @@ class KafkaSystemTestPathResolver(object):
self.context = context
self.project = project
def home(self, node_or_version=DEV_BRANCH):
def home(self, node_or_version=DEV_BRANCH, project=None):
version = self._version(node_or_version)
home_dir = self.project
home_dir = project or self.project
if version is not None:
home_dir += "-%s" % str(version)
return os.path.join(KAFKA_INSTALL_ROOT, home_dir)
def bin(self, node_or_version=DEV_BRANCH):
def bin(self, node_or_version=DEV_BRANCH, project=None):
version = self._version(node_or_version)
return os.path.join(self.home(version), "bin")
return os.path.join(self.home(version, project=project), "bin")
def script(self, script_name, node_or_version=DEV_BRANCH):
def script(self, script_name, node_or_version=DEV_BRANCH, project=None):
version = self._version(node_or_version)
return os.path.join(self.bin(version), script_name)
return os.path.join(self.bin(version, project=project), script_name)
def jar(self, jar_name, node_or_version=DEV_BRANCH):
def jar(self, jar_name, node_or_version=DEV_BRANCH, project=None):
version = self._version(node_or_version)
return os.path.join(self.home(version), JARS[str(version)][jar_name])
return os.path.join(self.home(version, project=project), JARS[str(version)][jar_name])
def scratch_space(self, service_instance):
return os.path.join(SCRATCH_ROOT, service_instance.service_id)

View File

@ -0,0 +1,226 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from collections import defaultdict, namedtuple
import json
from threading import Thread
from select import select
import socket
MetricKey = namedtuple('MetricKey', ['host', 'client_id', 'name', 'group', 'tags'])
MetricValue = namedtuple('MetricValue', ['time', 'value'])
# Python's logging library doesn't define anything more detailed than DEBUG, but we'd like a finer-grained setting for
# for highly detailed messages, e.g. logging every single incoming request.
TRACE = 5
class HttpMetricsCollector(object):
"""
HttpMetricsCollector enables collection of metrics from various Kafka clients instrumented with the
PushHttpMetricsReporter. It starts a web server locally and provides the necessary configuration for clients
to automatically report metrics data to this server. It also provides basic functionality for querying the
recorded metrics. This class can be used either as a mixin or standalone object.
"""
# The port to listen on on the worker node, which will be forwarded to the port listening on this driver node
REMOTE_PORT = 6789
def __init__(self, **kwargs):
"""
Create a new HttpMetricsCollector
:param period the period, in seconds, between updates that the metrics reporter configuration should define.
defaults to reporting once per second
:param args:
:param kwargs:
"""
self._http_metrics_period = kwargs.pop('period', 1)
super(HttpMetricsCollector, self).__init__(**kwargs)
# TODO: currently we maintain just a simple map from all key info -> value. However, some key fields are far
# more common to filter on, so we'd want to index by them, e.g. host, client.id, metric name.
self._http_metrics = defaultdict(list)
self._httpd = HTTPServer(('', 0), _MetricsReceiver)
self._httpd.parent = self
self._httpd.metrics = self._http_metrics
self._http_metrics_thread = Thread(target=self._run_http_metrics_httpd,
name='http-metrics-thread[%s]' % str(self))
self._http_metrics_thread.start()
self._forwarders = {}
@property
def http_metrics_url(self):
"""
:return: the URL to use when reporting metrics
"""
return "http://%s:%d" % ("localhost", self.REMOTE_PORT)
@property
def http_metrics_client_configs(self):
"""
Get client configurations that can be used to report data to this collector. Put these in a properties file for
clients (e.g. console producer or consumer) to have them push metrics to this driver. Note that in some cases
(e.g. streams, connect) these settings may need to be prefixed.
:return: a dictionary of client configurations that will direct a client to report metrics to this collector
"""
return {
"metric.reporters": "org.apache.kafka.tools.PushHttpMetricsReporter",
"metrics.url": self.http_metrics_url,
"metrics.period": self._http_metrics_period,
}
def start_node(self, node):
local_port = self._httpd.socket.getsockname()[1]
self.logger.debug('HttpMetricsCollector listening on %s', local_port)
self._forwarders[self.idx(node)] = _ReverseForwarder(self.logger, node, self.REMOTE_PORT, local_port)
super(HttpMetricsCollector, self).start_node(node)
def stop(self):
super(HttpMetricsCollector, self).stop()
if self._http_metrics_thread:
self.logger.debug("Shutting down metrics httpd")
self._httpd.shutdown()
self._http_metrics_thread.join()
self.logger.debug("Finished shutting down metrics httpd")
def stop_node(self, node):
super(HttpMetricsCollector, self).stop_node(node)
idx = self.idx(node)
self._forwarders[idx].stop()
del self._forwarders[idx]
def metrics(self, host=None, client_id=None, name=None, group=None, tags=None):
"""
Get any collected metrics that match the specified parameters, yielding each as a tuple of
(key, [<timestamp, value>, ...]) values.
"""
for k, values in self._http_metrics.iteritems():
if ((host is None or host == k.host) and
(client_id is None or client_id == k.client_id) and
(name is None or name == k.name) and
(group is None or group == k.group) and
(tags is None or tags == k.tags)):
yield (k, values)
def _run_http_metrics_httpd(self):
self._httpd.serve_forever()
class _MetricsReceiver(BaseHTTPRequestHandler):
"""
HTTP request handler that accepts requests from the PushHttpMetricsReporter and stores them back into the parent
HttpMetricsCollector
"""
def log_message(self, format, *args, **kwargs):
# Don't do any logging here so we get rid of the mostly useless per-request Apache log-style info that spams
# the debug log
pass
def do_POST(self):
data = self.rfile.read(int(self.headers['Content-Length']))
data = json.loads(data)
self.server.parent.logger.log(TRACE, "POST %s\n\n%s\n%s", self.path, self.headers,
json.dumps(data, indent=4, separators=(',', ': ')))
self.send_response(204)
self.end_headers()
client = data['client']
host = client['host']
client_id = client['client_id']
ts = client['time']
metrics = data['metrics']
for raw_metric in metrics:
name = raw_metric['name']
group = raw_metric['group']
# Convert to tuple of pairs because dicts & lists are unhashable
tags = tuple([(k, v) for k, v in raw_metric['tags'].iteritems()]),
value = raw_metric['value']
key = MetricKey(host=host, client_id=client_id, name=name, group=group, tags=tags)
metric_value = MetricValue(time=ts, value=value)
self.server.metrics[key].append(metric_value)
class _ReverseForwarder(object):
"""
Runs reverse forwarding of a port on a node to a local port. This allows you to setup a server on the test driver
that only assumes we have basic SSH access that ducktape guarantees is available for worker nodes.
"""
def __init__(self, logger, node, remote_port, local_port):
self.logger = logger
self._node = node
self._local_port = local_port
self.logger.debug('Forwarding %s port %d to driver port %d', node, remote_port, local_port)
self._stopping = False
self._transport = node.account.ssh_client.get_transport()
self._transport.request_port_forward('', remote_port)
self._accept_thread = Thread(target=self._accept)
self._accept_thread.start()
def stop(self):
self._stopping = True
self._accept_thread.join(30)
if self._accept_thread.isAlive():
raise RuntimeError("Failed to stop reverse forwarder on %s", self._node)
def _accept(self):
while not self._stopping:
chan = self._transport.accept(1)
if chan is None:
continue
thr = Thread(target=self._handler, args=(chan,))
thr.setDaemon(True)
thr.start()
def _handler(self, chan):
sock = socket.socket()
try:
sock.connect(("localhost", self._local_port))
except Exception as e:
self.logger.error('Forwarding request to port %d failed: %r', self._local_port, e)
return
self.logger.log(TRACE, 'Connected! Tunnel open %r -> %r -> %d', chan.origin_addr, chan.getpeername(),
self._local_port)
while True:
r, w, x = select([sock, chan], [], [])
if sock in r:
data = sock.recv(1024)
if len(data) == 0:
break
chan.send(data)
if chan in r:
data = chan.recv(1024)
if len(data) == 0:
break
sock.send(data)
chan.close()
sock.close()
self.logger.log(TRACE, 'Tunnel closed from %r', chan.origin_addr)

View File

@ -19,7 +19,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
class PerformanceService(KafkaPathResolverMixin, BackgroundThreadService):
def __init__(self, context, num_nodes, stop_timeout_sec=30):
def __init__(self, context=None, num_nodes=0, stop_timeout_sec=30):
super(PerformanceService, self).__init__(context, num_nodes)
self.results = [None] * self.num_nodes
self.stats = [[] for x in range(self.num_nodes)]

View File

@ -19,13 +19,13 @@ from ducktape.utils.util import wait_until
from ducktape.cluster.remoteaccount import RemoteCommandError
from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.monitor.http import HttpMetricsCollector
from kafkatest.services.performance import PerformanceService
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import DEV_BRANCH, V_0_9_0_0
class ProducerPerformanceService(JmxMixin, PerformanceService):
class ProducerPerformanceService(HttpMetricsCollector, PerformanceService):
PERSISTENT_ROOT = "/mnt/producer_performance"
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "producer_performance.stdout")
@ -35,10 +35,9 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, version=DEV_BRANCH, settings=None,
intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=None):
intermediate_stats=False, client_id="producer-performance"):
JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [])
PerformanceService.__init__(self, context, num_nodes)
super(ProducerPerformanceService, self).__init__(context=context, num_nodes=num_nodes)
self.logs = {
"producer_performance_stdout": {
@ -49,12 +48,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
"collect_default": True},
"producer_performance_log": {
"path": ProducerPerformanceService.LOG_FILE,
"collect_default": True},
"jmx_log": {
"path": "/mnt/jmx_tool.log",
"collect_default": jmx_object_names is not None
}
"collect_default": True}
}
self.kafka = kafka
@ -82,9 +76,9 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
args = self.args.copy()
args.update({
'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
'jmx_port': self.jmx_port,
'client_id': self.client_id,
'kafka_run_class': self.path.script("kafka-run-class.sh", node)
'kafka_run_class': self.path.script("kafka-run-class.sh", node),
'metrics_props': ' '.join(["%s=%s" % (k, v) for k, v in self.http_metrics_client_configs.iteritems()])
})
cmd = ""
@ -94,14 +88,15 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
# tool from the development branch
tools_jar = self.path.jar(TOOLS_JAR_NAME, DEV_BRANCH)
tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
for jar in (tools_jar, tools_dependant_libs_jar):
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % jar
cmd += "export CLASSPATH; "
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % ProducerPerformanceService.LOG4J_CONFIG
cmd += "JMX_PORT=%(jmx_port)d KAFKA_OPTS=%(kafka_opts)s KAFKA_HEAP_OPTS=\"-XX:+HeapDumpOnOutOfMemoryError\" %(kafka_run_class)s org.apache.kafka.tools.ProducerPerformance " \
"--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
cmd += "KAFKA_OPTS=%(kafka_opts)s KAFKA_HEAP_OPTS=\"-XX:+HeapDumpOnOutOfMemoryError\" %(kafka_run_class)s org.apache.kafka.tools.ProducerPerformance " \
"--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s %(metrics_props)s" % args
self.security_config.setup_node(node)
if self.security_config.security_protocol != SecurityConfig.PLAINTEXT:
@ -125,7 +120,6 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
return len(self.pids(node)) > 0
def _worker(self, idx, node):
node.account.ssh("mkdir -p %s" % ProducerPerformanceService.PERSISTENT_ROOT, allow_fail=False)
# Create and upload log properties
@ -144,13 +138,10 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
if first_line is None:
raise Exception("No output from ProducerPerformance")
self.start_jmx_tool(idx, node)
wait_until(lambda: not self.alive(node), timeout_sec=1200, backoff_sec=2, err_msg="ProducerPerformance failed to finish")
elapsed = time.time() - start
self.logger.debug("ProducerPerformance process ran for %s seconds" % elapsed)
self.read_jmx_output(idx, node)
# parse producer output from file
last = None
producer_output = node.account.ssh_capture("cat %s" % ProducerPerformanceService.STDOUT_CAPTURE)

View File

@ -136,8 +136,7 @@ class QuotaTest(Test):
# Produce all messages
producer = ProducerPerformanceService(
self.test_context, producer_num, self.kafka,
topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id,
jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_client_id], jmx_attributes=['outgoing-byte-rate'])
topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id)
producer.run()
@ -178,8 +177,9 @@ class QuotaTest(Test):
msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
# validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
producer_maximum_bps = max(
metric.value for k, metrics in producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics
)
producer_quota_bps = self.quota_config.producer_quota
self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):

View File

@ -150,9 +150,7 @@ class ThrottlingTest(ProduceConsumeValidateTest):
bulk_producer = ProducerPerformanceService(
context=self.test_context, num_nodes=1, kafka=self.kafka,
topic=self.topic, num_records=self.num_records,
record_size=self.record_size, throughput=-1, client_id=producer_id,
jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id],
jmx_attributes=['outgoing-byte-rate'])
record_size=self.record_size, throughput=-1, client_id=producer_id)
self.producer = VerifiableProducer(context=self.test_context,
@ -173,3 +171,9 @@ class ThrottlingTest(ProduceConsumeValidateTest):
bulk_producer.run()
self.run_produce_consume_validate(core_test_action=
lambda: self.reassign_partitions(bounce_brokers, self.throttle))
self.logger.debug("Bulk producer outgoing-byte-rates: %s",
(metric.value for k, metrics in
bulk_producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer_id) for
metric in metrics)
)

View File

@ -0,0 +1,332 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* MetricsReporter that aggregates metrics data and reports it via HTTP requests to a configurable
* webhook endpoint in JSON format.
*
* This is an internal class used for system tests and does not provide any compatibility guarantees.
*/
public class PushHttpMetricsReporter implements MetricsReporter {
private static final Logger log = LoggerFactory.getLogger(PushHttpMetricsReporter.class);
private static final String METRICS_PREFIX = "metrics.";
static final String METRICS_URL_CONFIG = METRICS_PREFIX + "url";
static final String METRICS_PERIOD_CONFIG = METRICS_PREFIX + "period";
static final String METRICS_HOST_CONFIG = METRICS_PREFIX + "host";
static final String CLIENT_ID_CONFIG = ProducerConfig.CLIENT_ID_CONFIG;
private static final Map<String, String> HEADERS = new LinkedHashMap<>();
static {
HEADERS.put("Content-Type", "application/json");
}
private final Object lock = new Object();
private final Time time;
private final ScheduledExecutorService executor;
// The set of metrics are updated in init/metricChange/metricRemoval
private final Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
private final ObjectMapper json = new ObjectMapper();
// Non-final because these are set via configure()
private URL url;
private String host;
private String clientId;
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(METRICS_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
"The URL to report metrics to")
.define(METRICS_PERIOD_CONFIG, ConfigDef.Type.INT, ConfigDef.Importance.HIGH,
"The frequency at which metrics should be reported, in second")
.define(METRICS_HOST_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW,
"The hostname to report with each metric; if empty, defaults to the FQDN that can be automatically" +
"determined")
.define(CLIENT_ID_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW,
"Client ID to identify the application, generally inherited from the " +
"producer/consumer/streams/connect instance");
public PushHttpMetricsReporter() {
time = new SystemTime();
executor = Executors.newSingleThreadScheduledExecutor();
}
PushHttpMetricsReporter(Time mockTime, ScheduledExecutorService mockExecutor) {
time = mockTime;
executor = mockExecutor;
}
@Override
public void configure(Map<String, ?> configs) {
PushHttpMetricsReporterConfig config = new PushHttpMetricsReporterConfig(CONFIG_DEF, configs);
try {
url = new URL(config.getString(METRICS_URL_CONFIG));
} catch (MalformedURLException e) {
throw new ConfigException("Malformed metrics.url", e);
}
int period = config.getInteger(METRICS_PERIOD_CONFIG);
clientId = config.getString(CLIENT_ID_CONFIG);
host = config.getString(METRICS_HOST_CONFIG);
if (host == null || host.isEmpty()) {
try {
host = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
throw new ConfigException("Failed to get canonical hostname", e);
}
}
executor.scheduleAtFixedRate(new HttpReporter(), period, period, TimeUnit.SECONDS);
log.info("Configured PushHttpMetricsReporter for {} to report every {} seconds", url, period);
}
@Override
public void init(List<KafkaMetric> initMetrics) {
synchronized (lock) {
for (KafkaMetric metric : initMetrics) {
log.debug("Adding metric {}", metric.metricName());
metrics.put(metric.metricName(), metric);
}
}
}
@Override
public void metricChange(KafkaMetric metric) {
synchronized (lock) {
log.debug("Updating metric {}", metric.metricName());
metrics.put(metric.metricName(), metric);
}
}
@Override
public void metricRemoval(KafkaMetric metric) {
synchronized (lock) {
log.debug("Removing metric {}", metric.metricName());
metrics.remove(metric.metricName());
}
}
@Override
public void close() {
executor.shutdown();
try {
executor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new KafkaException("Interrupted when shutting down PushHttpMetricsReporter", e);
}
}
private class HttpReporter implements Runnable {
@Override
public void run() {
long now = time.milliseconds();
final List<MetricValue> samples;
synchronized (lock) {
samples = new ArrayList<>(metrics.size());
for (KafkaMetric metric : metrics.values()) {
MetricName name = metric.metricName();
double value = metric.value();
samples.add(new MetricValue(name.name(), name.group(), name.tags(), value));
}
}
MetricsReport report = new MetricsReport(new MetricClientInfo(host, clientId, now), samples);
log.trace("Reporting {} metrics to {}", samples.size(), url);
HttpURLConnection connection = null;
try {
connection = newHttpConnection(url);
connection.setRequestMethod("POST");
// connection.getResponseCode() implicitly calls getInputStream, so always set to true.
// On the other hand, leaving this out breaks nothing.
connection.setDoInput(true);
connection.setRequestProperty("Content-Type", "application/json");
byte[] data = json.writeValueAsBytes(report);
connection.setRequestProperty("Content-Length", Integer.toString(data.length));
connection.setRequestProperty("Accept", "*/*");
connection.setUseCaches(false);
connection.setDoOutput(true);
try (OutputStream os = connection.getOutputStream()) {
os.write(data);
os.flush();
}
int responseCode = connection.getResponseCode();
if (responseCode >= 400) {
InputStream is = connection.getErrorStream();
String msg = readResponse(is);
log.error("Error reporting metrics, {}: {}", responseCode, msg);
} else if (responseCode >= 300) {
log.error("PushHttpMetricsReporter does not currently support redirects, saw {}", responseCode);
} else {
log.info("Finished reporting metrics with response code {}", responseCode);
}
} catch (Exception e) {
log.error("Error reporting metrics", e);
throw new KafkaException("Failed to report current metrics", e);
} finally {
if (connection != null) {
connection.disconnect();
}
}
}
}
// Static package-private so unit tests can use a mock connection
static HttpURLConnection newHttpConnection(URL url) throws IOException {
return (HttpURLConnection) url.openConnection();
}
// Static package-private so unit tests can mock reading response
static String readResponse(InputStream is) {
try (Scanner s = new Scanner(is, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
return s.hasNext() ? s.next() : "";
}
}
private static class MetricsReport {
private final MetricClientInfo client;
private final Collection<MetricValue> metrics;
MetricsReport(MetricClientInfo client, Collection<MetricValue> metrics) {
this.client = client;
this.metrics = metrics;
}
@JsonProperty
public MetricClientInfo client() {
return client;
}
@JsonProperty
public Collection<MetricValue> metrics() {
return metrics;
}
}
private static class MetricClientInfo {
private final String host;
private final String clientId;
private final long time;
MetricClientInfo(String host, String clientId, long time) {
this.host = host;
this.clientId = clientId;
this.time = time;
}
@JsonProperty
public String host() {
return host;
}
@JsonProperty("client_id")
public String clientId() {
return clientId;
}
@JsonProperty
public long time() {
return time;
}
}
private static class MetricValue {
private final String name;
private final String group;
private final Map<String, String> tags;
private final Object value;
MetricValue(String name, String group, Map<String, String> tags, Object value) {
this.name = name;
this.group = group;
this.tags = tags;
this.value = value;
}
@JsonProperty
public String name() {
return name;
}
@JsonProperty
public String group() {
return group;
}
@JsonProperty
public Map<String, String> tags() {
return tags;
}
@JsonProperty
public Object value() {
return value;
}
}
// The signature for getInt changed from returning int to Integer so to remain compatible with 0.8.2.2 jars
// for system tests we replace it with a custom version that works for all versions.
private static class PushHttpMetricsReporterConfig extends AbstractConfig {
public PushHttpMetricsReporterConfig(ConfigDef definition, Map<?, ?> originals) {
super(definition, originals);
}
public Integer getInteger(String key) {
return (Integer) get(key);
}
}
}

View File

@ -0,0 +1,333 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.powermock.api.easymock.PowerMock.replayAll;
import static org.powermock.api.easymock.PowerMock.verifyAll;
@RunWith(PowerMockRunner.class)
@PrepareForTest(PushHttpMetricsReporter.class)
public class PushHttpMetricsReporterTest {
private static final URL URL;
static {
try {
URL = new URL("http://fake:80");
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}
private PushHttpMetricsReporter reporter;
private Time time = new MockTime();
@MockStrict
private ScheduledExecutorService executor;
private Capture<Runnable> reportRunnable = EasyMock.newCapture();
@MockStrict
private HttpURLConnection httpReq;
@MockStrict
private OutputStream httpOut;
private Capture<byte[]> httpPayload = EasyMock.newCapture();
@MockStrict
private InputStream httpErr;
@Before
public void setUp() {
reporter = new PushHttpMetricsReporter(time, executor);
PowerMock.mockStatic(PushHttpMetricsReporter.class);
}
@Test
public void testConfigureClose() throws Exception {
expectConfigure();
expectClose();
replayAll();
configure();
reporter.close();
verifyAll();
}
@Test(expected = ConfigException.class)
public void testConfigureBadUrl() throws Exception {
Map<String, String> config = new HashMap<>();
config.put(PushHttpMetricsReporter.METRICS_URL_CONFIG, "malformed;url");
config.put(PushHttpMetricsReporter.METRICS_PERIOD_CONFIG, "5");
reporter.configure(config);
}
@Test(expected = ConfigException.class)
public void testConfigureMissingPeriod() throws Exception {
Map<String, String> config = new HashMap<>();
config.put(PushHttpMetricsReporter.METRICS_URL_CONFIG, URL.toString());
reporter.configure(config);
}
@Test
public void testNoMetrics() throws Exception {
expectConfigure();
expectRequest(200);
expectClose();
replayAll();
configure();
reportRunnable.getValue().run();
JsonNode payload = new ObjectMapper().readTree(httpPayload.getValue());
assertTrue(payload.isObject());
assertPayloadHasClientInfo(payload);
// Should contain an empty list of metrics, i.e. we report updates even if there are no metrics to report to
// indicate liveness
JsonNode metrics = payload.get("metrics");
assertTrue(metrics.isArray());
assertEquals(0, metrics.size());
reporter.close();
verifyAll();
}
// For error conditions, we expect them to come with a response body that we can read & log
@Test
public void testClientError() throws Exception {
expectConfigure();
expectRequest(400, true);
expectClose();
replayAll();
configure();
reportRunnable.getValue().run();
reporter.close();
verifyAll();
}
@Test
public void testServerError() throws Exception {
expectConfigure();
expectRequest(500, true);
expectClose();
replayAll();
configure();
reportRunnable.getValue().run();
reporter.close();
verifyAll();
}
@Test
public void testMetricValues() throws Exception {
expectConfigure();
expectRequest(200);
expectClose();
replayAll();
configure();
KafkaMetric metric1 = new KafkaMetric(
new Object(),
new MetricName("name1", "group1", "desc1", Collections.singletonMap("key1", "value1")),
new ImmutableValue(1.0),
null,
time
);
KafkaMetric newMetric1 = new KafkaMetric(
new Object(),
new MetricName("name1", "group1", "desc1", Collections.singletonMap("key1", "value1")),
new ImmutableValue(-1.0),
null,
time
);
KafkaMetric metric2 = new KafkaMetric(
new Object(),
new MetricName("name2", "group2", "desc2", Collections.singletonMap("key2", "value2")),
new ImmutableValue(2.0),
null,
time
);
KafkaMetric metric3 = new KafkaMetric(
new Object(),
new MetricName("name3", "group3", "desc3", Collections.singletonMap("key3", "value3")),
new ImmutableValue(3.0),
null,
time
);
reporter.init(Arrays.asList(metric1, metric2));
reporter.metricChange(newMetric1); // added in init, modified
reporter.metricChange(metric3); // added by change
reporter.metricRemoval(metric2); // added in init, deleted by removal
reportRunnable.getValue().run();
JsonNode payload = new ObjectMapper().readTree(httpPayload.getValue());
assertTrue(payload.isObject());
assertPayloadHasClientInfo(payload);
// We should be left with the modified version of metric1 and metric3
JsonNode metrics = payload.get("metrics");
assertTrue(metrics.isArray());
assertEquals(2, metrics.size());
JsonNode m1 = metrics.get(0);
assertEquals("name1", m1.get("name").textValue());
assertEquals("group1", m1.get("group").textValue());
JsonNode m1Tags = m1.get("tags");
assertTrue(m1Tags.isObject());
assertEquals(1, m1Tags.size());
assertEquals("value1", m1Tags.get("key1").textValue());
assertEquals(-1.0, m1.get("value").doubleValue(), 0.0);
JsonNode m3 = metrics.get(1);
assertEquals("name3", m3.get("name").textValue());
assertEquals("group3", m3.get("group").textValue());
JsonNode m3Tags = m3.get("tags");
assertTrue(m3Tags.isObject());
assertEquals(1, m3Tags.size());
assertEquals("value3", m3Tags.get("key3").textValue());
assertEquals(3.0, m3.get("value").doubleValue(), 0.0);
reporter.close();
verifyAll();
}
private void expectConfigure() {
EasyMock.expect(
executor.scheduleAtFixedRate(EasyMock.capture(reportRunnable), EasyMock.eq(5L), EasyMock.eq(5L), EasyMock.eq(TimeUnit.SECONDS))
).andReturn(null); // return value not expected to be used
}
private void configure() {
Map<String, String> config = new HashMap<>();
config.put(PushHttpMetricsReporter.METRICS_URL_CONFIG, URL.toString());
config.put(PushHttpMetricsReporter.METRICS_PERIOD_CONFIG, "5");
reporter.configure(config);
}
private void expectRequest(int returnStatus) throws Exception {
expectRequest(returnStatus, false);
}
// Expect that a request is made with the given response code
private void expectRequest(int returnStatus, boolean readResponse) throws Exception {
EasyMock.expect(PushHttpMetricsReporter.newHttpConnection(URL)).andReturn(httpReq);
httpReq.setRequestMethod("POST");
EasyMock.expectLastCall();
httpReq.setDoInput(true);
EasyMock.expectLastCall();
httpReq.setRequestProperty("Content-Type", "application/json");
EasyMock.expectLastCall();
httpReq.setRequestProperty(EasyMock.eq("Content-Length"), EasyMock.anyString());
EasyMock.expectLastCall();
httpReq.setRequestProperty("Accept", "*/*");
EasyMock.expectLastCall();
httpReq.setUseCaches(false);
EasyMock.expectLastCall();
httpReq.setDoOutput(true);
EasyMock.expectLastCall();
EasyMock.expect(httpReq.getOutputStream()).andReturn(httpOut);
httpOut.write(EasyMock.capture(httpPayload));
EasyMock.expectLastCall();
httpOut.flush();
EasyMock.expectLastCall();
httpOut.close();
EasyMock.expectLastCall();
EasyMock.expect(httpReq.getResponseCode()).andReturn(returnStatus);
if (readResponse)
expectReadResponse();
httpReq.disconnect();
EasyMock.expectLastCall();
}
private void assertPayloadHasClientInfo(JsonNode payload) throws UnknownHostException {
// Should contain client info...
JsonNode client = payload.get("client");
assertTrue(client.isObject());
assertEquals(InetAddress.getLocalHost().getCanonicalHostName(), client.get("host").textValue());
assertEquals("", client.get("client_id").textValue());
assertEquals(time.milliseconds(), client.get("time").longValue());
}
private void expectReadResponse() throws Exception {
EasyMock.expect(httpReq.getErrorStream()).andReturn(httpErr);
EasyMock.expect(PushHttpMetricsReporter.readResponse(httpErr)).andReturn("error response message");
EasyMock.expectLastCall();
}
private void expectClose() throws Exception {
executor.shutdown();
EasyMock.expect(executor.awaitTermination(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class))).andReturn(true);
}
private static class ImmutableValue implements Measurable {
private final double value;
public ImmutableValue(double value) {
this.value = value;
}
@Override
public double measure(MetricConfig config, long now) {
return value;
}
}
}