KAFKA-13945: add bytes/records consumed and produced metrics (#12235)

Implementation of KIP-846: Source/sink node metrics for Consumed/Produced throughput in Streams

Adds the following INFO topic-level metrics for the total bytes/records consumed and produced:

    bytes-consumed-total
    records-consumed-total
    bytes-produced-total
    records-produced-total
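These totals are registered under the "stream-topic-metrics" group with a "topic-name" tag.
As a minimal, illustrative sketch (the helper class below and the example topic name are
placeholders, not part of this change), they can be read from a running application through
the existing KafkaStreams#metrics() API:

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    public class TopicThroughputReader {
        // Sums a topic-level total (e.g. "bytes-consumed-total") across all
        // threads and tasks reporting it for the given topic.
        public static double totalForTopic(final KafkaStreams streams,
                                           final String metric,
                                           final String topic) {
            double total = 0.0d;
            for (final Metric m : streams.metrics().values()) {
                final MetricName name = m.metricName();
                if ("stream-topic-metrics".equals(name.group())
                        && metric.equals(name.name())
                        && topic.equals(name.tags().get("topic-name"))) {
                    total += ((Number) m.metricValue()).doubleValue();
                }
            }
            return total;
        }
    }

For example, totalForTopic(streams, "records-consumed-total", "input-topic") returns how many
records the source nodes have consumed so far from the (hypothetical) topic "input-topic".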

Reviewers: Kvicii <Karonazaba@gmail.com>, Guozhang Wang <guozhang@apache.org>, Bruno Cadonna <cadonna@apache.org>
A. Sophie Blee-Goldman 2022-06-07 07:02:17 -07:00 committed by GitHub
parent 603502bf5f
commit a6c5a74fdb
33 changed files with 1095 additions and 259 deletions

View File

@@ -213,7 +213,7 @@
     <!-- Streams tests -->
     <suppress checks="ClassFanOutComplexity"
-              files="(StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest).java"/>
+              files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest).java"/>
     <suppress checks="MethodLength"
               files="(EosIntegrationTest|EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>

View File

@@ -2308,7 +2308,7 @@ $ bin/kafka-acls.sh \
  <h4 class="anchor-heading"><a id="kafka_streams_monitoring" class="anchor-link"></a><a href="#kafka_streams_monitoring">Streams Monitoring</a></h4>
  A Kafka Streams instance contains all the producer and consumer metrics as well as additional metrics specific to Streams.
- By default Kafka Streams has metrics with three recording levels: <code>info</code>, <code>debug</code>, and <code>trace</code>.
+ The metrics have three recording levels: <code>info</code>, <code>debug</code>, and <code>trace</code>.
  <p>
  Note that the metrics have a 4-layer hierarchy. At the top level there are client-level metrics for each started
@@ -2617,7 +2617,7 @@ active-process-ratio metrics which have a recording level of <code>info</code>:
  </table>
  <h5 class="anchor-heading"><a id="kafka_streams_store_monitoring" class="anchor-link"></a><a href="#kafka_streams_store_monitoring">State Store Metrics</a></h5>
- All of the following metrics have a recording level of <code>debug</code>, except for the record-e2e-latency-* metrics which have a recording level <code>trace></code>.
+ All of the following metrics have a recording level of <code>debug</code>, except for the record-e2e-latency-* metrics which have a recording level <code>trace</code>.
  Note that the <code>store-scope</code> value is specified in <code>StoreSupplier#metricsScope()</code> for user's customized state stores;
  for built-in state stores, currently we have:
  <ul>
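
As an aside to the recording-level prose above: which metrics a client records is controlled by
the existing metrics.recording.level property, and the topic-level metrics added in this change
are registered at INFO, so the default level already captures them. A hedged configuration
sketch (the application id and bootstrap servers are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class MetricsRecordingLevelConfig {
        public static Properties streamsProperties() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-demo");      // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // "info" (the default) already covers bytes/records-consumed/produced-total;
            // "debug" and "trace" additionally enable the finer-grained metrics described above.
            props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "info");
            return props;
        }
    }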

View File

@@ -260,7 +260,8 @@ class ActiveTaskCreator {
             taskId,
             streamsProducer,
             applicationConfig.defaultProductionExceptionHandler(),
-            streamsMetrics
+            streamsMetrics,
+            topology
         );
         final StreamTask task = new StreamTask(

View File

@@ -21,12 +21,17 @@ import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
@@ -166,4 +171,46 @@ public class ClientUtils {
         final int index = fullThreadName.indexOf("StreamThread-");
         return fullThreadName.substring(index);
     }
public static long producerRecordSizeInBytes(final ProducerRecord<byte[], byte[]> record) {
return recordSizeInBytes(
record.key().length,
record.value() == null ? 0 : record.value().length,
record.topic(),
record.headers()
);
}
public static long consumerRecordSizeInBytes(final ConsumerRecord<byte[], byte[]> record) {
return recordSizeInBytes(
record.serializedKeySize(),
record.serializedValueSize(),
record.topic(),
record.headers()
);
}
public static long recordSizeInBytes(final long keyBytes,
final long valueBytes,
final String topic,
final Headers headers) {
long headerSizeInBytes = 0L;
if (headers != null) {
for (final Header header : headers.toArray()) {
headerSizeInBytes += Utils.utf8(header.key()).length;
if (header.value() != null) {
headerSizeInBytes += header.value().length;
}
}
}
return keyBytes +
valueBytes +
8L + // timestamp
8L + // offset
Utils.utf8(topic).length +
4L + // partition
headerSizeInBytes;
}
}
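
To make the size accounting above concrete, a small sketch using the new helper (the 3-byte key,
5-byte value, 5-byte topic name, and absence of headers are arbitrary example inputs; the helper
always adds 8 + 8 + 4 bytes for timestamp, offset, and partition):

    import java.nio.charset.StandardCharsets;
    import java.util.Optional;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.common.record.TimestampType;
    import org.apache.kafka.streams.processor.internals.ClientUtils;

    public class RecordSizeExample {
        public static void main(final String[] args) {
            final byte[] key = "key".getBytes(StandardCharsets.UTF_8);     // 3 bytes
            final byte[] value = "value".getBytes(StandardCharsets.UTF_8); // 5 bytes
            final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
                "topic", 0, 0L, 0L, TimestampType.CREATE_TIME,
                key.length, value.length, key, value,
                new RecordHeaders(), Optional.empty()
            );
            // 3 (key) + 5 (value) + 5 (topic) + 0 (headers) + 20 (metadata) = 33
            System.out.println(ClientUtils.consumerRecordSizeInBytes(record));
        }
    }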

View File

@@ -268,7 +268,7 @@ public class PartitionGroup {
             // get the buffer size of queue before poll
             final long oldBufferSize = queue.getTotalBytesBuffered();
             // get the first record from this queue.
-            record = queue.poll();
+            record = queue.poll(wallClockTime);
             // After polling, the buffer size would have reduced.
             final long newBufferSize = queue.getTotalBytesBuffered();
@@ -392,6 +392,12 @@ public class PartitionGroup {
         streamTime = RecordQueue.UNKNOWN;
     }
void close() {
for (final RecordQueue queue : partitionQueues.values()) {
queue.close();
}
}
    // Below methods are for only testing.

    boolean allPartitionsBufferedLocally() {

View File

@@ -147,8 +147,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext<Object, Objec
             changelogPartition.partition(),
             timestamp,
             BYTES_KEY_SERIALIZER,
-            BYTEARRAY_VALUE_SERIALIZER
-        );
+            BYTEARRAY_VALUE_SERIALIZER,
+            null,
+            null);
     }

     /**

View File

@@ -33,7 +33,9 @@ public interface RecordCollector {
                      final Integer partition,
                      final Long timestamp,
                      final Serializer<K> keySerializer,
-                     final Serializer<V> valueSerializer);
+                     final Serializer<V> valueSerializer,
+                     final String processorNodeId,
+                     final InternalProcessorContext<Void, Void> context);

    <K, V> void send(final String topic,
                     final K key,
@@ -42,6 +44,8 @@
                     final Long timestamp,
                     final Serializer<K> keySerializer,
                     final Serializer<V> valueSerializer,
+                     final String processorNodeId,
+                     final InternalProcessorContext<Void, Void> context,
                     final StreamPartitioner<? super K, ? super V> partitioner);

    /**

View File

@@ -46,6 +46,8 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.processor.internals.metrics.TopicMetrics;
 import org.slf4j.Logger;

 import java.util.Collections;
@@ -54,6 +56,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;

+import static org.apache.kafka.streams.processor.internals.ClientUtils.producerRecordSizeInBytes;

 public class RecordCollectorImpl implements RecordCollector {
     private final static String SEND_EXCEPTION_MESSAGE = "Error encountered sending record to topic %s for task %s due to:%n%s";
@@ -61,10 +65,13 @@ public class RecordCollectorImpl implements RecordCollector {
     private final TaskId taskId;
     private final StreamsProducer streamsProducer;
     private final ProductionExceptionHandler productionExceptionHandler;
-    private final Sensor droppedRecordsSensor;
     private final boolean eosEnabled;
     private final Map<TopicPartition, Long> offsets;
+    private final StreamsMetricsImpl streamsMetrics;
+    private final Sensor droppedRecordsSensor;
+    private final Map<String, Map<String, Sensor>> sinkNodeToProducedSensorByTopic = new HashMap<>();

     private final AtomicReference<KafkaException> sendException = new AtomicReference<>(null);

     /**
@@ -74,15 +81,29 @@ public class RecordCollectorImpl implements RecordCollector {
                                final TaskId taskId,
                                final StreamsProducer streamsProducer,
                                final ProductionExceptionHandler productionExceptionHandler,
-                               final StreamsMetricsImpl streamsMetrics) {
+                               final StreamsMetricsImpl streamsMetrics,
+                               final ProcessorTopology topology) {
         this.log = logContext.logger(getClass());
         this.taskId = taskId;
         this.streamsProducer = streamsProducer;
         this.productionExceptionHandler = productionExceptionHandler;
         this.eosEnabled = streamsProducer.eosEnabled();
+        this.streamsMetrics = streamsMetrics;

         final String threadId = Thread.currentThread().getName();
         this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(threadId, taskId.toString(), streamsMetrics);
for (final String topic : topology.sinkTopics()) {
final String processorNodeId = topology.sink(topic).name();
sinkNodeToProducedSensorByTopic.computeIfAbsent(processorNodeId, t -> new HashMap<>()).put(
topic,
TopicMetrics.producedSensor(
threadId,
taskId.toString(),
processorNodeId,
topic,
streamsMetrics
));
}
         this.offsets = new HashMap<>();
     }
@@ -106,6 +127,8 @@
                              final Long timestamp,
                              final Serializer<K> keySerializer,
                              final Serializer<V> valueSerializer,
+                             final String processorNodeId,
+                             final InternalProcessorContext<Void, Void> context,
                              final StreamPartitioner<? super K, ? super V> partitioner) {
         final Integer partition;
@@ -122,7 +145,7 @@
                 // here we cannot drop the message on the floor even if it is a transient timeout exception,
                 // so we treat everything the same as a fatal exception
                 throw new StreamsException("Could not determine the number of partitions for topic '" + topic +
-                    "' for task " + taskId + " due to " + fatal.toString(),
+                    "' for task " + taskId + " due to " + fatal,
                     fatal
                 );
             }
@@ -136,7 +159,7 @@
             partition = null;
         }

-        send(topic, key, value, headers, partition, timestamp, keySerializer, valueSerializer);
+        send(topic, key, value, headers, partition, timestamp, keySerializer, valueSerializer, processorNodeId, context);
     }

     @Override
@@ -147,7 +170,9 @@
                              final Integer partition,
                              final Long timestamp,
                              final Serializer<K> keySerializer,
-                             final Serializer<V> valueSerializer) {
+                             final Serializer<V> valueSerializer,
+                             final String processorNodeId,
+                             final InternalProcessorContext<Void, Void> context) {
         checkForException();

         final byte[] keyBytes;
@@ -173,7 +198,7 @@
                     valueClass),
                 exception);
             } catch (final RuntimeException exception) {
-                final String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, exception.toString());
+                final String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, exception);
                 throw new StreamsException(errorMessage, exception);
             }
@@ -192,6 +217,28 @@
                     } else {
                         log.warn("Received offset={} in produce response for {}", metadata.offset(), tp);
                     }
if (!topic.endsWith("-changelog")) {
// we may not have created a sensor yet if the node uses dynamic topic routing
final Map<String, Sensor> producedSensorByTopic = sinkNodeToProducedSensorByTopic.get(processorNodeId);
if (producedSensorByTopic == null) {
log.error("Unable to records bytes produced to topic {} by sink node {} as the node is not recognized.\n"
+ "Known sink nodes are {}.", topic, processorNodeId, sinkNodeToProducedSensorByTopic.keySet());
} else {
final Sensor topicProducedSensor = producedSensorByTopic.computeIfAbsent(
topic,
t -> TopicMetrics.producedSensor(
Thread.currentThread().getName(),
taskId.toString(),
processorNodeId,
topic,
context.metrics()
)
);
final long bytesProduced = producerRecordSizeInBytes(serializedRecord);
topicProducedSensor.record(bytesProduced, context.currentSystemTimeMs());
}
}
                 } else {
                     recordSendError(topic, exception, serializedRecord);
@@ -267,6 +314,8 @@
     public void closeClean() {
         log.info("Closing record collector clean");

+        removeAllProducedSensors();

         // No need to abort transaction during a clean close: either we have successfully committed the ongoing
         // transaction during handleRevocation and thus there is no transaction in flight, or else none of the revoked
         // tasks had any data in the current transaction and therefore there is no need to commit or abort it.
@@ -290,6 +339,14 @@
         checkForException();
     }
private void removeAllProducedSensors() {
for (final Map<String, Sensor> nodeMap : sinkNodeToProducedSensorByTopic.values()) {
for (final Sensor sensor : nodeMap.values()) {
streamsMetrics.removeSensor(sensor);
}
}
}
     @Override
     public Map<TopicPartition, Long> offsets() {
         return Collections.unmodifiableMap(new HashMap<>(offsets));
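
For context on the dynamic-routing branch handled in the send callback above: a sink node built
with a TopicNameExtractor only learns its output topics as records are sent, so the per-topic
produced sensor may not exist when the collector is constructed and is created lazily instead.
A minimal sketch of such an extractor (the key-prefix routing rule and topic names are purely
illustrative):

    import org.apache.kafka.streams.processor.RecordContext;
    import org.apache.kafka.streams.processor.TopicNameExtractor;

    // The set of output topics is unknown until records actually flow, which is why
    // the collector cannot pre-register a produced sensor for every possible topic.
    public class KeyPrefixTopicExtractor implements TopicNameExtractor<String, String> {
        @Override
        public String extract(final String key, final String value, final RecordContext recordContext) {
            return key.startsWith("priority-") ? "priority-output" : "regular-output";
        }
    }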

View File

@@ -18,19 +18,21 @@ package org.apache.kafka.streams.processor.internals;

 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.metrics.TopicMetrics;
 import org.slf4j.Logger;

 import java.util.ArrayDeque;

+import static org.apache.kafka.streams.processor.internals.ClientUtils.consumerRecordSizeInBytes;

 /**
  * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the
  * partition timestamp defined as the largest timestamp seen on the partition so far; this is passed to the
@@ -52,6 +54,7 @@
     private long partitionTime = UNKNOWN;

     private final Sensor droppedRecordsSensor;
+    private final Sensor consumedSensor;
     private long totalBytesBuffered;
     private long headRecordSizeInBytes;
@@ -66,11 +69,20 @@
         this.fifoQueue = new ArrayDeque<>();
         this.timestampExtractor = timestampExtractor;
         this.processorContext = processorContext;

+        final String threadName = Thread.currentThread().getName();
         droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
-            Thread.currentThread().getName(),
+            threadName,
             processorContext.taskId().toString(),
             processorContext.metrics()
         );
consumedSensor = TopicMetrics.consumedSensor(
threadName,
processorContext.taskId().toString(),
source.name(),
partition.topic(),
processorContext.metrics()
);
         recordDeserializer = new RecordDeserializer(
             source,
             deserializationExceptionHandler,
@@ -104,25 +116,6 @@
         return partition;
     }

-    private long sizeInBytes(final ConsumerRecord<byte[], byte[]> record) {
-        long headerSizeInBytes = 0L;
-        for (final Header header: record.headers().toArray()) {
-            headerSizeInBytes += Utils.utf8(header.key()).length;
-            if (header.value() != null) {
-                headerSizeInBytes += header.value().length;
-            }
-        }
-        return record.serializedKeySize() +
-            record.serializedValueSize() +
-            8L + // timestamp
-            8L + // offset
-            Utils.utf8(record.topic()).length +
-            4L + // partition
-            headerSizeInBytes;
-    }

     /**
      * Add a batch of {@link ConsumerRecord} into the queue
      *
@@ -132,7 +125,7 @@
     int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
         for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
             fifoQueue.addLast(rawRecord);
-            this.totalBytesBuffered += sizeInBytes(rawRecord);
+            this.totalBytesBuffered += consumerRecordSizeInBytes(rawRecord);
         }

         updateHead();
@@ -145,8 +138,11 @@
      *
      * @return StampedRecord
      */
-    public StampedRecord poll() {
+    public StampedRecord poll(final long wallClockTime) {
         final StampedRecord recordToReturn = headRecord;

+        consumedSensor.record(headRecordSizeInBytes, wallClockTime);

         totalBytesBuffered -= headRecordSizeInBytes;
         headRecord = null;
         headRecordSizeInBytes = 0L;
@@ -199,6 +195,10 @@
         partitionTime = UNKNOWN;
     }
public void close() {
processorContext.metrics().removeSensor(consumedSensor);
}
     private void updateHead() {
         ConsumerRecord<byte[], byte[]> lastCorruptedRecord = null;
@@ -235,7 +235,7 @@
                 continue;
             }

             headRecord = new StampedRecord(deserialized, timestamp);
-            headRecordSizeInBytes = sizeInBytes(raw);
+            headRecordSizeInBytes = consumerRecordSizeInBytes(raw);
         }

         // if all records in the FIFO queue are corrupted, make the last one the headRecord

View File

@@ -82,7 +82,17 @@ public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, VIn, Void, Void> {

         final String topic = topicExtractor.extract(key, value, contextForExtraction);
-        collector.send(topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, partitioner);
+        collector.send(
+            topic,
+            key,
+            value,
+            record.headers(),
+            timestamp,
+            keySerializer,
+            valSerializer,
+            name(),
+            context,
+            partitioner);
     }

     /**

View File

@@ -184,7 +184,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
             createPartitionQueues(),
             mainConsumer::currentLag,
             TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics),
-            TaskMetrics.totalBytesSensor(threadId, taskId, streamsMetrics),
+            TaskMetrics.totalInputBufferBytesSensor(threadId, taskId, streamsMetrics),
             enforcedProcessingSensor,
             maxTaskIdleMs
         );
@@ -553,6 +553,7 @@
         switch (state()) {
             case SUSPENDED:
                 stateMgr.recycle();
+                partitionGroup.close();
                 recordCollector.closeClean();

                 break;
@@ -614,6 +615,13 @@
     private void close(final boolean clean) {
         switch (state()) {
             case SUSPENDED:
TaskManager.executeAndMaybeSwallow(
clean,
partitionGroup::close,
"partition group close",
log
);
                 // first close state manager (which is idempotent) then close the record collector
                 // if the latter throws and we re-close dirty which would close the state manager again.
                 TaskManager.executeAndMaybeSwallow(
@@ -653,7 +661,6 @@
         }

         record = null;
-        partitionGroup.clear();

         closeTaskSensor.record();
         transitionTo(State.CLOSED);

View File

@@ -95,6 +95,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     private final Map<String, Deque<String>> threadLevelSensors = new HashMap<>();
     private final Map<String, Deque<String>> taskLevelSensors = new HashMap<>();
     private final Map<String, Deque<String>> nodeLevelSensors = new HashMap<>();
+    private final Map<String, Deque<String>> topicLevelSensors = new HashMap<>();
     private final Map<String, Deque<String>> cacheLevelSensors = new HashMap<>();
     private final ConcurrentMap<String, Deque<String>> storeLevelSensors = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, Deque<MetricName>> storeLevelMetrics = new ConcurrentHashMap<>();
@@ -105,6 +106,7 @@
     private static final String SENSOR_NAME_DELIMITER = ".s.";
     private static final String SENSOR_TASK_LABEL = "task";
     private static final String SENSOR_NODE_LABEL = "node";
+    private static final String SENSOR_TOPIC_LABEL = "topic";
     private static final String SENSOR_CACHE_LABEL = "cache";
     private static final String SENSOR_STORE_LABEL = "store";
     private static final String SENSOR_ENTITY_LABEL = "entity";
@@ -115,6 +117,7 @@
     public static final String THREAD_ID_TAG = "thread-id";
     public static final String TASK_ID_TAG = "task-id";
     public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
+    public static final String TOPIC_NAME_TAG = "topic-name";
     public static final String STORE_ID_TAG = "state-id";
     public static final String RECORD_CACHE_ID_TAG = "record-cache-id";
@@ -136,6 +139,7 @@
     public static final String THREAD_LEVEL_GROUP = GROUP_PREFIX + "thread" + GROUP_SUFFIX;
     public static final String TASK_LEVEL_GROUP = GROUP_PREFIX + "task" + GROUP_SUFFIX;
     public static final String PROCESSOR_NODE_LEVEL_GROUP = GROUP_PREFIX + "processor-node" + GROUP_SUFFIX;
+    public static final String TOPIC_LEVEL_GROUP = GROUP_PREFIX + "topic" + GROUP_SUFFIX;
     public static final String STATE_STORE_LEVEL_GROUP = GROUP_PREFIX + "state" + GROUP_SUFFIX;
     public static final String CACHE_LEVEL_GROUP = GROUP_PREFIX + "record-cache" + GROUP_SUFFIX;
@@ -325,6 +329,15 @@
         return tagMap;
     }
public Map<String, String> topicLevelTagMap(final String threadId,
final String taskName,
final String processorNodeName,
final String topicName) {
final Map<String, String> tagMap = nodeLevelTagMap(threadId, taskName, processorNodeName);
tagMap.put(TOPIC_NAME_TAG, topicName);
return tagMap;
}
     public Map<String, String> storeLevelTagMap(final String taskName,
                                                 final String storeType,
                                                 final String storeName) {
@@ -388,6 +401,40 @@
             + SENSOR_PREFIX_DELIMITER + SENSOR_NODE_LABEL + SENSOR_PREFIX_DELIMITER + processorNodeName;
     }
public Sensor topicLevelSensor(final String threadId,
final String taskId,
final String processorNodeName,
final String topicName,
final String sensorName,
final Sensor.RecordingLevel recordingLevel,
final Sensor... parents) {
final String key = topicSensorPrefix(threadId, taskId, processorNodeName, topicName);
synchronized (topicLevelSensors) {
return getSensors(topicLevelSensors, sensorName, key, recordingLevel, parents);
}
}
public final void removeAllTopicLevelSensors(final String threadId,
final String taskId,
final String processorNodeName,
final String topicName) {
final String key = topicSensorPrefix(threadId, taskId, processorNodeName, topicName);
synchronized (topicLevelSensors) {
final Deque<String> sensors = topicLevelSensors.remove(key);
while (sensors != null && !sensors.isEmpty()) {
metrics.removeSensor(sensors.pop());
}
}
}
private String topicSensorPrefix(final String threadId,
final String taskId,
final String processorNodeName,
final String topicName) {
return nodeSensorPrefix(threadId, taskId, processorNodeName)
+ SENSOR_PREFIX_DELIMITER + SENSOR_TOPIC_LABEL + SENSOR_PREFIX_DELIMITER + topicName;
}
     public Sensor cacheLevelSensor(final String threadId,
                                    final String taskName,
                                    final String storeName,
@@ -795,6 +842,23 @@
         );
     }
public static void addTotalCountAndSumMetricsToSensor(final Sensor sensor,
final String group,
final Map<String, String> tags,
final String countMetricNamePrefix,
final String sumMetricNamePrefix,
final String descriptionOfCount,
final String descriptionOfTotal) {
sensor.add(
new MetricName(countMetricNamePrefix + TOTAL_SUFFIX, group, descriptionOfCount, tags),
new CumulativeCount()
);
sensor.add(
new MetricName(sumMetricNamePrefix + TOTAL_SUFFIX, group, descriptionOfTotal, tags),
new CumulativeSum()
);
}
     public static void maybeMeasureLatency(final Runnable actionToMeasure,
                                            final Time time,
                                            final Sensor sensor) {

View File

@@ -133,7 +133,7 @@ public class TaskMetrics {
         return sensor;
     }

-    public static Sensor totalBytesSensor(final String threadId,
+    public static Sensor totalInputBufferBytesSensor(final String threadId,
                                           final String taskId,
                                           final StreamsMetricsImpl streamsMetrics) {
         final String name = INPUT_BUFFER_BYTES_TOTAL;

View File

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOPIC_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addTotalCountAndSumMetricsToSensor;
public class TopicMetrics {
private static final String CONSUMED = "consumed";
private static final String BYTES_CONSUMED = "bytes-consumed";
private static final String BYTES_CONSUMED_DESCRIPTION = "bytes consumed from this topic";
private static final String BYTES_CONSUMED_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + BYTES_CONSUMED_DESCRIPTION;
private static final String RECORDS_CONSUMED = "records-consumed";
private static final String RECORDS_CONSUMED_DESCRIPTION = "records consumed from this topic";
private static final String RECORDS_CONSUMED_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + RECORDS_CONSUMED_DESCRIPTION;
private static final String PRODUCED = "produced";
private static final String BYTES_PRODUCED = "bytes-produced";
private static final String BYTES_PRODUCED_DESCRIPTION = "bytes produced to this topic";
private static final String BYTES_PRODUCED_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + BYTES_PRODUCED_DESCRIPTION;
private static final String RECORDS_PRODUCED = "records-produced";
private static final String RECORDS_PRODUCED_DESCRIPTION = "records produced to this topic";
private static final String RECORDS_PRODUCED_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + RECORDS_PRODUCED_DESCRIPTION;
public static Sensor consumedSensor(final String threadId,
final String taskId,
final String processorNodeId,
final String topic,
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor = streamsMetrics.topicLevelSensor(
threadId,
taskId,
processorNodeId,
topic,
CONSUMED,
RecordingLevel.INFO);
addTotalCountAndSumMetricsToSensor(
sensor,
TOPIC_LEVEL_GROUP,
streamsMetrics.topicLevelTagMap(threadId, taskId, processorNodeId, topic),
RECORDS_CONSUMED,
BYTES_CONSUMED,
RECORDS_CONSUMED_TOTAL_DESCRIPTION,
BYTES_CONSUMED_TOTAL_DESCRIPTION
);
return sensor;
}
public static Sensor producedSensor(final String threadId,
final String taskId,
final String processorNodeId,
final String topic,
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor = streamsMetrics.topicLevelSensor(
threadId,
taskId,
processorNodeId,
topic,
PRODUCED,
RecordingLevel.INFO);
addTotalCountAndSumMetricsToSensor(
sensor,
TOPIC_LEVEL_GROUP,
streamsMetrics.topicLevelTagMap(threadId, taskId, processorNodeId, topic),
RECORDS_PRODUCED,
BYTES_PRODUCED,
RECORDS_PRODUCED_TOTAL_DESCRIPTION,
BYTES_PRODUCED_TOTAL_DESCRIPTION
);
return sensor;
}
}
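
For illustration, a hedged sketch of how the sensors defined above surface to a JMX client; the
object-name pattern is assumed to follow the usual kafka.streams:type=<metrics group> convention,
with the thread-id, task-id, processor-node-id, and topic-name tags from topicLevelTagMap:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanAttributeInfo;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class TopicMetricsJmxProbe {
        public static void main(final String[] args) throws Exception {
            final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            // e.g. kafka.streams:type=stream-topic-metrics,thread-id=...,task-id=...,
            //      processor-node-id=...,topic-name=...
            for (final ObjectName name : server.queryNames(
                    new ObjectName("kafka.streams:type=stream-topic-metrics,*"), null)) {
                System.out.println(name);
                for (final MBeanAttributeInfo attribute : server.getMBeanInfo(name).getAttributes()) {
                    System.out.println("  " + attribute.getName() + " = "
                        + server.getAttribute(name, attribute.getName()));
                }
            }
        }
    }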

View File

@@ -292,8 +292,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
             partition,
             null,
             KEY_SERIALIZER,
-            VALUE_SERIALIZER
-        );
+            VALUE_SERIALIZER,
+            null,
+            null);
     }

     private void logTombstone(final Bytes key) {
@@ -305,8 +306,9 @@
             partition,
             null,
             KEY_SERIALIZER,
-            VALUE_SERIALIZER
-        );
+            VALUE_SERIALIZER,
+            null,
+            null);
     }

     private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch) {

View File

@@ -93,6 +93,7 @@ public class MetricsIntegrationTest {
     private static final String STREAM_THREAD_NODE_METRICS = "stream-thread-metrics";
     private static final String STREAM_TASK_NODE_METRICS = "stream-task-metrics";
     private static final String STREAM_PROCESSOR_NODE_METRICS = "stream-processor-node-metrics";
+    private static final String STREAM_TOPIC_METRICS = "stream-topic-metrics";
     private static final String STREAM_CACHE_NODE_METRICS = "stream-record-cache-metrics";

     private static final String IN_MEMORY_KVSTORE_TAG_KEY = "in-memory-state-id";
@@ -217,6 +218,10 @@
     private static final String RECORD_E2E_LATENCY_AVG = "record-e2e-latency-avg";
     private static final String RECORD_E2E_LATENCY_MIN = "record-e2e-latency-min";
     private static final String RECORD_E2E_LATENCY_MAX = "record-e2e-latency-max";
+    private static final String BYTES_CONSUMED_TOTAL = "bytes-consumed-total";
+    private static final String RECORDS_CONSUMED_TOTAL = "records-consumed-total";
+    private static final String BYTES_PRODUCED_TOTAL = "bytes-produced-total";
+    private static final String RECORDS_PRODUCED_TOTAL = "records-produced-total";

     // stores name
     private static final String TIME_WINDOWED_AGGREGATED_STREAM_STORE = "time-windowed-aggregated-stream-store";
@@ -360,6 +365,7 @@
         checkThreadLevelMetrics();
         checkTaskLevelMetrics();
         checkProcessorNodeLevelMetrics();
+        checkTopicLevelMetrics();
         checkKeyValueStoreMetrics(IN_MEMORY_KVSTORE_TAG_KEY);
         checkKeyValueStoreMetrics(ROCKSDB_KVSTORE_TAG_KEY);
         checkKeyValueStoreMetrics(IN_MEMORY_LRUCACHE_TAG_KEY);
@@ -548,6 +554,18 @@
         checkMetricByName(listMetricProcessor, RECORD_E2E_LATENCY_MAX, numberOfSourceNodes + numberOfTerminalNodes);
     }
private void checkTopicLevelMetrics() {
final List<Metric> listMetricProcessor = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().group().equals(STREAM_TOPIC_METRICS))
.collect(Collectors.toList());
final int numberOfSourceTopics = 4;
final int numberOfSinkTopics = 4;
checkMetricByName(listMetricProcessor, BYTES_CONSUMED_TOTAL, numberOfSourceTopics);
checkMetricByName(listMetricProcessor, RECORDS_CONSUMED_TOTAL, numberOfSourceTopics);
checkMetricByName(listMetricProcessor, BYTES_PRODUCED_TOTAL, numberOfSinkTopics);
checkMetricByName(listMetricProcessor, RECORDS_PRODUCED_TOTAL, numberOfSinkTopics);
}
     private void checkKeyValueStoreMetrics(final String tagKey) {
         final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
             .filter(m -> m.metricName().tags().containsKey(tagKey) && m.metricName().group().equals(STATE_STORE_LEVEL_GROUP))

View File

@@ -60,6 +60,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertThrows;
+import static java.util.Collections.emptySet;

 @RunWith(EasyMockRunner.class)
 public class ActiveTaskCreatorTest {
@@ -478,6 +479,7 @@
         reset(builder, stateDirectory);
         expect(builder.topologyConfigs()).andStubReturn(new TopologyConfig(new StreamsConfig(properties)));
         expect(builder.buildSubtopology(0)).andReturn(topology).anyTimes();
+        expect(topology.sinkTopics()).andStubReturn(emptySet());
         expect(stateDirectory.getOrCreateDirectoryForTask(task00)).andReturn(mock(File.class));
         expect(stateDirectory.checkpointFileFor(task00)).andReturn(mock(File.class));
         expect(stateDirectory.getOrCreateDirectoryForTask(task01)).andReturn(mock(File.class));

View File

@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;

 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import org.apache.kafka.clients.admin.Admin;
@@ -24,29 +25,76 @@ import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.easymock.EasyMock;
 import org.junit.Test;

+import static java.util.Arrays.asList;
 import static java.util.Collections.emptySet;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.consumerRecordSizeInBytes;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.producerRecordSizeInBytes;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;

 public class ClientUtilsTest {
// consumer and producer records use utf8 encoding for topic name, header keys, etc
private static final String TOPIC = "topic";
private static final int TOPIC_BYTES = 5;
private static final byte[] KEY = "key".getBytes();
private static final int KEY_BYTES = 3;
private static final byte[] VALUE = "value".getBytes();
private static final int VALUE_BYTES = 5;
private static final Headers HEADERS = new RecordHeaders(asList(
new RecordHeader("h1", "headerVal1".getBytes()), // 2 + 10 --> 12 bytes
new RecordHeader("h2", "headerVal2".getBytes())
)); // 2 + 10 --> 12 bytes
private static final int HEADERS_BYTES = 24;
private static final int RECORD_METADATA_BYTES =
8 + // timestamp
8 + // offset
4; // partition
// 57 bytes
private static final long SIZE_IN_BYTES =
KEY_BYTES +
VALUE_BYTES +
TOPIC_BYTES +
HEADERS_BYTES +
RECORD_METADATA_BYTES;
private static final long TOMBSTONE_SIZE_IN_BYTES =
KEY_BYTES +
TOPIC_BYTES +
HEADERS_BYTES +
RECORD_METADATA_BYTES;
     private static final Set<TopicPartition> PARTITIONS = mkSet(
-        new TopicPartition("topic", 1),
-        new TopicPartition("topic", 2)
+        new TopicPartition(TOPIC, 1),
+        new TopicPartition(TOPIC, 2)
     );

     @Test
@@ -122,4 +170,66 @@
         verify(adminClient);
     }
@Test
public void shouldComputeSizeInBytesForConsumerRecord() {
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
TOPIC,
1,
0L,
0L,
TimestampType.CREATE_TIME,
KEY_BYTES,
VALUE_BYTES,
KEY,
VALUE,
HEADERS,
Optional.empty()
);
assertThat(consumerRecordSizeInBytes(record), equalTo(SIZE_IN_BYTES));
}
@Test
public void shouldComputeSizeInBytesForProducerRecord() {
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
TOPIC,
1,
0L,
KEY,
VALUE,
HEADERS
);
assertThat(producerRecordSizeInBytes(record), equalTo(SIZE_IN_BYTES));
}
@Test
public void shouldComputeSizeInBytesForConsumerRecordWithNullValue() {
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
TOPIC,
1,
0,
0L,
TimestampType.CREATE_TIME,
KEY_BYTES,
0,
KEY,
null,
HEADERS,
Optional.empty()
);
assertThat(consumerRecordSizeInBytes(record), equalTo(TOMBSTONE_SIZE_IN_BYTES));
}
@Test
public void shouldComputeSizeInBytesForProducerRecordWithNullValue() {
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
TOPIC,
1,
0L,
KEY,
null,
HEADERS
);
assertThat(producerRecordSizeInBytes(record), equalTo(TOMBSTONE_SIZE_IN_BYTES));
}
} }

View File

@@ -19,7 +19,6 @@ package org.apache.kafka.streams.processor.internals;

 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -53,6 +52,8 @@
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.consumerRecordSizeInBytes;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -846,19 +847,7 @@
     private long getBytesBufferedForRawRecords(final List<ConsumerRecord<byte[], byte[]>> rawRecords) {
         long rawRecordsSizeInBytes = 0L;
         for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
-            long headerSizeInBytes = 0L;
-            for (final Header header: rawRecord.headers().toArray()) {
-                headerSizeInBytes += header.key().getBytes().length + header.value().length;
-            }
-            rawRecordsSizeInBytes += rawRecord.serializedKeySize() +
-                rawRecord.serializedValueSize() +
-                8L + // timestamp
-                8L + // offset
-                rawRecord.topic().getBytes().length +
-                4L + // partition
-                headerSizeInBytes;
+            rawRecordsSizeInBytes += consumerRecordSizeInBytes(rawRecord);
         }
         return rawRecordsSizeInBytes;
     }

View File

@@ -404,8 +404,10 @@ public class ProcessorContextImplTest {
             CHANGELOG_PARTITION.partition(),
             TIMESTAMP,
             BYTES_KEY_SERIALIZER,
-            BYTEARRAY_VALUE_SERIALIZER
-        );
+            BYTEARRAY_VALUE_SERIALIZER,
+            null,
+            null);

         final StreamTask task = EasyMock.createNiceMock(StreamTask.class);
         replay(recordCollector, task);
@@ -430,8 +432,9 @@
             CHANGELOG_PARTITION.partition(),
             TIMESTAMP,
             BYTES_KEY_SERIALIZER,
-            BYTEARRAY_VALUE_SERIALIZER
-        );
+            BYTEARRAY_VALUE_SERIALIZER,
+            null,
+            null);

         final StreamTask task = EasyMock.createNiceMock(StreamTask.class);

View File

@@ -53,6 +53,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockClientSupplier;

 import java.util.UUID;
@@ -69,6 +70,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.producerRecordSizeInBytes;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOPIC_LEVEL_GROUP;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
@@ -81,6 +85,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonMap;

 public class RecordCollectorTest {
@@ -99,6 +107,7 @@
     ));

     private final String topic = "topic";
+    private final String sinkNodeName = "output-node";
     private final Cluster cluster = new Cluster(
         "cluster",
         Collections.singletonList(Node.noNode()),
@@ -120,6 +129,8 @@
     private MockProducer<byte[], byte[]> mockProducer;
     private StreamsProducer streamsProducer;
+    private ProcessorTopology topology;
+    private final InternalProcessorContext<Void, Void> context = new InternalMockProcessorContext<>();

     private RecordCollectorImpl collector;
@@ -137,12 +148,29 @@
             Time.SYSTEM
         );
         mockProducer = clientSupplier.producers.get(0);
final SinkNode<?, ?> sinkNode = new SinkNode<>(
sinkNodeName,
new StaticTopicNameExtractor<>(topic),
stringSerializer,
byteArraySerializer,
streamPartitioner);
topology = new ProcessorTopology(
emptyList(),
emptyMap(),
singletonMap(topic, sinkNode),
emptyList(),
emptyList(),
emptyMap(),
emptySet()
);
         collector = new RecordCollectorImpl(
             logContext,
             taskId,
             streamsProducer,
             productionExceptionHandler,
-            streamsMetrics);
+            streamsMetrics,
+            topology
+        );
     }

     @After
@@ -150,16 +178,73 @@
         collector.closeClean();
     }
@Test
public void shouldRecordRecordsAndBytesProduced() {
final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
final String threadId = Thread.currentThread().getName();
final String processorNodeId = sinkNodeName;
final String topic = "topic";
final Metric recordsProduced = streamsMetrics.metrics().get(
new MetricName("records-produced-total",
TOPIC_LEVEL_GROUP,
"The total number of records produced from this topic",
streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
);
final Metric bytesProduced = streamsMetrics.metrics().get(
new MetricName("bytes-produced-total",
TOPIC_LEVEL_GROUP,
"The total number of bytes produced from this topic",
streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
);
double totalRecords = 0D;
double totalBytes = 0D;
assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
++totalRecords;
totalBytes += producerRecordSizeInBytes(mockProducer.history().get(0));
assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer, sinkNodeName, context);
++totalRecords;
totalBytes += producerRecordSizeInBytes(mockProducer.history().get(1));
assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
++totalRecords;
totalBytes += producerRecordSizeInBytes(mockProducer.history().get(2));
assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer, sinkNodeName, context);
++totalRecords;
totalBytes += producerRecordSizeInBytes(mockProducer.history().get(3));
assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
++totalRecords;
totalBytes += producerRecordSizeInBytes(mockProducer.history().get(4));
assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
}
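Editor's note: the new test above drives the produced-throughput sensors through internal test hooks. As a hedged sketch only (not part of this commit), the same totals are also visible from a running application through the public KafkaStreams#metrics() view; the metric names come from this patch, while the ProducedThroughputProbe class, the running `streams` instance, and the exact tag keys are assumptions.

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class ProducedThroughputProbe {
    // Logs the per-topic produced totals of an already-running KafkaStreams instance.
    public static void logProducedTotals(final KafkaStreams streams) {
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            final MetricName name = entry.getKey();
            if ("records-produced-total".equals(name.name()) || "bytes-produced-total".equals(name.name())) {
                // the tag map identifies the thread, task, sink node and topic
                System.out.println(name.name() + " " + name.tags() + " = " + entry.getValue().metricValue());
            }
        }
    }
}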
@Test @Test
public void shouldSendToSpecificPartition() { public void shouldSendToSpecificPartition() {
final Headers headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())}); final Headers headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())});
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer); collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer); collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer); collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer); collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer); collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", headers, 2, null, stringSerializer, stringSerializer); collector.send(topic, "999", "0", headers, 2, null, stringSerializer, stringSerializer, null, null);
Map<TopicPartition, Long> offsets = collector.offsets(); Map<TopicPartition, Long> offsets = collector.offsets();
@ -168,9 +253,9 @@ public class RecordCollectorTest {
assertEquals(0L, (long) offsets.get(new TopicPartition(topic, 2))); assertEquals(0L, (long) offsets.get(new TopicPartition(topic, 2)));
assertEquals(6, mockProducer.history().size()); assertEquals(6, mockProducer.history().size());
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer); collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", null, 1, null, stringSerializer, stringSerializer); collector.send(topic, "999", "0", null, 1, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", headers, 2, null, stringSerializer, stringSerializer); collector.send(topic, "999", "0", headers, 2, null, stringSerializer, stringSerializer, null, null);
offsets = collector.offsets(); offsets = collector.offsets();
@ -184,15 +269,15 @@ public class RecordCollectorTest {
public void shouldSendWithPartitioner() { public void shouldSendWithPartitioner() {
final Headers headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())}); final Headers headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())});
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "9", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "9", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "27", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "27", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "81", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "81", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "243", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "243", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "28", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "28", "0", headers, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "82", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "82", "0", headers, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "244", "0", headers, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "244", "0", headers, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.send(topic, "245", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "245", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
final Map<TopicPartition, Long> offsets = collector.offsets(); final Map<TopicPartition, Long> offsets = collector.offsets();
@ -210,15 +295,15 @@ public class RecordCollectorTest {
public void shouldSendWithNoPartition() { public void shouldSendWithNoPartition() {
final Headers headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())}); final Headers headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())});
collector.send(topic, "3", "0", headers, null, null, stringSerializer, stringSerializer); collector.send(topic, "3", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "9", "0", headers, null, null, stringSerializer, stringSerializer); collector.send(topic, "9", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "27", "0", headers, null, null, stringSerializer, stringSerializer); collector.send(topic, "27", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "81", "0", headers, null, null, stringSerializer, stringSerializer); collector.send(topic, "81", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "243", "0", headers, null, null, stringSerializer, stringSerializer); collector.send(topic, "243", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "28", "0", headers, null, null, stringSerializer, stringSerializer); collector.send(topic, "28", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "82", "0", headers, null, null, stringSerializer, stringSerializer); collector.send(topic, "82", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "244", "0", headers, null, null, stringSerializer, stringSerializer); collector.send(topic, "244", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "245", "0", headers, null, null, stringSerializer, stringSerializer); collector.send(topic, "245", "0", headers, null, null, stringSerializer, stringSerializer, null, null);
final Map<TopicPartition, Long> offsets = collector.offsets(); final Map<TopicPartition, Long> offsets = collector.offsets();
@ -233,9 +318,9 @@ public class RecordCollectorTest {
public void shouldUpdateOffsetsUponCompletion() { public void shouldUpdateOffsetsUponCompletion() {
Map<TopicPartition, Long> offsets = collector.offsets(); Map<TopicPartition, Long> offsets = collector.offsets();
collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer); collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", null, 1, null, stringSerializer, stringSerializer); collector.send(topic, "999", "0", null, 1, null, stringSerializer, stringSerializer, null, null);
collector.send(topic, "999", "0", null, 2, null, stringSerializer, stringSerializer); collector.send(topic, "999", "0", null, 2, null, stringSerializer, stringSerializer, null, null);
assertEquals(Collections.<TopicPartition, Long>emptyMap(), offsets); assertEquals(Collections.<TopicPartition, Long>emptyMap(), offsets);
@ -253,7 +338,7 @@ public class RecordCollectorTest {
final CustomStringSerializer valueSerializer = new CustomStringSerializer(); final CustomStringSerializer valueSerializer = new CustomStringSerializer();
keySerializer.configure(Collections.emptyMap(), true); keySerializer.configure(Collections.emptyMap(), true);
collector.send(topic, "3", "0", new RecordHeaders(), null, keySerializer, valueSerializer, streamPartitioner); collector.send(topic, "3", "0", new RecordHeaders(), null, keySerializer, valueSerializer, null, null, streamPartitioner);
final List<ProducerRecord<byte[], byte[]>> recordHistory = mockProducer.history(); final List<ProducerRecord<byte[], byte[]>> recordHistory = mockProducer.history();
for (final ProducerRecord<byte[], byte[]> sentRecord : recordHistory) { for (final ProducerRecord<byte[], byte[]> sentRecord : recordHistory) {
@ -270,14 +355,19 @@ public class RecordCollectorTest {
expect(streamsProducer.eosEnabled()).andReturn(false); expect(streamsProducer.eosEnabled()).andReturn(false);
streamsProducer.flush(); streamsProducer.flush();
expectLastCall(); expectLastCall();
replay(streamsProducer);
final ProcessorTopology topology = mock(ProcessorTopology.class);
expect(topology.sinkTopics()).andStubReturn(Collections.emptySet());
replay(streamsProducer, topology);
final RecordCollector collector = new RecordCollectorImpl( final RecordCollector collector = new RecordCollectorImpl(
logContext, logContext,
taskId, taskId,
streamsProducer, streamsProducer,
productionExceptionHandler, productionExceptionHandler,
streamsMetrics); streamsMetrics,
topology
);
collector.flush(); collector.flush();
@ -290,14 +380,18 @@ public class RecordCollectorTest {
expect(streamsProducer.eosEnabled()).andReturn(true); expect(streamsProducer.eosEnabled()).andReturn(true);
streamsProducer.flush(); streamsProducer.flush();
expectLastCall(); expectLastCall();
replay(streamsProducer); final ProcessorTopology topology = mock(ProcessorTopology.class);
expect(topology.sinkTopics()).andStubReturn(Collections.emptySet());
replay(streamsProducer, topology);
final RecordCollector collector = new RecordCollectorImpl( final RecordCollector collector = new RecordCollectorImpl(
logContext, logContext,
taskId, taskId,
streamsProducer, streamsProducer,
productionExceptionHandler, productionExceptionHandler,
streamsMetrics); streamsMetrics,
topology
);
collector.flush(); collector.flush();
@ -308,14 +402,19 @@ public class RecordCollectorTest {
public void shouldNotAbortTxOnCloseCleanIfEosEnabled() { public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class); final StreamsProducer streamsProducer = mock(StreamsProducer.class);
expect(streamsProducer.eosEnabled()).andReturn(true); expect(streamsProducer.eosEnabled()).andReturn(true);
replay(streamsProducer);
final ProcessorTopology topology = mock(ProcessorTopology.class);
expect(topology.sinkTopics()).andStubReturn(Collections.emptySet());
replay(streamsProducer, topology);
final RecordCollector collector = new RecordCollectorImpl( final RecordCollector collector = new RecordCollectorImpl(
logContext, logContext,
taskId, taskId,
streamsProducer, streamsProducer,
productionExceptionHandler, productionExceptionHandler,
streamsMetrics); streamsMetrics,
topology
);
collector.closeClean(); collector.closeClean();
@ -327,14 +426,19 @@ public class RecordCollectorTest {
final StreamsProducer streamsProducer = mock(StreamsProducer.class); final StreamsProducer streamsProducer = mock(StreamsProducer.class);
expect(streamsProducer.eosEnabled()).andReturn(true); expect(streamsProducer.eosEnabled()).andReturn(true);
streamsProducer.abortTransaction(); streamsProducer.abortTransaction();
replay(streamsProducer);
final ProcessorTopology topology = mock(ProcessorTopology.class);
expect(topology.sinkTopics()).andStubReturn(Collections.emptySet());
replay(streamsProducer, topology);
final RecordCollector collector = new RecordCollectorImpl( final RecordCollector collector = new RecordCollectorImpl(
logContext, logContext,
taskId, taskId,
streamsProducer, streamsProducer,
productionExceptionHandler, productionExceptionHandler,
streamsMetrics); streamsMetrics,
topology
);
collector.closeDirty(); collector.closeDirty();
@ -354,7 +458,7 @@ public class RecordCollectorTest {
0, 0,
0L, 0L,
(Serializer) new LongSerializer(), // need to add cast to trigger `ClassCastException` (Serializer) new LongSerializer(), // need to add cast to trigger `ClassCastException`
new StringSerializer()) new StringSerializer(), null, null)
); );
assertThat(expected.getCause(), instanceOf(ClassCastException.class)); assertThat(expected.getCause(), instanceOf(ClassCastException.class));
@ -382,7 +486,7 @@ public class RecordCollectorTest {
0, 0,
0L, 0L,
(Serializer) new LongSerializer(), // need to add cast to trigger `ClassCastException` (Serializer) new LongSerializer(), // need to add cast to trigger `ClassCastException`
new StringSerializer()) new StringSerializer(), null, null)
); );
assertThat(expected.getCause(), instanceOf(ClassCastException.class)); assertThat(expected.getCause(), instanceOf(ClassCastException.class));
@ -410,7 +514,7 @@ public class RecordCollectorTest {
0, 0,
0L, 0L,
new StringSerializer(), new StringSerializer(),
(Serializer) new LongSerializer()) // need to add cast to trigger `ClassCastException` (Serializer) new LongSerializer(), null, null) // need to add cast to trigger `ClassCastException`
); );
assertThat(expected.getCause(), instanceOf(ClassCastException.class)); assertThat(expected.getCause(), instanceOf(ClassCastException.class));
@ -438,7 +542,7 @@ public class RecordCollectorTest {
0, 0,
0L, 0L,
new StringSerializer(), new StringSerializer(),
(Serializer) new LongSerializer()) // need to add cast to trigger `ClassCastException` (Serializer) new LongSerializer(), null, null) // need to add cast to trigger `ClassCastException`
); );
assertThat(expected.getCause(), instanceOf(ClassCastException.class)); assertThat(expected.getCause(), instanceOf(ClassCastException.class));
@ -460,13 +564,14 @@ public class RecordCollectorTest {
taskId, taskId,
getExceptionalStreamProducerOnPartitionsFor(new KafkaException("Kaboom!")), getExceptionalStreamProducerOnPartitionsFor(new KafkaException("Kaboom!")),
productionExceptionHandler, productionExceptionHandler,
streamsMetrics streamsMetrics,
topology
); );
collector.initialize(); collector.initialize();
final StreamsException exception = assertThrows( final StreamsException exception = assertThrows(
StreamsException.class, StreamsException.class,
() -> collector.send(topic, "0", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) () -> collector.send(topic, "0", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner)
); );
assertThat( assertThat(
exception.getMessage(), exception.getMessage(),
@ -491,13 +596,14 @@ public class RecordCollectorTest {
taskId, taskId,
getExceptionalStreamProducerOnPartitionsFor(runtimeException), getExceptionalStreamProducerOnPartitionsFor(runtimeException),
productionExceptionHandler, productionExceptionHandler,
streamsMetrics streamsMetrics,
topology
); );
collector.initialize(); collector.initialize();
final RuntimeException exception = assertThrows( final RuntimeException exception = assertThrows(
runtimeException.getClass(), runtimeException.getClass(),
() -> collector.send(topic, "0", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) () -> collector.send(topic, "0", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner)
); );
assertThat(exception.getMessage(), equalTo("Kaboom!")); assertThat(exception.getMessage(), equalTo("Kaboom!"));
} }
@ -518,15 +624,16 @@ public class RecordCollectorTest {
taskId, taskId,
getExceptionalStreamsProducerOnSend(exception), getExceptionalStreamsProducerOnSend(exception),
productionExceptionHandler, productionExceptionHandler,
streamsMetrics streamsMetrics,
topology
); );
collector.initialize(); collector.initialize();
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
final TaskMigratedException thrown = assertThrows( final TaskMigratedException thrown = assertThrows(
TaskMigratedException.class, TaskMigratedException.class,
() -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) () -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner)
); );
assertEquals(exception, thrown.getCause()); assertEquals(exception, thrown.getCause());
} }
@ -547,11 +654,12 @@ public class RecordCollectorTest {
taskId, taskId,
getExceptionalStreamsProducerOnSend(exception), getExceptionalStreamsProducerOnSend(exception),
productionExceptionHandler, productionExceptionHandler,
streamsMetrics streamsMetrics,
topology
); );
collector.initialize(); collector.initialize();
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
final TaskMigratedException thrown = assertThrows(TaskMigratedException.class, collector::flush); final TaskMigratedException thrown = assertThrows(TaskMigratedException.class, collector::flush);
assertEquals(exception, thrown.getCause()); assertEquals(exception, thrown.getCause());
@ -573,11 +681,12 @@ public class RecordCollectorTest {
taskId, taskId,
getExceptionalStreamsProducerOnSend(exception), getExceptionalStreamsProducerOnSend(exception),
productionExceptionHandler, productionExceptionHandler,
streamsMetrics streamsMetrics,
topology
); );
collector.initialize(); collector.initialize();
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
final TaskMigratedException thrown = assertThrows(TaskMigratedException.class, collector::closeClean); final TaskMigratedException thrown = assertThrows(TaskMigratedException.class, collector::closeClean);
assertEquals(exception, thrown.getCause()); assertEquals(exception, thrown.getCause());
@ -591,14 +700,15 @@ public class RecordCollectorTest {
taskId, taskId,
getExceptionalStreamsProducerOnSend(exception), getExceptionalStreamsProducerOnSend(exception),
productionExceptionHandler, productionExceptionHandler,
streamsMetrics streamsMetrics,
topology
); );
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
final StreamsException thrown = assertThrows( final StreamsException thrown = assertThrows(
StreamsException.class, StreamsException.class,
() -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) () -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner)
); );
assertEquals(exception, thrown.getCause()); assertEquals(exception, thrown.getCause());
assertThat( assertThat(
@ -617,10 +727,11 @@ public class RecordCollectorTest {
taskId, taskId,
getExceptionalStreamsProducerOnSend(exception), getExceptionalStreamsProducerOnSend(exception),
productionExceptionHandler, productionExceptionHandler,
streamsMetrics streamsMetrics,
topology
); );
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
final StreamsException thrown = assertThrows(StreamsException.class, collector::flush); final StreamsException thrown = assertThrows(StreamsException.class, collector::flush);
assertEquals(exception, thrown.getCause()); assertEquals(exception, thrown.getCause());
@ -640,10 +751,11 @@ public class RecordCollectorTest {
taskId, taskId,
getExceptionalStreamsProducerOnSend(exception), getExceptionalStreamsProducerOnSend(exception),
productionExceptionHandler, productionExceptionHandler,
streamsMetrics streamsMetrics,
topology
); );
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
final StreamsException thrown = assertThrows(StreamsException.class, collector::closeClean); final StreamsException thrown = assertThrows(StreamsException.class, collector::closeClean);
assertEquals(exception, thrown.getCause()); assertEquals(exception, thrown.getCause());
@ -663,14 +775,15 @@ public class RecordCollectorTest {
taskId, taskId,
getExceptionalStreamsProducerOnSend(exception), getExceptionalStreamsProducerOnSend(exception),
new AlwaysContinueProductionExceptionHandler(), new AlwaysContinueProductionExceptionHandler(),
streamsMetrics streamsMetrics,
topology
); );
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
final StreamsException thrown = assertThrows( final StreamsException thrown = assertThrows(
StreamsException.class, StreamsException.class,
() -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) () -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner)
); );
assertEquals(exception, thrown.getCause()); assertEquals(exception, thrown.getCause());
assertThat( assertThat(
@ -689,10 +802,11 @@ public class RecordCollectorTest {
taskId, taskId,
getExceptionalStreamsProducerOnSend(exception), getExceptionalStreamsProducerOnSend(exception),
new AlwaysContinueProductionExceptionHandler(), new AlwaysContinueProductionExceptionHandler(),
streamsMetrics streamsMetrics,
topology
); );
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
final StreamsException thrown = assertThrows(StreamsException.class, collector::flush); final StreamsException thrown = assertThrows(StreamsException.class, collector::flush);
assertEquals(exception, thrown.getCause()); assertEquals(exception, thrown.getCause());
@ -712,10 +826,11 @@ public class RecordCollectorTest {
taskId, taskId,
getExceptionalStreamsProducerOnSend(exception), getExceptionalStreamsProducerOnSend(exception),
new AlwaysContinueProductionExceptionHandler(), new AlwaysContinueProductionExceptionHandler(),
streamsMetrics streamsMetrics,
topology
); );
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
final StreamsException thrown = assertThrows(StreamsException.class, collector::closeClean); final StreamsException thrown = assertThrows(StreamsException.class, collector::closeClean);
assertEquals(exception, thrown.getCause()); assertEquals(exception, thrown.getCause());
@ -734,13 +849,14 @@ public class RecordCollectorTest {
taskId, taskId,
getExceptionalStreamsProducerOnSend(new Exception()), getExceptionalStreamsProducerOnSend(new Exception()),
new AlwaysContinueProductionExceptionHandler(), new AlwaysContinueProductionExceptionHandler(),
streamsMetrics streamsMetrics,
topology
); );
try (final LogCaptureAppender logCaptureAppender = try (final LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) { LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) {
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.flush(); collector.flush();
final List<String> messages = logCaptureAppender.getMessages(); final List<String> messages = logCaptureAppender.getMessages();
@ -766,7 +882,7 @@ public class RecordCollectorTest {
)); ));
assertEquals(1.0, metric.metricValue()); assertEquals(1.0, metric.metricValue());
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
collector.flush(); collector.flush();
collector.closeClean(); collector.closeClean();
} }
@ -797,7 +913,8 @@ public class RecordCollectorTest {
Time.SYSTEM Time.SYSTEM
), ),
productionExceptionHandler, productionExceptionHandler,
streamsMetrics streamsMetrics,
topology
); );
collector.closeDirty(); collector.closeDirty();
@ -829,13 +946,14 @@ public class RecordCollectorTest {
Time.SYSTEM Time.SYSTEM
), ),
productionExceptionHandler, productionExceptionHandler,
streamsMetrics streamsMetrics,
topology
); );
collector.initialize(); collector.initialize();
final StreamsException thrown = assertThrows( final StreamsException thrown = assertThrows(
StreamsException.class, StreamsException.class,
() -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner) () -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner)
); );
assertThat( assertThat(
thrown.getMessage(), thrown.getMessage(),
@ -864,7 +982,8 @@ public class RecordCollectorTest {
Time.SYSTEM Time.SYSTEM
), ),
productionExceptionHandler, productionExceptionHandler,
streamsMetrics streamsMetrics,
topology
); );
collector.closeClean(); collector.closeClean();
View File
@ -17,9 +17,12 @@
package org.apache.kafka.streams.processor.internals; package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerDeserializer;
@ -28,12 +31,15 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp; import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector; import org.apache.kafka.test.MockRecordCollector;
@ -48,6 +54,9 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import static org.apache.kafka.streams.processor.internals.ClientUtils.consumerRecordSizeInBytes;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOPIC_LEVEL_GROUP;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -62,10 +71,15 @@ public class RecordQueueTest {
private final Deserializer<Integer> intDeserializer = new IntegerDeserializer(); private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
private final TimestampExtractor timestampExtractor = new MockTimestampExtractor(); private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
private final Metrics metrics = new Metrics();
private final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "mock", StreamsConfig.METRICS_LATEST, new MockTime());
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
final InternalMockProcessorContext context = new InternalMockProcessorContext<>( final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class), StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
new MockRecordCollector() new MockRecordCollector(),
metrics
); );
private final MockSourceNode<Integer, Integer> mockSourceNodeWithMetrics private final MockSourceNode<Integer, Integer> mockSourceNodeWithMetrics
= new MockSourceNode<>(intDeserializer, intDeserializer); = new MockSourceNode<>(intDeserializer, intDeserializer);
@ -98,6 +112,57 @@ public class RecordQueueTest {
mockSourceNodeWithMetrics.close(); mockSourceNodeWithMetrics.close();
} }
@Test
public void testConsumedSensor() {
final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()));
queue.addRawRecords(records);
final String threadId = Thread.currentThread().getName();
final String taskId = context.taskId().toString();
final String processorNodeId = mockSourceNodeWithMetrics.name();
final String topic = "topic";
final Metric recordsConsumed = context.metrics().metrics().get(
new MetricName("records-consumed-total",
TOPIC_LEVEL_GROUP,
"The total number of records consumed from this topic",
streamsMetrics.topicLevelTagMap(threadId, taskId, processorNodeId, topic))
);
final Metric bytesConsumed = context.metrics().metrics().get(
new MetricName("bytes-consumed-total",
TOPIC_LEVEL_GROUP,
"The total number of bytes consumed from this topic",
streamsMetrics.topicLevelTagMap(threadId, taskId, processorNodeId, topic))
);
double totalBytes = 0D;
double totalRecords = 0D;
queue.poll(5L);
++totalRecords;
totalBytes += consumerRecordSizeInBytes(records.get(0));
assertThat(bytesConsumed.metricValue(), equalTo(totalBytes));
assertThat(recordsConsumed.metricValue(), equalTo(totalRecords));
queue.poll(6L);
++totalRecords;
totalBytes += consumerRecordSizeInBytes(records.get(1));
assertThat(bytesConsumed.metricValue(), equalTo(totalBytes));
assertThat(recordsConsumed.metricValue(), equalTo(totalRecords));
queue.poll(7L);
++totalRecords;
totalBytes += consumerRecordSizeInBytes(records.get(2));
assertThat(bytesConsumed.metricValue(), equalTo(totalBytes));
assertThat(recordsConsumed.metricValue(), equalTo(totalRecords));
}
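Editor's note: the byte totals asserted above are computed by ClientUtils#consumerRecordSizeInBytes. As a hedged illustration only, and not the actual ClientUtils formula, a consumed record's size is roughly the sum of its serialized key, value, topic name and header payloads; the helper below is a hypothetical approximation under that assumption.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

final class RecordSizeEstimate {
    // Rough, hypothetical approximation of a record's consumed size in bytes;
    // the real helper may also account for fixed-size fields such as timestamp and offset.
    static long approximateSizeInBytes(final ConsumerRecord<byte[], byte[]> record) {
        long size = Math.max(record.serializedKeySize(), 0)
            + Math.max(record.serializedValueSize(), 0)
            + record.topic().length();
        for (final Header header : record.headers()) {
            size += header.key().length();
            size += header.value() == null ? 0 : header.value().length;
        }
        return size;
    }
}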
@Test @Test
public void testTimeTracking() { public void testTimeTracking() {
assertTrue(queue.isEmpty()); assertTrue(queue.isEmpty());
@ -118,13 +183,13 @@ public class RecordQueueTest {
assertEquals(2L, queue.headRecordOffset().longValue()); assertEquals(2L, queue.headRecordOffset().longValue());
// poll the first record, now with 1, 3 // poll the first record, now with 1, 3
assertEquals(2L, queue.poll().timestamp); assertEquals(2L, queue.poll(0).timestamp);
assertEquals(2, queue.size()); assertEquals(2, queue.size());
assertEquals(1L, queue.headRecordTimestamp()); assertEquals(1L, queue.headRecordTimestamp());
assertEquals(1L, queue.headRecordOffset().longValue()); assertEquals(1L, queue.headRecordOffset().longValue());
// poll the second record, now with 3 // poll the second record, now with 3
assertEquals(1L, queue.poll().timestamp); assertEquals(1L, queue.poll(0).timestamp);
assertEquals(1, queue.size()); assertEquals(1, queue.size());
assertEquals(3L, queue.headRecordTimestamp()); assertEquals(3L, queue.headRecordTimestamp());
assertEquals(3L, queue.headRecordOffset().longValue()); assertEquals(3L, queue.headRecordOffset().longValue());
@ -143,21 +208,21 @@ public class RecordQueueTest {
assertEquals(3L, queue.headRecordOffset().longValue()); assertEquals(3L, queue.headRecordOffset().longValue());
// poll the third record, now with 4, 1, 2 // poll the third record, now with 4, 1, 2
assertEquals(3L, queue.poll().timestamp); assertEquals(3L, queue.poll(0).timestamp);
assertEquals(3, queue.size()); assertEquals(3, queue.size());
assertEquals(4L, queue.headRecordTimestamp()); assertEquals(4L, queue.headRecordTimestamp());
assertEquals(4L, queue.headRecordOffset().longValue()); assertEquals(4L, queue.headRecordOffset().longValue());
// poll the rest records // poll the rest records
assertEquals(4L, queue.poll().timestamp); assertEquals(4L, queue.poll(0).timestamp);
assertEquals(1L, queue.headRecordTimestamp()); assertEquals(1L, queue.headRecordTimestamp());
assertEquals(1L, queue.headRecordOffset().longValue()); assertEquals(1L, queue.headRecordOffset().longValue());
assertEquals(1L, queue.poll().timestamp); assertEquals(1L, queue.poll(0).timestamp);
assertEquals(2L, queue.headRecordTimestamp()); assertEquals(2L, queue.headRecordTimestamp());
assertEquals(2L, queue.headRecordOffset().longValue()); assertEquals(2L, queue.headRecordOffset().longValue());
assertEquals(2L, queue.poll().timestamp); assertEquals(2L, queue.poll(0).timestamp);
assertTrue(queue.isEmpty()); assertTrue(queue.isEmpty());
assertEquals(0, queue.size()); assertEquals(0, queue.size());
assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp()); assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
@ -176,7 +241,7 @@ public class RecordQueueTest {
assertEquals(4L, queue.headRecordOffset().longValue()); assertEquals(4L, queue.headRecordOffset().longValue());
// poll one record again, the timestamp should advance now // poll one record again, the timestamp should advance now
assertEquals(4L, queue.poll().timestamp); assertEquals(4L, queue.poll(0).timestamp);
assertEquals(2, queue.size()); assertEquals(2, queue.size());
assertEquals(5L, queue.headRecordTimestamp()); assertEquals(5L, queue.headRecordTimestamp());
assertEquals(5L, queue.headRecordOffset().longValue()); assertEquals(5L, queue.headRecordOffset().longValue());
@ -218,13 +283,13 @@ public class RecordQueueTest {
queue.addRawRecords(list1); queue.addRawRecords(list1);
assertThat(queue.partitionTime(), is(RecordQueue.UNKNOWN)); assertThat(queue.partitionTime(), is(RecordQueue.UNKNOWN));
queue.poll(); queue.poll(0);
assertThat(queue.partitionTime(), is(2L)); assertThat(queue.partitionTime(), is(2L));
queue.poll(); queue.poll(0);
assertThat(queue.partitionTime(), is(2L)); assertThat(queue.partitionTime(), is(2L));
queue.poll(); queue.poll(0);
assertThat(queue.partitionTime(), is(3L)); assertThat(queue.partitionTime(), is(3L));
} }
@ -251,13 +316,13 @@ public class RecordQueueTest {
queue.addRawRecords(list1); queue.addRawRecords(list1);
assertThat(queue.partitionTime(), is(150L)); assertThat(queue.partitionTime(), is(150L));
queue.poll(); queue.poll(0);
assertThat(queue.partitionTime(), is(200L)); assertThat(queue.partitionTime(), is(200L));
queue.setPartitionTime(500L); queue.setPartitionTime(500L);
assertThat(queue.partitionTime(), is(500L)); assertThat(queue.partitionTime(), is(500L));
queue.poll(); queue.poll(0);
assertThat(queue.partitionTime(), is(500L)); assertThat(queue.partitionTime(), is(500L));
} }
@ -299,7 +364,7 @@ public class RecordQueueTest {
queueThatSkipsDeserializeErrors.addRawRecords(records); queueThatSkipsDeserializeErrors.addRawRecords(records);
assertEquals(1, queueThatSkipsDeserializeErrors.size()); assertEquals(1, queueThatSkipsDeserializeErrors.size());
assertEquals(new CorruptedRecord(record), queueThatSkipsDeserializeErrors.poll()); assertEquals(new CorruptedRecord(record), queueThatSkipsDeserializeErrors.poll(0));
} }
@Test @Test
@ -313,7 +378,7 @@ public class RecordQueueTest {
queueThatSkipsDeserializeErrors.addRawRecords(records); queueThatSkipsDeserializeErrors.addRawRecords(records);
assertEquals(1, queueThatSkipsDeserializeErrors.size()); assertEquals(1, queueThatSkipsDeserializeErrors.size());
assertEquals(new CorruptedRecord(record), queueThatSkipsDeserializeErrors.poll()); assertEquals(new CorruptedRecord(record), queueThatSkipsDeserializeErrors.poll(0));
} }
@Test @Test
@ -394,13 +459,13 @@ public class RecordQueueTest {
// no (known) timestamp has yet been passed to the timestamp extractor // no (known) timestamp has yet been passed to the timestamp extractor
assertEquals(RecordQueue.UNKNOWN, timestampExtractor.partitionTime); assertEquals(RecordQueue.UNKNOWN, timestampExtractor.partitionTime);
queue.poll(); queue.poll(0);
assertEquals(2L, timestampExtractor.partitionTime); assertEquals(2L, timestampExtractor.partitionTime);
queue.poll(); queue.poll(0);
assertEquals(2L, timestampExtractor.partitionTime); assertEquals(2L, timestampExtractor.partitionTime);
queue.poll(); queue.poll(0);
assertEquals(3L, timestampExtractor.partitionTime); assertEquals(3L, timestampExtractor.partitionTime);
} }
View File
@ -122,8 +122,9 @@ public class WriteConsistencyVectorTest {
CHANGELOG_PARTITION.partition(), CHANGELOG_PARTITION.partition(),
TIMESTAMP, TIMESTAMP,
BYTES_KEY_SERIALIZER, BYTES_KEY_SERIALIZER,
BYTEARRAY_VALUE_SERIALIZER BYTEARRAY_VALUE_SERIALIZER,
); null,
null);
final StreamTask task = EasyMock.createNiceMock(StreamTask.class); final StreamTask task = EasyMock.createNiceMock(StreamTask.class);
View File
@ -58,6 +58,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOPIC_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
@ -99,6 +100,10 @@ public class StreamsMetricsImplTest {
private final static String THREAD_ID1 = "test-thread-1"; private final static String THREAD_ID1 = "test-thread-1";
private final static String TASK_ID1 = "test-task-1"; private final static String TASK_ID1 = "test-task-1";
private final static String TASK_ID2 = "test-task-2"; private final static String TASK_ID2 = "test-task-2";
private final static String NODE_ID1 = "test-node-1";
private final static String NODE_ID2 = "test-node-2";
private final static String TOPIC_ID1 = "test-topic-1";
private final static String TOPIC_ID2 = "test-topic-2";
private final static String METRIC_NAME1 = "test-metric1"; private final static String METRIC_NAME1 = "test-metric1";
private final static String METRIC_NAME2 = "test-metric2"; private final static String METRIC_NAME2 = "test-metric2";
private final static String THREAD_ID_TAG = "thread-id"; private final static String THREAD_ID_TAG = "thread-id";
@ -319,6 +324,46 @@ public class StreamsMetricsImplTest {
assertThat(actualSensor, is(equalToObject(sensor))); assertThat(actualSensor, is(equalToObject(sensor)));
} }
@Test
public void shouldGetNewTopicLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, recordingLevel);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final Sensor actualSensor = streamsMetrics.topicLevelSensor(
THREAD_ID1,
TASK_ID1,
NODE_ID1,
TOPIC_ID1,
SENSOR_NAME_1,
recordingLevel
);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}
@Test
public void shouldGetExistingTopicLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final Sensor actualSensor = streamsMetrics.topicLevelSensor(
THREAD_ID1,
TASK_ID1,
NODE_ID1,
TOPIC_ID1,
SENSOR_NAME_1,
recordingLevel
);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}
@Test @Test
public void shouldGetNewStoreLevelSensorIfNoneExists() { public void shouldGetNewStoreLevelSensorIfNoneExists() {
final Metrics metrics = mock(Metrics.class); final Metrics metrics = mock(Metrics.class);
@ -505,14 +550,13 @@ public class StreamsMetricsImplTest {
public void shouldGetNewNodeLevelSensor() { public void shouldGetNewNodeLevelSensor() {
final Metrics metrics = mock(Metrics.class); final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO; final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorNodeName = "processorNodeName";
setupGetNewSensorTest(metrics, recordingLevel); setupGetNewSensorTest(metrics, recordingLevel);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final Sensor actualSensor = streamsMetrics.nodeLevelSensor( final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
THREAD_ID1, THREAD_ID1,
TASK_ID1, TASK_ID1,
processorNodeName, NODE_ID1,
SENSOR_NAME_1, SENSOR_NAME_1,
recordingLevel recordingLevel
); );
@ -525,14 +569,13 @@ public class StreamsMetricsImplTest {
public void shouldGetExistingNodeLevelSensor() { public void shouldGetExistingNodeLevelSensor() {
final Metrics metrics = mock(Metrics.class); final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO; final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorNodeName = "processorNodeName";
setupGetExistingSensorTest(metrics); setupGetExistingSensorTest(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
final Sensor actualSensor = streamsMetrics.nodeLevelSensor( final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
THREAD_ID1, THREAD_ID1,
TASK_ID1, TASK_ID1,
processorNodeName, NODE_ID1,
SENSOR_NAME_1, SENSOR_NAME_1,
recordingLevel recordingLevel
); );
@ -732,6 +775,9 @@ public class StreamsMetricsImplTest {
final String processorNodeName = "processorNodeName"; final String processorNodeName = "processorNodeName";
final Map<String, String> nodeTags = mkMap(mkEntry("nkey", "value")); final Map<String, String> nodeTags = mkMap(mkEntry("nkey", "value"));
final String topicName = "topicName";
final Map<String, String> topicTags = mkMap(mkEntry("tkey", "value"));
final Sensor parent1 = metrics.taskLevelSensor(THREAD_ID1, taskName, operation, RecordingLevel.DEBUG); final Sensor parent1 = metrics.taskLevelSensor(THREAD_ID1, taskName, operation, RecordingLevel.DEBUG);
addAvgAndMaxLatencyToSensor(parent1, PROCESSOR_NODE_LEVEL_GROUP, taskTags, operation); addAvgAndMaxLatencyToSensor(parent1, PROCESSOR_NODE_LEVEL_GROUP, taskTags, operation);
addInvocationRateAndCountToSensor(parent1, PROCESSOR_NODE_LEVEL_GROUP, taskTags, operation, "", ""); addInvocationRateAndCountToSensor(parent1, PROCESSOR_NODE_LEVEL_GROUP, taskTags, operation, "", "");
@ -744,6 +790,18 @@ public class StreamsMetricsImplTest {
assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics)); assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
final int numberOfNodeMetrics = registry.metrics().size();
final Sensor child1 = metrics.topicLevelSensor(THREAD_ID1, taskName, processorNodeName, topicName, operation, RecordingLevel.DEBUG, sensor1);
addAvgAndMaxLatencyToSensor(child1, TOPIC_LEVEL_GROUP, topicTags, operation);
addInvocationRateAndCountToSensor(child1, TOPIC_LEVEL_GROUP, topicTags, operation, "", "");
assertThat(registry.metrics().size(), greaterThan(numberOfNodeMetrics));
metrics.removeAllTopicLevelSensors(THREAD_ID1, taskName, processorNodeName, topicName);
assertThat(registry.metrics().size(), equalTo(numberOfNodeMetrics));
metrics.removeAllNodeLevelSensors(THREAD_ID1, taskName, processorNodeName); metrics.removeAllNodeLevelSensors(THREAD_ID1, taskName, processorNodeName);
assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics)); assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
@ -1104,6 +1162,22 @@ public class StreamsMetricsImplTest {
assertThat(metrics.metrics().size(), equalTo(1 + 1)); // one metric is added automatically in the constructor of Metrics assertThat(metrics.metrics().size(), equalTo(1 + 1)); // one metric is added automatically in the constructor of Metrics
} }
@Test
public void shouldAddTotalCountAndSumMetricsToSensor() {
final String totalMetricNamePrefix = "total";
final String sumMetricNamePrefix = "count";
StreamsMetricsImpl
.addTotalCountAndSumMetricsToSensor(sensor, group, tags, totalMetricNamePrefix, sumMetricNamePrefix, DESCRIPTION1, DESCRIPTION2);
final double valueToRecord1 = 18.0;
final double valueToRecord2 = 42.0;
final double expectedCountMetricValue = 2;
verifyMetric(totalMetricNamePrefix + "-total", DESCRIPTION1, valueToRecord1, valueToRecord2, expectedCountMetricValue);
final double expectedSumMetricValue = 2 * valueToRecord1 + 2 * valueToRecord2; // values are recorded once for each metric verification
verifyMetric(sumMetricNamePrefix + "-total", DESCRIPTION2, valueToRecord1, valueToRecord2, expectedSumMetricValue);
assertThat(metrics.metrics().size(), equalTo(2 + 1)); // one metric is added automatically in the constructor of Metrics
}
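Editor's note: the new addTotalCountAndSumMetricsToSensor helper tested above pairs a cumulative count with a cumulative sum on one sensor, which is how a single record() call can drive both a records-*-total and a bytes-*-total metric. The standalone sketch below reproduces that pairing with the public Metrics/Sensor API; the class name, group and tag values are illustrative and not taken from the commit.

import java.util.Collections;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.CumulativeSum;

public class CountAndSumSensorSketch {
    public static void main(final String[] args) {
        try (Metrics metrics = new Metrics()) {
            final Sensor sensor = metrics.sensor("consumed");
            // one record(value) call increments the count stat by one and
            // adds the recorded value (e.g. a record's size) to the sum stat
            sensor.add(metrics.metricName("records-consumed-total", "example-group",
                "illustrative count metric", Collections.singletonMap("topic", "topic")), new CumulativeCount());
            sensor.add(metrics.metricName("bytes-consumed-total", "example-group",
                "illustrative sum metric", Collections.singletonMap("topic", "topic")), new CumulativeSum());

            sensor.record(18.0);
            sensor.record(42.0);

            metrics.metrics().forEach((name, metric) ->
                System.out.println(name.name() + " = " + metric.metricValue()));
        }
    }
}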
@Test @Test
public void shouldAddAvgAndTotalMetricsToSensor() { public void shouldAddAvgAndTotalMetricsToSensor() {
StreamsMetricsImpl StreamsMetricsImpl
View File
@ -100,7 +100,7 @@ public class TaskMetricsTest {
); );
final Sensor sensor = TaskMetrics.totalBytesSensor(THREAD_ID, TASK_ID, streamsMetrics); final Sensor sensor = TaskMetrics.totalInputBufferBytesSensor(THREAD_ID, TASK_ID, streamsMetrics);
assertThat(sensor, is(expectedSensor)); assertThat(sensor, is(expectedSensor));
} }
View File
@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.junit.AfterClass;
import org.junit.Test;
import org.mockito.MockedStatic;
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOPIC_LEVEL_GROUP;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
public class TopicMetricsTest {
private static final String THREAD_ID = "test-thread";
private static final String TASK_ID = "test-task";
private static final String PROCESSOR_NODE_ID = "test-processor";
private static final String TOPIC_NAME = "topic";
private final Map<String, String> tagMap = Collections.singletonMap("hello", "world");
private final Sensor expectedSensor = mock(Sensor.class);
private static final MockedStatic<StreamsMetricsImpl> STREAMS_METRICS_STATIC_MOCK = mockStatic(StreamsMetricsImpl.class);
private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class);
@AfterClass
public static void cleanUp() {
STREAMS_METRICS_STATIC_MOCK.close();
}
@Test
public void shouldGetRecordsAndBytesConsumedSensor() {
final String recordsMetricNamePrefix = "records-consumed";
final String bytesMetricNamePrefix = "bytes-consumed";
final String descriptionOfRecordsTotal = "The total number of records consumed from this topic";
final String descriptionOfBytesTotal = "The total number of bytes consumed from this topic";
when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "consumed", RecordingLevel.INFO))
.thenReturn(expectedSensor);
when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "consumed", RecordingLevel.INFO))
.thenReturn(expectedSensor);
when(streamsMetrics.topicLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME)).thenReturn(tagMap);
verifySensor(
() -> TopicMetrics.consumedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, streamsMetrics)
);
STREAMS_METRICS_STATIC_MOCK.verify(
() -> StreamsMetricsImpl.addTotalCountAndSumMetricsToSensor(
expectedSensor,
TOPIC_LEVEL_GROUP,
tagMap,
recordsMetricNamePrefix,
bytesMetricNamePrefix,
descriptionOfRecordsTotal,
descriptionOfBytesTotal
)
);
}
@Test
public void shouldGetRecordsAndBytesProducedSensor() {
final String recordsMetricNamePrefix = "records-produced";
final String bytesMetricNamePrefix = "bytes-produced";
final String descriptionOfRecordsTotal = "The total number of records produced to this topic";
final String descriptionOfBytesTotal = "The total number of bytes produced to this topic";
when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "produced", RecordingLevel.INFO))
.thenReturn(expectedSensor);
when(streamsMetrics.topicLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, "produced", RecordingLevel.INFO))
.thenReturn(expectedSensor);
when(streamsMetrics.topicLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME)).thenReturn(tagMap);
verifySensor(() -> TopicMetrics.producedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, streamsMetrics));
STREAMS_METRICS_STATIC_MOCK.verify(
() -> StreamsMetricsImpl.addTotalCountAndSumMetricsToSensor(
expectedSensor,
TOPIC_LEVEL_GROUP,
tagMap,
recordsMetricNamePrefix,
bytesMetricNamePrefix,
descriptionOfRecordsTotal,
descriptionOfBytesTotal
)
);
}
private void verifySensor(final Supplier<Sensor> sensorSupplier) {
final Sensor sensor = sensorSupplier.get();
assertThat(sensor, is(expectedSensor));
}
}
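Editor's note: TopicMetrics is internal API, so the snippet below is only a hedged sketch of how the new consumed sensor is presumably wired at runtime (build the sensor once per thread/task/node/topic, then record each record's byte size). The factory and constructor signatures are taken from the tests in this commit; the client id, ids and the recorded value are made up.

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TopicMetrics;

public class ConsumedSensorSketch {
    public static void main(final String[] args) {
        final StreamsMetricsImpl streamsMetrics =
            new StreamsMetricsImpl(new Metrics(), "client", StreamsConfig.METRICS_LATEST, Time.SYSTEM);
        final Sensor consumed =
            TopicMetrics.consumedSensor("thread-1", "0_0", "source-node", "topic", streamsMetrics);
        // each record() call should bump records-consumed-total by one and
        // add the recorded size to bytes-consumed-total for this topic
        consumed.record(128.0);
        streamsMetrics.metrics().forEach((name, metric) ->
            System.out.println(name.name() + " = " + metric.metricValue()));
    }
}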
View File
@ -32,7 +32,9 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.StreamsProducer; import org.apache.kafka.streams.processor.internals.StreamsProducer;
@ -45,6 +47,7 @@ import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import java.io.File; import java.io.File;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
@ -54,6 +57,9 @@ import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/** /**
* A component that provides a {@link #context() ProcessingContext} that can be supplied to a {@link KeyValueStore} so that * A component that provides a {@link #context() ProcessingContext} that can be supplied to a {@link KeyValueStore} so that
* all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes. * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes.
@ -199,6 +205,9 @@ public class KeyValueStoreTestDriver<K, V> {
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class); props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
final ProcessorTopology topology = mock(ProcessorTopology.class);
when(topology.sinkTopics()).thenReturn(Collections.emptySet());
final LogContext logContext = new LogContext("KeyValueStoreTestDriver "); final LogContext logContext = new LogContext("KeyValueStoreTestDriver ");
final RecordCollector recordCollector = new RecordCollectorImpl( final RecordCollector recordCollector = new RecordCollectorImpl(
logContext, logContext,
@ -212,7 +221,8 @@ public class KeyValueStoreTestDriver<K, V> {
logContext, logContext,
Time.SYSTEM), Time.SYSTEM),
new DefaultProductionExceptionHandler(), new DefaultProductionExceptionHandler(),
new MockStreamsMetrics(new Metrics()) new MockStreamsMetrics(new Metrics()),
topology
) { ) {
@Override @Override
public <K1, V1> void send(final String topic, public <K1, V1> void send(final String topic,
@ -222,11 +232,16 @@ public class KeyValueStoreTestDriver<K, V> {
final Integer partition, final Integer partition,
final Long timestamp, final Long timestamp,
final Serializer<K1> keySerializer, final Serializer<K1> keySerializer,
final Serializer<V1> valueSerializer) { final Serializer<V1> valueSerializer,
final String processorNodeId,
final InternalProcessorContext<Void, Void> context) {
// for byte arrays we need to wrap it for comparison // for byte arrays we need to wrap it for comparison
final K keyTest = serdes.keyFrom(keySerializer.serialize(topic, headers, key)); final byte[] keyBytes = keySerializer.serialize(topic, headers, key);
final V valueTest = serdes.valueFrom(valueSerializer.serialize(topic, headers, value)); final byte[] valueBytes = valueSerializer.serialize(topic, headers, value);
final K keyTest = serdes.keyFrom(keyBytes);
final V valueTest = serdes.valueFrom(valueBytes);
recordFlushed(keyTest, valueTest); recordFlushed(keyTest, valueTest);
} }
@ -239,6 +254,8 @@ public class KeyValueStoreTestDriver<K, V> {
final Long timestamp, final Long timestamp,
final Serializer<K1> keySerializer, final Serializer<K1> keySerializer,
final Serializer<V1> valueSerializer, final Serializer<V1> valueSerializer,
final String processorNodeId,
final InternalProcessorContext<Void, Void> context,
final StreamPartitioner<? super K1, ? super V1> partitioner) { final StreamPartitioner<? super K1, ? super V1> partitioner) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -430,7 +430,9 @@ public class StreamThreadStateStoreProviderTest {
Time.SYSTEM Time.SYSTEM
), ),
streamsConfig.defaultProductionExceptionHandler(), streamsConfig.defaultProductionExceptionHandler(),
new MockStreamsMetrics(metrics)); new MockStreamsMetrics(metrics),
topology
);
final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(metrics); final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(metrics);
final InternalProcessorContext context = new ProcessorContextImpl( final InternalProcessorContext context = new ProcessorContextImpl(
taskId, taskId,

View File

@ -460,7 +460,9 @@ public class InternalMockProcessorContext<KOut, VOut>
taskId().partition(), taskId().partition(),
timestamp, timestamp,
BYTES_KEY_SERIALIZER, BYTES_KEY_SERIALIZER,
BYTEARRAY_VALUE_SERIALIZER); BYTEARRAY_VALUE_SERIALIZER,
null,
null);
} }
@Override @Override

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.RecordCollector;
import java.util.Collections; import java.util.Collections;
@ -46,7 +47,9 @@ public class MockRecordCollector implements RecordCollector {
final Integer partition, final Integer partition,
final Long timestamp, final Long timestamp,
final Serializer<K> keySerializer, final Serializer<K> keySerializer,
final Serializer<V> valueSerializer) { final Serializer<V> valueSerializer,
final String processorNodeId,
final InternalProcessorContext<Void, Void> context) {
collected.add(new ProducerRecord<>(topic, collected.add(new ProducerRecord<>(topic,
partition, partition,
timestamp, timestamp,
@ -63,6 +66,8 @@ public class MockRecordCollector implements RecordCollector {
final Long timestamp, final Long timestamp,
final Serializer<K> keySerializer, final Serializer<K> keySerializer,
final Serializer<V> valueSerializer, final Serializer<V> valueSerializer,
final String processorNodeId,
final InternalProcessorContext<Void, Void> context,
final StreamPartitioner<? super K, ? super V> partitioner) { final StreamPartitioner<? super K, ? super V> partitioner) {
collected.add(new ProducerRecord<>(topic, collected.add(new ProducerRecord<>(topic,
0, // partition id 0, // partition id
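The two parameters added to send() above, the processor node id and the InternalProcessorContext, are what allow a collector to attribute output to the new topic-level produced sensors. The sketch below is an assumption-laden illustration rather than the RecordCollectorImpl code from this commit: the helper name, the way the thread and task ids are derived, and the byte accounting for null keys or values are all hypothetical.

import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.TopicMetrics;

// Hypothetical helper showing what the extra send() arguments make possible.
static void recordProduced(final String topic,
                           final byte[] keyBytes,
                           final byte[] valueBytes,
                           final String processorNodeId,
                           final InternalProcessorContext<Void, Void> context) {
    // serialized size of the record; a null key or value contributes zero bytes (assumption)
    final long sizeInBytes = (keyBytes == null ? 0 : keyBytes.length)
        + (valueBytes == null ? 0 : valueBytes.length);
    TopicMetrics.producedSensor(
        Thread.currentThread().getName(),   // thread id (assumption)
        context.taskId().toString(),        // task id (assumption)
        processorNodeId,                    // the sink node that produced the record
        topic,                              // the topic produced to
        context.metrics()                   // the task's StreamsMetricsImpl
    ).record(sizeInBytes);                  // count -> records-produced-total, sum -> bytes-produced-total
}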

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.scala.kstream package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.kstream.Named import org.apache.kafka.streams.kstream.Named
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._ import org.apache.kafka.streams.scala.serialization.Serdes._
@ -35,7 +36,7 @@ class KStreamSplitTest extends TestDriver {
val sinkTopic = Array("default", "even", "three"); val sinkTopic = Array("default", "even", "three");
val m = builder val m = builder
.stream[Integer, Integer](sourceTopic) .stream[Int, Int](sourceTopic)
.split(Named.as("_")) .split(Named.as("_"))
.branch((_, v) => v % 2 == 0) .branch((_, v) => v % 2 == 0)
.branch((_, v) => v % 3 == 0) .branch((_, v) => v % 3 == 0)
@ -46,14 +47,17 @@ class KStreamSplitTest extends TestDriver {
m("_2").to(sinkTopic(2)) m("_2").to(sinkTopic(2))
val testDriver = createTestDriver(builder) val testDriver = createTestDriver(builder)
val testInput = testDriver.createInput[Integer, Integer](sourceTopic) val testInput = testDriver.createInput[Int, Int](sourceTopic)
val testOutput = sinkTopic.map(name => testDriver.createOutput[Integer, Integer](name)) val testOutput = sinkTopic.map(name => testDriver.createOutput[Int, Int](name))
testInput pipeKeyValueList List(
new KeyValue(1, 1),
new KeyValue(1, 2),
new KeyValue(1, 3),
new KeyValue(1, 4),
new KeyValue(1, 5)
).asJava
testInput.pipeValueList(
List(1, 2, 3, 4, 5)
.map(Integer.valueOf)
.asJava
)
assertEquals(List(1, 5), testOutput(0).readValuesToList().asScala) assertEquals(List(1, 5), testOutput(0).readValuesToList().asScala)
assertEquals(List(2, 4), testOutput(1).readValuesToList().asScala) assertEquals(List(2, 4), testOutput(1).readValuesToList().asScala)
assertEquals(List(3), testOutput(2).readValuesToList().asScala) assertEquals(List(3), testOutput(2).readValuesToList().asScala)
@ -67,7 +71,7 @@ class KStreamSplitTest extends TestDriver {
val sourceTopic = "source" val sourceTopic = "source"
val m = builder val m = builder
.stream[Integer, Integer](sourceTopic) .stream[Int, Int](sourceTopic)
.split(Named.as("_")) .split(Named.as("_"))
.branch((_, v) => v % 2 == 0, Branched.withConsumer(ks => ks.to("even"), "consumedEvens")) .branch((_, v) => v % 2 == 0, Branched.withConsumer(ks => ks.to("even"), "consumedEvens"))
.branch((_, v) => v % 3 == 0, Branched.withFunction(ks => ks.mapValues(x => x * x), "mapped")) .branch((_, v) => v % 3 == 0, Branched.withFunction(ks => ks.mapValues(x => x * x), "mapped"))
@ -76,15 +80,18 @@ class KStreamSplitTest extends TestDriver {
m("_mapped").to("mapped") m("_mapped").to("mapped")
val testDriver = createTestDriver(builder) val testDriver = createTestDriver(builder)
val testInput = testDriver.createInput[Integer, Integer](sourceTopic) val testInput = testDriver.createInput[Int, Int](sourceTopic)
testInput.pipeValueList( testInput pipeKeyValueList List(
List(1, 2, 3, 4, 5, 9) new KeyValue(1, 1),
.map(Integer.valueOf) new KeyValue(1, 2),
.asJava new KeyValue(1, 3),
) new KeyValue(1, 4),
new KeyValue(1, 5),
new KeyValue(1, 9)
).asJava
val even = testDriver.createOutput[Integer, Integer]("even") val even = testDriver.createOutput[Int, Int]("even")
val mapped = testDriver.createOutput[Integer, Integer]("mapped") val mapped = testDriver.createOutput[Int, Int]("mapped")
assertEquals(List(2, 4), even.readValuesToList().asScala) assertEquals(List(2, 4), even.readValuesToList().asScala)
assertEquals(List(9, 81), mapped.readValuesToList().asScala) assertEquals(List(9, 81), mapped.readValuesToList().asScala)
@ -98,7 +105,7 @@ class KStreamSplitTest extends TestDriver {
val sourceTopic = "source" val sourceTopic = "source"
val m = builder val m = builder
.stream[Integer, Integer](sourceTopic) .stream[Int, Int](sourceTopic)
.split(Named.as("_")) .split(Named.as("_"))
.branch((_, v) => v % 2 == 0, Branched.withConsumer(ks => ks.to("even"))) .branch((_, v) => v % 2 == 0, Branched.withConsumer(ks => ks.to("even")))
.branch((_, v) => v % 3 == 0, Branched.withFunction(ks => ks.mapValues(x => x * x))) .branch((_, v) => v % 3 == 0, Branched.withFunction(ks => ks.mapValues(x => x * x)))
@ -107,19 +114,23 @@ class KStreamSplitTest extends TestDriver {
m("_2").to("mapped") m("_2").to("mapped")
val testDriver = createTestDriver(builder) val testDriver = createTestDriver(builder)
val testInput = testDriver.createInput[Integer, Integer](sourceTopic) val testInput = testDriver.createInput[Int, Int](sourceTopic)
testInput.pipeValueList( testInput pipeKeyValueList List(
List(1, 2, 3, 4, 5, 9) new KeyValue(1, 1),
.map(Integer.valueOf) new KeyValue(1, 2),
.asJava new KeyValue(1, 3),
) new KeyValue(1, 4),
new KeyValue(1, 5),
new KeyValue(1, 9)
).asJava
val even = testDriver.createOutput[Integer, Integer]("even") val even = testDriver.createOutput[Int, Int]("even")
val mapped = testDriver.createOutput[Integer, Integer]("mapped") val mapped = testDriver.createOutput[Int, Int]("mapped")
assertEquals(List(2, 4), even.readValuesToList().asScala) assertEquals(List(2, 4), even.readValuesToList().asScala)
assertEquals(List(9, 81), mapped.readValuesToList().asScala) assertEquals(List(9, 81), mapped.readValuesToList().asScala)
testDriver.close() testDriver.close()
} }
} }

View File

@ -498,7 +498,8 @@ public class TopologyTestDriver implements Closeable {
TASK_ID, TASK_ID,
testDriverProducer, testDriverProducer,
streamsConfig.defaultProductionExceptionHandler(), streamsConfig.defaultProductionExceptionHandler(),
streamsMetrics streamsMetrics,
processorTopology
); );
final InternalProcessorContext context = new ProcessorContextImpl( final InternalProcessorContext context = new ProcessorContextImpl(

View File

@ -43,6 +43,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Properties; import java.util.Properties;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
@ -89,12 +90,12 @@ public class TestTopicsTest {
@Test @Test
public void testValue() { public void testValue() {
final TestInputTopic<String, String> inputTopic = final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer()); testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<String, String> outputTopic = final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer()); testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
//Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case //Feed word "Hello" to inputTopic, timestamp and key irrelevant in this case
inputTopic.pipeInput("Hello"); inputTopic.pipeInput(1L, "Hello");
assertThat(outputTopic.readValue(), equalTo("Hello")); assertThat(outputTopic.readValue(), equalTo("Hello"));
//No more output in topic //No more output in topic
assertThat(outputTopic.isEmpty(), is(true)); assertThat(outputTopic.isEmpty(), is(true));
@ -102,16 +103,20 @@ public class TestTopicsTest {
@Test @Test
public void testValueList() { public void testValueList() {
final TestInputTopic<String, String> inputTopic = final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer()); testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<String, String> outputTopic = final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer()); testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
final List<String> inputList = Arrays.asList("This", "is", "an", "example"); final List<KeyValue<Long, String>> inputList = Arrays.asList(
//Feed list of words to inputTopic and no kafka key, timestamp is irrelevant in this case new KeyValue<>(1L, "This"),
inputTopic.pipeValueList(inputList); new KeyValue<>(2L, "is"),
new KeyValue<>(3L, "an"),
new KeyValue<>(4L, "example"));
//Feed list of words to inputTopic, key and timestamp are irrelevant in this case
inputTopic.pipeKeyValueList(inputList);
final List<String> output = outputTopic.readValuesToList(); final List<String> output = outputTopic.readValuesToList();
assertThat(output, hasItems("This", "is", "an", "example")); assertThat(output, hasItems("This", "is", "an", "example"));
assertThat(output, is(equalTo(inputList))); assertThat(output, is(equalTo(inputList.stream().map(kv -> kv.value).collect(Collectors.toList()))));
} }
@Test @Test
@ -166,15 +171,16 @@ public class TestTopicsTest {
} }
@Test @Test
public void testKeyValuesToMapWithNull() { public void testPipeInputWithNullKey() {
final TestInputTopic<Long, String> inputTopic = final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer()); testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<Long, String> outputTopic = final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer()); testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
inputTopic.pipeInput("value"); final StreamsException exception = assertThrows(StreamsException.class, () -> inputTopic.pipeInput("value"));
assertThrows(IllegalStateException.class, outputTopic::readKeyValuesToMap); assertThat(exception.getCause() instanceof NullPointerException, is(true));
} assertThat(outputTopic.readKeyValuesToMap().isEmpty(), is(true));
}
@Test @Test
public void testKeyValueListDuration() { public void testKeyValueListDuration() {
@ -229,8 +235,8 @@ public class TestTopicsTest {
testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer()); testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<Long, String> outputTopic = final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer()); testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
inputTopic.pipeInput(null, "Hello", baseTime); inputTopic.pipeInput(1L, "Hello", baseTime);
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Hello", null, baseTime)))); assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(1L, "Hello", null, baseTime))));
inputTopic.pipeInput(2L, "Kafka", ++baseTime); inputTopic.pipeInput(2L, "Kafka", ++baseTime);
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", null, baseTime)))); assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", null, baseTime))));
@ -238,13 +244,15 @@ public class TestTopicsTest {
inputTopic.pipeInput(2L, "Kafka", testBaseTime); inputTopic.pipeInput(2L, "Kafka", testBaseTime);
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", testBaseTime)))); assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", testBaseTime))));
final List<String> inputList = Arrays.asList("Advancing", "time"); final List<KeyValue<Long, String>> inputList = Arrays.asList(
new KeyValue<>(1L, "Advancing"),
new KeyValue<>(2L, "time"));
//Feed list of words to inputTopic and no kafka key, timestamp advancing from testInstant //Feed list of words to inputTopic and no kafka key, timestamp advancing from testInstant
final Duration advance = Duration.ofSeconds(15); final Duration advance = Duration.ofSeconds(15);
final Instant recordInstant = testBaseTime.plus(Duration.ofDays(1)); final Instant recordInstant = testBaseTime.plus(Duration.ofDays(1));
inputTopic.pipeValueList(inputList, recordInstant, advance); inputTopic.pipeKeyValueList(inputList, recordInstant, advance);
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Advancing", recordInstant)))); assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(1L, "Advancing", recordInstant))));
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "time", null, recordInstant.plus(advance))))); assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "time", null, recordInstant.plus(advance)))));
} }
@Test @Test
@ -292,8 +300,8 @@ public class TestTopicsTest {
testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer(), testBaseTime, advance); testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer(), testBaseTime, advance);
final TestOutputTopic<Long, String> outputTopic = final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer()); testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
inputTopic.pipeInput("Hello"); inputTopic.pipeInput(1L, "Hello");
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(null, "Hello", testBaseTime)))); assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(1L, "Hello", testBaseTime))));
inputTopic.pipeInput(2L, "Kafka"); inputTopic.pipeInput(2L, "Kafka");
assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", testBaseTime.plus(advance))))); assertThat(outputTopic.readRecord(), is(equalTo(new TestRecord<>(2L, "Kafka", testBaseTime.plus(advance)))));
} }
@ -337,12 +345,12 @@ public class TestTopicsTest {
@Test @Test
public void testEmptyTopic() { public void testEmptyTopic() {
final TestInputTopic<String, String> inputTopic = final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic(INPUT_TOPIC, stringSerde.serializer(), stringSerde.serializer()); testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer());
final TestOutputTopic<String, String> outputTopic = final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer()); testDriver.createOutputTopic(OUTPUT_TOPIC, stringSerde.deserializer(), stringSerde.deserializer());
//Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case //Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case
inputTopic.pipeInput("Hello"); inputTopic.pipeInput(1L, "Hello");
assertThat(outputTopic.readValue(), equalTo("Hello")); assertThat(outputTopic.readValue(), equalTo("Hello"));
//No more output in topic //No more output in topic
assertThrows(NoSuchElementException.class, outputTopic::readRecord, "Empty topic"); assertThrows(NoSuchElementException.class, outputTopic::readRecord, "Empty topic");
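Since TopologyTestDriver now passes the processor topology into its RecordCollectorImpl, tests like these can also assert on the new throughput metrics directly. A sketch of such an assertion helper follows; it relies only on the public TopologyTestDriver#metrics() map and the records-produced-total metric name introduced by this commit, while the helper itself is hypothetical.

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.TopologyTestDriver;

// Hypothetical test helper: sums records-produced-total across all topic-level
// metrics reported by the driver (the totals are reported as Double values).
static double totalRecordsProduced(final TopologyTestDriver driver) {
    double total = 0.0;
    for (final Map.Entry<MetricName, ? extends Metric> entry : driver.metrics().entrySet()) {
        if ("records-produced-total".equals(entry.getKey().name())) {
            total += (double) entry.getValue().metricValue();
        }
    }
    return total;
}

// Example usage inside a test (illustrative):
//   inputTopic.pipeInput(1L, "Hello");
//   assertThat(totalRecordsProduced(testDriver) > 0.0, is(true));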