From ae237be1bbfd7d1b84a5095491a6131cd3cc9346 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Tue, 16 Aug 2016 10:43:12 -0700 Subject: [PATCH] KAFKA-3769: Create new sensors per-thread in KafkaStreams Author: Guozhang Wang Reviewers: Damian Guy , Matthias J. Sax , Michael G. Noll , Greg Fodor , Ismael Juma , Ewen Cheslack-Postava Closes #1530 from guozhangwang/K3769-per-thread-metrics --- .../clients/consumer/internals/Fetcher.java | 3 +- .../clients/producer/internals/Sender.java | 7 +- config/server.properties | 3 + .../apache/kafka/streams/StreamsMetrics.java | 12 ++ .../processor/internals/StreamThread.java | 159 +++++++++++------- .../kafka/streams/perf/SimpleBenchmark.java | 11 +- .../processor/internals/StreamThreadTest.java | 6 +- .../StreamThreadStateStoreProviderTest.java | 3 +- 8 files changed, 123 insertions(+), 81 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index fec9b6eb0e5..913ce9e422e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -807,8 +807,7 @@ public class Fetcher { String name = "topic." 
+ topic + ".bytes-fetched"; Sensor bytesFetched = this.metrics.getSensor(name); if (bytesFetched == null) { - Map metricTags = new HashMap<>(1); - metricTags.put("topic", topic.replace('.', '_')); + Map metricTags = Collections.singletonMap("topic", topic.replace('.', '_')); bytesFetched = this.metrics.sensor(name); bytesFetched.add(this.metrics.metricName("fetch-size-avg", diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index f1852b54883..30f888773c2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -14,9 +14,9 @@ package org.apache.kafka.clients.producer.internals; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -457,14 +457,13 @@ public class Sender implements Runnable { }); } - public void maybeRegisterTopicMetrics(String topic) { + private void maybeRegisterTopicMetrics(String topic) { // if one sensor of the metrics has been registered for the topic, // then all other sensors should have been registered; and vice versa String topicRecordsCountName = "topic." + topic + ".records-per-batch"; Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName); if (topicRecordCount == null) { - Map metricTags = new LinkedHashMap(); - metricTags.put("topic", topic); + Map metricTags = Collections.singletonMap("topic", topic); String metricGrpName = "producer-topic-metrics"; topicRecordCount = this.metrics.sensor(topicRecordsCountName); diff --git a/config/server.properties b/config/server.properties index d1b1753132b..f00a7d6ff14 100644 --- a/config/server.properties +++ b/config/server.properties @@ -20,6 +20,9 @@ # The id of the broker. 
This must be set to a unique integer for each broker. broker.id=0 +# Switch to enable topic deletion or not, default value is false +#delete.topic.enable=true + ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java index 70c332092ca..c0870c647fe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java @@ -24,7 +24,19 @@ import org.apache.kafka.common.metrics.Sensor; */ public interface StreamsMetrics { + /** + * Add the latency sensor. + * + * @param scopeName Name of the scope, could be the type of the state store, etc. + * @param entityName Name of the entity, could be the name of the state store instance, etc. + * @param operationName Name of the operation, could be get / put / delete / etc. + * @param tags Additional tags of the sensor. + * @return The added sensor. + */ Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags); + /** + * Record the given latency value of the sensor. 
+ */ void recordLatency(Sensor sensor, long startNs, long endNs); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index f416443bf2a..50d77c36401 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -52,7 +52,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -83,6 +82,7 @@ public class StreamThread extends Thread { protected final Consumer consumer; protected final Consumer restoreConsumer; + private final String threadClientId; private final AtomicBoolean running; private final Map activeTasks; private final Map standbyTasks; @@ -98,8 +98,9 @@ public class StreamThread extends Thread { private StreamPartitionAssignor partitionAssignor = null; - private long lastClean; - private long lastCommit; + private long timerStartedMs; + private long lastCleanMs; + private long lastCommitMs; private Throwable rebalanceException = null; private Map>> standbyRecords; @@ -111,7 +112,7 @@ public class StreamThread extends Thread { try { addStreamTasks(assignment); addStandbyTasks(); - lastClean = time.milliseconds(); // start the cleaning cycle + lastCleanMs = time.milliseconds(); // start the cleaning cycle streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata()); } catch (Throwable t) { rebalanceException = t; @@ -123,7 +124,7 @@ public class StreamThread extends Thread { public void onPartitionsRevoked(Collection assignment) { try { commitAll(); - lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned + lastCleanMs = Long.MAX_VALUE; // stop the cleaning 
cycle until partitions are assigned } catch (Throwable t) { rebalanceException = t; throw t; @@ -160,7 +161,7 @@ public class StreamThread extends Thread { // set the producer and consumer clients String threadName = getName(); - String threadClientId = clientId + "-" + threadName; + threadClientId = clientId + "-" + threadName; log.info("Creating producer client for stream thread [{}]", threadName); this.producer = clientSupplier.getProducer(config.getProducerConfigs(threadClientId)); log.info("Creating consumer client for stream thread [{}]", threadName); @@ -187,9 +188,10 @@ public class StreamThread extends Thread { this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); - this.lastClean = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment - this.lastCommit = time.milliseconds(); this.time = time; + this.timerStartedMs = time.milliseconds(); + this.lastCleanMs = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment + this.lastCommitMs = timerStartedMs; this.sensors = new StreamsMetricsImpl(metrics); @@ -274,10 +276,26 @@ public class StreamThread extends Thread { log.info("Stream thread shutdown complete [" + this.getName() + "]"); } + /** + * Compute the latency based on the current marked timestamp, + * and update the marked timestamp with the current system timestamp. 
+ * + * @return latency + */ + private long computeLatency() { + long previousTimeMs = this.timerStartedMs; + this.timerStartedMs = time.milliseconds(); + + return Math.max(this.timerStartedMs - previousTimeMs, 0); + } + private void runLoop() { int totalNumBuffered = 0; - long lastPoll = 0L; boolean requiresPoll = true; + boolean polledRecords = false; + + // TODO: this can be removed after KIP-62 + long lastPoll = 0L; if (topicPattern != null) { consumer.subscribe(topicPattern, rebalanceListener); @@ -287,13 +305,15 @@ public class StreamThread extends Thread { while (stillRunning()) { + this.timerStartedMs = time.milliseconds(); + // try to fetch some records if necessary if (requiresPoll) { requiresPoll = false; - long startPoll = time.milliseconds(); + boolean longPoll = totalNumBuffered == 0; - ConsumerRecords records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0); + ConsumerRecords records = consumer.poll(longPoll ? this.pollTimeMs : 0); lastPoll = time.milliseconds(); if (rebalanceException != null) @@ -304,41 +324,51 @@ public class StreamThread extends Thread { StreamTask task = activeTasksByPartition.get(partition); task.addRecords(partition, records.records(partition)); } + polledRecords = true; + } else { + polledRecords = false; } - long endPoll = time.milliseconds(); - sensors.pollTimeSensor.record(endPoll - startPoll); + // only record poll latency if long poll is required + if (longPoll) { + sensors.pollTimeSensor.record(computeLatency()); + } } - totalNumBuffered = 0; - // try to process one fetch record from each task via the topology, and also trigger punctuate // functions if necessary, which may result in more records going through the topology in this loop - if (!activeTasks.isEmpty()) { - for (StreamTask task : activeTasks.values()) { - long startProcess = time.milliseconds(); + if (totalNumBuffered > 0 || polledRecords) { + totalNumBuffered = 0; - totalNumBuffered += task.process(); - requiresPoll = requiresPoll ||
task.requiresPoll(); + if (!activeTasks.isEmpty()) { + for (StreamTask task : activeTasks.values()) { - sensors.processTimeSensor.record(time.milliseconds() - startProcess); + totalNumBuffered += task.process(); - maybePunctuate(task); + requiresPoll = requiresPoll || task.requiresPoll(); - if (task.commitNeeded()) - commitOne(task, time.milliseconds()); - } + sensors.processTimeSensor.record(computeLatency()); - // if pollTimeMs has passed since the last poll, we poll to respond to a possible rebalance - // even when we paused all partitions. - if (lastPoll + this.pollTimeMs < time.milliseconds()) + maybePunctuate(task); + + if (task.commitNeeded()) + commitOne(task); + } + + // if pollTimeMs has passed since the last poll, we poll to respond to a possible rebalance + // even when we paused all partitions. + if (lastPoll + this.pollTimeMs < this.timerStartedMs) + requiresPoll = true; + + } else { + // even when no task is assigned, we must poll to get a task. requiresPoll = true; - + } + maybeCommit(); } else { - // even when no task is assigned, we must poll to get a task. requiresPoll = true; } - maybeCommit(); + maybeUpdateStandbyTasks(); maybeClean(); @@ -401,12 +431,10 @@ public class StreamThread extends Thread { private void maybePunctuate(StreamTask task) { try { - long now = time.milliseconds(); - // check whether we should punctuate based on the task's partition group timestamp; // which are essentially based on record timestamp. 
if (task.maybePunctuate()) - sensors.punctuateTimeSensor.record(time.milliseconds() - now); + sensors.punctuateTimeSensor.record(computeLatency()); } catch (KafkaException e) { log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e); @@ -414,35 +442,50 @@ public class StreamThread extends Thread { } } + /** + * Commit all tasks owned by this thread if specified interval time has elapsed + */ protected void maybeCommit() { long now = time.milliseconds(); - if (commitTimeMs >= 0 && lastCommit + commitTimeMs < now) { + if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) { log.trace("Committing processor instances because the commit interval has elapsed."); commitAll(); - lastCommit = now; + lastCommitMs = now; processStandbyRecords = true; } } + /** + * Cleanup any states of the tasks that have been removed from this thread + */ + protected void maybeClean() { + long now = time.milliseconds(); + + if (now > lastCleanMs + cleanTimeMs) { + stateDirectory.cleanRemovedTasks(); + lastCleanMs = now; + } + } + /** * Commit the states of all its tasks */ private void commitAll() { for (StreamTask task : activeTasks.values()) { - commitOne(task, time.milliseconds()); + commitOne(task); } for (StandbyTask task : standbyTasks.values()) { - commitOne(task, time.milliseconds()); + commitOne(task); } } /** * Commit the state of a task */ - private void commitOne(AbstractTask task, long now) { + private void commitOne(AbstractTask task) { try { task.commit(); } catch (CommitFailedException e) { @@ -454,19 +497,7 @@ public class StreamThread extends Thread { throw e; } - sensors.commitTimeSensor.record(time.milliseconds() - now); - } - - /** - * Cleanup any states of the tasks that have been removed from this thread - */ - protected void maybeClean() { - long now = time.milliseconds(); - - if (now > lastClean + cleanTimeMs) { - stateDirectory.cleanRemovedTasks(); - lastClean = now; - } + 
sensors.commitTimeSensor.record(computeLatency()); } /** @@ -682,6 +713,7 @@ public class StreamThread extends Thread { private class StreamsMetricsImpl implements StreamsMetrics { final Metrics metrics; final String metricGrpName; + final String sensorNamePrefix; final Map metricTags; final Sensor commitTimeSensor; @@ -692,42 +724,41 @@ public class StreamThread extends Thread { final Sensor taskDestructionSensor; public StreamsMetricsImpl(Metrics metrics) { - this.metrics = metrics; this.metricGrpName = "stream-metrics"; - this.metricTags = new LinkedHashMap<>(); - this.metricTags.put("client-id", clientId + "-" + getName()); + this.sensorNamePrefix = "thread." + threadClientId; + this.metricTags = Collections.singletonMap("client-id", threadClientId); - this.commitTimeSensor = metrics.sensor("commit-time"); + this.commitTimeSensor = metrics.sensor(sensorNamePrefix + ".commit-time"); this.commitTimeSensor.add(metrics.metricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg()); this.commitTimeSensor.add(metrics.metricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max()); this.commitTimeSensor.add(metrics.metricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count())); - this.pollTimeSensor = metrics.sensor("poll-time"); + this.pollTimeSensor = metrics.sensor(sensorNamePrefix + ".poll-time"); this.pollTimeSensor.add(metrics.metricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg()); this.pollTimeSensor.add(metrics.metricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max()); this.pollTimeSensor.add(metrics.metricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count())); - this.processTimeSensor = metrics.sensor("process-time"); + this.processTimeSensor = 
metrics.sensor(sensorNamePrefix + ".process-time"); this.processTimeSensor.add(metrics.metricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg()); this.processTimeSensor.add(metrics.metricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max()); this.processTimeSensor.add(metrics.metricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count())); - this.punctuateTimeSensor = metrics.sensor("punctuate-time"); + this.punctuateTimeSensor = metrics.sensor(sensorNamePrefix + ".punctuate-time"); this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg()); this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max()); this.punctuateTimeSensor.add(metrics.metricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count())); - this.taskCreationSensor = metrics.sensor("task-creation"); + this.taskCreationSensor = metrics.sensor(sensorNamePrefix + ".task-creation"); this.taskCreationSensor.add(metrics.metricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count())); - this.taskDestructionSensor = metrics.sensor("task-destruction"); + this.taskDestructionSensor = metrics.sensor(sensorNamePrefix + ".task-destruction"); this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count())); } @Override public void recordLatency(Sensor sensor, long startNs, long endNs) { - sensor.record((endNs - startNs) / 1000000, endNs); + sensor.record(endNs - startNs, timerStartedMs); } /** @@ -746,11 +777,11 @@ public class 
StreamThread extends Thread { String metricGroupName = "stream-" + scopeName + "-metrics"; // first add the global operation metrics if not yet, with the global tags only - Sensor parent = metrics.sensor(scopeName + "-" + operationName); + Sensor parent = metrics.sensor(sensorNamePrefix + "." + scopeName + "-" + operationName); addLatencyMetrics(metricGroupName, parent, "all", operationName, this.metricTags); // add the store operation metrics with additional tags - Sensor sensor = metrics.sensor(scopeName + "-" + entityName + "-" + operationName, parent); + Sensor sensor = metrics.sensor(sensorNamePrefix + "." + scopeName + "-" + entityName + "-" + operationName, parent); addLatencyMetrics(metricGroupName, sensor, entityName, operationName, tagMap); return sensor; diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index 93bb5719287..ea9584b4e20 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -83,11 +83,6 @@ public class SimpleBenchmark { final File rocksdbDir = new File(stateDir, "rocksdb-test"); rocksdbDir.mkdir(); - System.out.println("SimpleBenchmark instance started"); - System.out.println("kafka=" + kafka); - System.out.println("zookeeper=" + zookeeper); - System.out.println("stateDir=" + stateDir); - SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper); // producer performance @@ -232,6 +227,7 @@ public class SimpleBenchmark { props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); KafkaConsumer consumer = new KafkaConsumer<>(props); @@ -250,7 +246,10 @@ public class 
SimpleBenchmark { break; } else { for (ConsumerRecord record : records) { - key = record.key(); + Long recKey = record.key(); + + if (key == null || key < recKey) + key = recKey; } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 3a90ce371a6..1a66d321147 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -306,14 +306,14 @@ public class StreamThreadTest { List revokedPartitions; List assignedPartitions; - Map prevTasks; + Map prevTasks; // // Assign t1p1 and t1p2. This should create task1 & task2 // revokedPartitions = Collections.emptyList(); assignedPartitions = Arrays.asList(t1p1, t1p2); - prevTasks = new HashMap(thread.tasks()); + prevTasks = new HashMap<>(thread.tasks()); rebalanceListener.onPartitionsRevoked(revokedPartitions); rebalanceListener.onPartitionsAssigned(assignedPartitions); @@ -345,7 +345,7 @@ public class StreamThreadTest { // revokedPartitions = assignedPartitions; assignedPartitions = Collections.emptyList(); - prevTasks = new HashMap(thread.tasks()); + prevTasks = new HashMap<>(thread.tasks()); rebalanceListener.onPartitionsRevoked(revokedPartitions); rebalanceListener.onPartitionsAssigned(assignedPartitions); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 0416e405948..1baedbb5146 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -218,8 +218,7 @@ public class StreamThreadStateStoreProviderTest { 
} @Override - public void recordLatency(final Sensor sensor, final long startNs, - final long endNs) { + public void recordLatency(final Sensor sensor, final long startNs, final long endNs) { } }