mirror of https://github.com/apache/kafka.git
KAFKA-6819: Pt. 1 - Refactor thread-level Streams metrics (#6631)
* StreamsMetricsImpl wraps the Kafka Streams metrics registry and provides the logic to create and register sensors and their corresponding metrics; an example of such logic can be found in threadLevelSensor(). Furthermore, StreamsMetricsImpl keeps track of the sensors on the different levels of an application (thread, task, etc.) and provides the logic to remove sensors per level, e.g., removeAllThreadLevelSensors(). There is one StreamsMetricsImpl object per application instance.
* ThreadMetrics contains only static methods that specify all built-in thread-level sensors and metrics and provide the logic to register and retrieve those thread-level sensors, e.g., commitSensor().
* From anywhere inside the code base with access to StreamsMetricsImpl, thread-level sensors can be accessed by using ThreadMetrics.
* ThreadMetrics does not inherit from StreamsMetricsImpl anymore.

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
parent 45fae33937
commit 17712b96c8
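The usage pattern this refactoring introduces is visible throughout the diff below: instead of pulling the pre-built skipped-records sensor off StreamsMetricsImpl, call sites ask ThreadMetrics to register and retrieve the thread-level sensor. A minimal sketch of that pattern follows; the processor class and its names are illustrative only, not part of this commit:

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;

// Hypothetical processor, following the init()/process() pattern of the classes touched below.
public class ExampleSkippingProcessor<K, V> extends AbstractProcessor<K, V> {
    private Sensor skippedRecordsSensor;

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // ThreadMetrics registers (or retrieves) the thread-level sensor on the
        // StreamsMetricsImpl instance shared across the application instance.
        final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
        skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
    }

    @Override
    public void process(final K key, final V value) {
        if (key == null || value == null) {
            skippedRecordsSensor.record();  // was: metrics.skippedRecordsSensor().record()
            return;
        }
        // ... actual processing of the record ...
    }
}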
@@ -1120,6 +1120,8 @@ project(':streams') {
testCompile libs.log4j
testCompile libs.junit
testCompile libs.easymock
testCompile libs.powermockJunit4
testCompile libs.powermockEasymock
testCompile libs.bcpkix
testCompile libs.hamcrest
@@ -16,12 +16,14 @@
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;

@@ -57,6 +59,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
private TimestampedKeyValueStore<K, T> store;
private StreamsMetricsImpl metrics;
private Sensor skippedRecordsSensor;
private TimestampedTupleForwarder<K, T> tupleForwarder;
@SuppressWarnings("unchecked")

@@ -64,6 +67,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
store,

@@ -80,7 +84,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
"Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
key, value, context().topic(), context().partition(), context().offset()
);
metrics.skippedRecordsSensor().record();
skippedRecordsSensor.record();
return;
}
@@ -16,6 +16,7 @@
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;

@@ -24,12 +25,12 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);

@@ -57,12 +58,14 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
private WindowStore<K, V2> otherWindow;
private StreamsMetricsImpl metrics;
private Sensor skippedRecordsSensor;

@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);

otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
}

@@ -81,7 +84,7 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
"Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
key, value, context().topic(), context().partition(), context().offset()
);
metrics.skippedRecordsSensor().record();
skippedRecordsSensor.record();
return;
}
@@ -16,11 +16,13 @@
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -34,6 +36,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
private final boolean leftJoin;
private StreamsMetricsImpl metrics;
private Sensor skippedRecordsSensor;

KStreamKTableJoinProcessor(final KTableValueGetter<K2, V2> valueGetter,
final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper,

@@ -49,6 +52,8 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);

valueGetter.init(context);
}

@@ -67,7 +72,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
"Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
key, value, context().topic(), context().partition(), context().offset()
);
metrics.skippedRecordsSensor().record();
skippedRecordsSensor.record();
} else {
final K2 mappedKey = keyMapper.apply(key, value);
final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
@@ -16,11 +16,13 @@
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;

@@ -56,12 +58,14 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
private TimestampedKeyValueStore<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
private StreamsMetricsImpl metrics;
private Sensor skippedRecordsSensor;

@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
store,

@@ -78,7 +82,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
"Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
key, value, context().topic(), context().partition(), context().offset()
);
metrics.skippedRecordsSensor().record();
skippedRecordsSensor.record();
return;
}
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

@@ -83,6 +84,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
private StreamsMetricsImpl metrics;
private InternalProcessorContext internalProcessorContext;
private Sensor lateRecordDropSensor;
private Sensor skippedRecordsSensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;

@SuppressWarnings("unchecked")

@@ -92,6 +94,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
internalProcessorContext = (InternalProcessorContext) context;
metrics = (StreamsMetricsImpl) context.metrics();
lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);

store = (SessionStore<K, Agg>) context.getStateStore(storeName);
tupleForwarder = new SessionTupleForwarder<>(store, context, new SessionCacheFlushListener<>(context), sendOldValues);

@@ -106,7 +109,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
"Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
value, context().topic(), context().partition(), context().offset()
);
metrics.skippedRecordsSensor().record();
skippedRecordsSensor.record();
return;
}
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;

@@ -79,6 +80,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
private StreamsMetricsImpl metrics;
private InternalProcessorContext internalProcessorContext;
private Sensor lateRecordDropSensor;
private Sensor skippedRecordsSensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;

@SuppressWarnings("unchecked")

@@ -86,8 +88,11 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
public void init(final ProcessorContext context) {
super.init(context);
internalProcessorContext = (InternalProcessorContext) context;
metrics = (StreamsMetricsImpl) context.metrics();
metrics = internalProcessorContext.metrics();

lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
windowStore,

@@ -103,7 +108,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
"Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
value, context().topic(), context().partition(), context().offset()
);
metrics.skippedRecordsSensor().record();
skippedRecordsSensor.record();
return;
}
@@ -16,6 +16,7 @@
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;

@@ -23,6 +24,7 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -66,6 +68,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V2> valueGetter;
private StreamsMetricsImpl metrics;
private Sensor skippedRecordsSensor;

KTableKTableJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;

@@ -75,6 +78,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
valueGetter.init(context);
}

@@ -86,7 +90,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
"Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
change, context().topic(), context().partition(), context().offset()
);
metrics.skippedRecordsSensor().record();
skippedRecordsSensor.record();
return;
}
@@ -16,12 +16,14 @@
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -65,6 +67,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V2> valueGetter;
private StreamsMetricsImpl metrics;
private Sensor skippedRecordsSensor;

KTableKTableLeftJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;

@@ -74,6 +77,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
valueGetter.init(context);
}

@@ -85,7 +89,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
"Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
change, context().topic(), context().partition(), context().offset()
);
metrics.skippedRecordsSensor().record();
skippedRecordsSensor.record();
return;
}
@@ -16,12 +16,14 @@
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -64,6 +66,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V2> valueGetter;
private StreamsMetricsImpl metrics;
private Sensor skippedRecordsSensor;

KTableKTableOuterJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;

@@ -73,6 +76,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
valueGetter.init(context);
}

@@ -84,7 +88,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
"Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
change, context().topic(), context().partition(), context().offset()
);
metrics.skippedRecordsSensor().record();
skippedRecordsSensor.record();
return;
}
@@ -16,12 +16,14 @@
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -63,6 +65,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V2> valueGetter;
private StreamsMetricsImpl metrics;
private Sensor skippedRecordsSensor;

KTableKTableRightJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;

@@ -72,6 +75,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
valueGetter.init(context);
}

@@ -83,7 +87,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
"Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
change, context().topic(), context().partition(), context().offset()
);
metrics.skippedRecordsSensor().record();
skippedRecordsSensor.record();
return;
}
@@ -16,11 +16,13 @@
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;

@@ -70,12 +72,14 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
private TimestampedKeyValueStore<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
private StreamsMetricsImpl metrics;
private Sensor skippedRecordsSensor;

@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
if (queryableName != null) {
store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(

@@ -94,7 +98,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
"Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
context().topic(), context().partition(), context().offset()
);
metrics.skippedRecordsSensor().record();
skippedRecordsSensor.record();
return;
}
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;

import java.io.IOException;
import java.util.HashMap;

@@ -69,7 +70,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
source,
deserializationExceptionHandler,
logContext,
processorContext.metrics().skippedRecordsSensor()
ThreadMetrics.skipRecordSensor(processorContext.metrics())
)
);
}
@@ -18,17 +18,17 @@ package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.slf4j.Logger;

import java.util.ArrayDeque;

/**
 * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the
 * partition timestamp defined as the minimum timestamp of records in its queue; in addition, its partition

@@ -48,6 +48,8 @@ public class RecordQueue {
private StampedRecord headRecord = null;

private Sensor skipRecordsSensor;

RecordQueue(final TopicPartition partition,
final SourceNode source,
final TimestampExtractor timestampExtractor,

@@ -58,13 +60,14 @@ public class RecordQueue {
this.partition = partition;
this.fifoQueue = new ArrayDeque<>();
this.timestampExtractor = timestampExtractor;
this.recordDeserializer = new RecordDeserializer(
this.processorContext = processorContext;
skipRecordsSensor = ThreadMetrics.skipRecordSensor(processorContext.metrics());
recordDeserializer = new RecordDeserializer(
source,
deserializationExceptionHandler,
logContext,
processorContext.metrics().skippedRecordsSensor()
skipRecordsSensor
);
this.processorContext = processorContext;
this.log = logContext.logger(RecordQueue.class);
}

@@ -180,7 +183,8 @@ public class RecordQueue {
"Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]",
deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, timestampExtractor.getClass().getCanonicalName()
);
((StreamsMetricsImpl) processorContext.metrics()).skippedRecordsSensor().record();
skipRecordsSensor.record();
continue;
}
@@ -45,6 +45,7 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;

import java.io.IOException;

@@ -78,7 +79,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
private final PunctuationQueue systemTimePunctuationQueue;
private final ProducerSupplier producerSupplier;

private Sensor closeSensor;
private Sensor closeTaskSensor;
private long idleStartTime;
private Producer<byte[], byte[]> producer;
private boolean commitRequested = false;

@@ -96,24 +97,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
final String group = "stream-task-metrics";

// first add the global operation metrics if not yet, with the global tags only
final Map<String, String> allTagMap = metrics.tagMap("task-id", "all");
final Sensor parent = metrics.threadLevelSensor("commit", Sensor.RecordingLevel.DEBUG);
parent.add(
new MetricName("commit-latency-avg", group, "The average latency of commit operation.", allTagMap),
new Avg()
);
parent.add(
new MetricName("commit-latency-max", group, "The max latency of commit operation.", allTagMap),
new Max()
);
parent.add(
new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", allTagMap),
new Rate(TimeUnit.SECONDS, new Count())
);
parent.add(
new MetricName("commit-total", group, "The total number of occurrence of commit operations.", allTagMap),
new CumulativeCount()
);
final Sensor parent = ThreadMetrics.commitOverTasksSensor(metrics);

// add the operation metrics with additional tags
final Map<String, String> tagMap = metrics.tagMap("task-id", taskName);

@@ -167,9 +151,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
final StateDirectory stateDirectory,
final ThreadCache cache,
final Time time,
final ProducerSupplier producerSupplier,
final Sensor closeSensor) {
this(id, partitions, topology, consumer, changelogReader, config, metrics, stateDirectory, cache, time, producerSupplier, null, closeSensor);
final ProducerSupplier producerSupplier) {
this(id, partitions, topology, consumer, changelogReader, config, metrics, stateDirectory, cache, time, producerSupplier, null);
}

public StreamTask(final TaskId id,

@@ -178,20 +161,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
final Consumer<byte[], byte[]> consumer,
final ChangelogReader changelogReader,
final StreamsConfig config,
final StreamsMetricsImpl metrics,
final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
final ThreadCache cache,
final Time time,
final ProducerSupplier producerSupplier,
final RecordCollector recordCollector,
final Sensor closeSensor) {
final RecordCollector recordCollector) {
super(id, partitions, topology, consumer, changelogReader, false, stateDirectory, config);

this.time = time;
this.producerSupplier = producerSupplier;
this.producer = producerSupplier.get();
this.closeSensor = closeSensor;
this.taskMetrics = new TaskMetrics(id, metrics);
this.taskMetrics = new TaskMetrics(id, streamsMetrics);

closeTaskSensor = ThreadMetrics.closeTaskSensor(streamsMetrics);

final ProductionExceptionHandler productionExceptionHandler = config.defaultProductionExceptionHandler();

@@ -200,8 +183,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
id.toString(),
logContext,
productionExceptionHandler,
metrics.skippedRecordsSensor()
);
ThreadMetrics.skipRecordSensor(streamsMetrics));
} else {
this.recordCollector = recordCollector;
}

@@ -220,7 +202,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();

// initialize the topology with its own context
final ProcessorContextImpl processorContextImpl = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, metrics, cache);
final ProcessorContextImpl processorContextImpl = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, streamsMetrics, cache);
processorContext = processorContextImpl;

final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();

@@ -691,7 +673,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
partitionGroup.close();
taskMetrics.removeAllSensors();

closeSensor.record();
closeTaskSensor.record();

if (firstException != null) {
throw firstException;
@@ -31,9 +31,6 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;

@@ -45,8 +42,8 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

@@ -62,7 +59,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.singleton;

@@ -340,7 +336,7 @@ public class StreamThread extends Thread {
final String applicationId;
final InternalTopologyBuilder builder;
final StreamsConfig config;
final StreamsMetricsThreadImpl streamsMetrics;
final StreamsMetricsImpl streamsMetrics;
final StateDirectory stateDirectory;
final ChangelogReader storeChangelogReader;
final Time time;

@@ -349,7 +345,7 @@ public class StreamThread extends Thread {
AbstractTaskCreator(final InternalTopologyBuilder builder,
final StreamsConfig config,
final StreamsMetricsThreadImpl streamsMetrics,
final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
final ChangelogReader storeChangelogReader,
final Time time,

@@ -398,10 +394,11 @@ public class StreamThread extends Thread {
private final KafkaClientSupplier clientSupplier;
private final String threadClientId;
private final Producer<byte[], byte[]> threadProducer;
private final Sensor createTaskSensor;

TaskCreator(final InternalTopologyBuilder builder,
final StreamsConfig config,
final StreamsMetricsThreadImpl streamsMetrics,
final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
final ChangelogReader storeChangelogReader,
final ThreadCache cache,

@@ -422,13 +419,14 @@ public class StreamThread extends Thread {
this.clientSupplier = clientSupplier;
this.threadProducer = threadProducer;
this.threadClientId = threadClientId;
createTaskSensor = ThreadMetrics.createTaskSensor(streamsMetrics);
}

@Override
StreamTask createTask(final Consumer<byte[], byte[]> consumer,
final TaskId taskId,
final Set<TopicPartition> partitions) {
streamsMetrics.taskCreatedSensor.record();
createTaskSensor.record();

return new StreamTask(
taskId,

@@ -441,8 +439,7 @@ public class StreamThread extends Thread {
stateDirectory,
cache,
time,
() -> createProducer(taskId),
streamsMetrics.taskClosedSensor);
() -> createProducer(taskId));
}

private Producer<byte[], byte[]> createProducer(final TaskId id) {

@@ -470,9 +467,11 @@ public class StreamThread extends Thread {
}

static class StandbyTaskCreator extends AbstractTaskCreator<StandbyTask> {
private final Sensor createTaskSensor;

StandbyTaskCreator(final InternalTopologyBuilder builder,
final StreamsConfig config,
final StreamsMetricsThreadImpl streamsMetrics,
final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
final ChangelogReader storeChangelogReader,
final Time time,

@@ -485,13 +484,14 @@ public class StreamThread extends Thread {
storeChangelogReader,
time,
log);
createTaskSensor = ThreadMetrics.createTaskSensor(streamsMetrics);
}

@Override
StandbyTask createTask(final Consumer<byte[], byte[]> consumer,
final TaskId taskId,
final Set<TopicPartition> partitions) {
streamsMetrics.taskCreatedSensor.record();
createTaskSensor.record();

final ProcessorTopology topology = builder.build(taskId.topicGroupId);

@@ -516,47 +516,6 @@ public class StreamThread extends Thread {
}
}

static class StreamsMetricsThreadImpl extends StreamsMetricsImpl {

private final Sensor commitTimeSensor;
private final Sensor pollTimeSensor;
private final Sensor processTimeSensor;
private final Sensor punctuateTimeSensor;
private final Sensor taskCreatedSensor;
private final Sensor taskClosedSensor;

StreamsMetricsThreadImpl(final Metrics metrics, final String threadName) {
super(metrics, threadName);
final String group = "stream-metrics";

commitTimeSensor = threadLevelSensor("commit-latency", Sensor.RecordingLevel.INFO);
addAvgMaxLatency(commitTimeSensor, group, tagMap(), "commit");
addInvocationRateAndCount(commitTimeSensor, group, tagMap(), "commit");

pollTimeSensor = threadLevelSensor("poll-latency", Sensor.RecordingLevel.INFO);
addAvgMaxLatency(pollTimeSensor, group, tagMap(), "poll");
// can't use addInvocationRateAndCount due to non-standard description string
pollTimeSensor.add(metrics.metricName("poll-rate", group, "The average per-second number of record-poll calls", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tagMap()), new CumulativeCount());

processTimeSensor = threadLevelSensor("process-latency", Sensor.RecordingLevel.INFO);
addAvgMaxLatency(processTimeSensor, group, tagMap(), "process");
addInvocationRateAndCount(processTimeSensor, group, tagMap(), "process");

punctuateTimeSensor = threadLevelSensor("punctuate-latency", Sensor.RecordingLevel.INFO);
addAvgMaxLatency(punctuateTimeSensor, group, tagMap(), "punctuate");
addInvocationRateAndCount(punctuateTimeSensor, group, tagMap(), "punctuate");

taskCreatedSensor = threadLevelSensor("task-created", Sensor.RecordingLevel.INFO);
taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", tagMap()), new Total());

taskClosedSensor = threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
taskClosedSensor.add(metrics.metricName("task-closed-rate", group, "The average per-second number of closed tasks", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
taskClosedSensor.add(metrics.metricName("task-closed-total", group, "The total number of closed tasks", tagMap()), new Total());
}
}

private final Time time;
private final Logger log;
private final String logPrefix;

@@ -566,9 +525,14 @@ public class StreamThread extends Thread {
private final int maxPollTimeMs;
private final String originalReset;
private final TaskManager taskManager;
private final StreamsMetricsThreadImpl streamsMetrics;
private final AtomicInteger assignmentErrorCode;

private final StreamsMetricsImpl streamsMetrics;
private final Sensor commitSensor;
private final Sensor pollSensor;
private final Sensor punctuateSensor;
private final Sensor processSensor;

private long now;
private long lastPollMs;
private long lastCommitMs;

@@ -620,10 +584,7 @@ public class StreamThread extends Thread {
threadProducer = clientSupplier.getProducer(producerConfigs);
}

final StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl(
metrics,
threadClientId
);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId);

final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);

@@ -697,7 +658,7 @@ public class StreamThread extends Thread {
final Consumer<byte[], byte[]> consumer,
final String originalReset,
final TaskManager taskManager,
final StreamsMetricsThreadImpl streamsMetrics,
final StreamsMetricsImpl streamsMetrics,
final InternalTopologyBuilder builder,
final String threadClientId,
final LogContext logContext,

@@ -707,9 +668,24 @@ public class StreamThread extends Thread {
this.stateLock = new Object();
this.standbyRecords = new HashMap<>();

this.streamsMetrics = streamsMetrics;
this.commitSensor = ThreadMetrics.commitSensor(streamsMetrics);
this.pollSensor = ThreadMetrics.pollSensor(streamsMetrics);
this.processSensor = ThreadMetrics.processSensor(streamsMetrics);
this.punctuateSensor = ThreadMetrics.punctuateSensor(streamsMetrics);

// The following sensors are created here but their references are not stored in this object, since within
// this object they are not recorded. The sensors are created here so that the stream threads starts with all
// its metrics initialised. Otherwise, those sensors would have been created during processing, which could
// lead to missing metrics. For instance, if no task were created, the metrics for created and closed
// tasks would never be added to the metrics.
ThreadMetrics.createTaskSensor(streamsMetrics);
ThreadMetrics.closeTaskSensor(streamsMetrics);
ThreadMetrics.skipRecordSensor(streamsMetrics);
ThreadMetrics.commitOverTasksSensor(streamsMetrics);

this.time = time;
this.builder = builder;
this.streamsMetrics = streamsMetrics;
this.logPrefix = logContext.logPrefix();
this.log = logContext.logger(StreamThread.class);
this.rebalanceListener = new RebalanceListener(time, taskManager, this, this.log);

@@ -857,7 +833,7 @@ public class StreamThread extends Thread {
final long pollLatency = advanceNowAndComputeLatency();

if (records != null && !records.isEmpty()) {
streamsMetrics.pollTimeSensor.record(pollLatency, now);
pollSensor.record(pollLatency, now);
addRecordsToTasks(records);
}

@@ -891,14 +867,14 @@ public class StreamThread extends Thread {
if (processed > 0) {
final long processLatency = advanceNowAndComputeLatency();
streamsMetrics.processTimeSensor.record(processLatency / (double) processed, now);
processSensor.record(processLatency / (double) processed, now);

// commit any tasks that have requested a commit
final int committed = taskManager.maybeCommitActiveTasksPerUserRequested();

if (committed > 0) {
final long commitLatency = advanceNowAndComputeLatency();
streamsMetrics.commitTimeSensor.record(commitLatency / (double) committed, now);
commitSensor.record(commitLatency / (double) committed, now);
}
} else {
// if there is no records to be processed, exit immediately

@@ -1031,7 +1007,7 @@ public class StreamThread extends Thread {
final int punctuated = taskManager.punctuate();
if (punctuated > 0) {
final long punctuateLatency = advanceNowAndComputeLatency();
streamsMetrics.punctuateTimeSensor.record(punctuateLatency / (double) punctuated, now);
punctuateSensor.record(punctuateLatency / (double) punctuated, now);
}

return punctuated > 0;

@@ -1057,7 +1033,7 @@ public class StreamThread extends Thread {
committed += taskManager.commitAll();
if (committed > 0) {
final long intervalCommitLatency = advanceNowAndComputeLatency();
streamsMetrics.commitTimeSensor.record(intervalCommitLatency / (double) committed, now);
commitSensor.record(intervalCommitLatency / (double) committed, now);

// try to purge the committed records for repartition topics if possible
taskManager.maybePurgeCommitedRecords();

@@ -1074,7 +1050,7 @@ public class StreamThread extends Thread {
final int commitPerRequested = taskManager.maybeCommitActiveTasksPerUserRequested();
if (commitPerRequested > 0) {
final long requestCommitLatency = advanceNowAndComputeLatency();
streamsMetrics.commitTimeSensor.record(requestCommitLatency / (double) committed, now);
commitSensor.record(requestCommitLatency / (double) committed, now);
committed += commitPerRequested;
}
}
@@ -20,11 +20,11 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.streams.StreamsMetrics;

import java.util.Arrays;

@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
public class StreamsMetricsImpl implements StreamsMetrics {
private final Metrics metrics;
private final Map<Sensor, Sensor> parentSensors;
private final Sensor skippedRecordsSensor;
private final String threadName;

private final Deque<String> threadLevelSensors = new LinkedList<>();

@@ -52,6 +51,20 @@ public class StreamsMetricsImpl implements StreamsMetrics {
private static final String SENSOR_PREFIX_DELIMITER = ".";
private static final String SENSOR_NAME_DELIMITER = ".s.";

public static final String THREAD_ID_TAG = "client-id";
public static final String TASK_ID_TAG = "task-id";

public static final String ALL_TASKS = "all";

public static final String LATENCY_SUFFIX = "-latency";
public static final String AVG_SUFFIX = "-avg";
public static final String MAX_SUFFIX = "-max";
public static final String RATE_SUFFIX = "-rate";
public static final String TOTAL_SUFFIX = "-total";

public static final String THREAD_LEVEL_GROUP = "stream-metrics";
public static final String TASK_LEVEL_GROUP = "stream-task-metrics";

public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics";
public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";

@@ -60,30 +73,47 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
this.metrics = metrics;
this.threadName = threadName;

this.metrics = metrics;

this.parentSensors = new HashMap<>();

final String group = "stream-metrics";
skippedRecordsSensor = threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO);
skippedRecordsSensor.add(new MetricName("skipped-records-rate", group, "The average per-second number of skipped records", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
skippedRecordsSensor.add(new MetricName("skipped-records-total", group, "The total number of skipped records", tagMap()), new Total());
}

public final Sensor threadLevelSensor(final String sensorName,
final Sensor.RecordingLevel recordingLevel,
final RecordingLevel recordingLevel,
final Sensor... parents) {
synchronized (threadLevelSensors) {
final String fullSensorName = threadSensorPrefix() + SENSOR_NAME_DELIMITER + sensorName;
final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
threadLevelSensors.push(fullSensorName);

return sensor;
}
}

private String threadSensorPrefix() {
return "internal" + SENSOR_PREFIX_DELIMITER + threadName;
}

public Map<String, String> threadLevelTagMap() {
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(THREAD_ID_TAG, threadName);
return tagMap;
}

public Map<String, String> threadLevelTagMap(final String... tags) {
final Map<String, String> tagMap = threadLevelTagMap();
if (tags != null) {
if ((tags.length % 2) != 0) {
throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
}

for (int i = 0; i < tags.length; i += 2) {
tagMap.put(tags[i], tags[i + 1]);
}
}
return tagMap;
}

public final void removeAllThreadLevelSensors() {
synchronized (threadLevelSensors) {
while (!threadLevelSensors.isEmpty()) {

@@ -92,13 +122,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
}

private String threadSensorPrefix() {
return "internal" + SENSOR_PREFIX_DELIMITER + threadName;
}

public final Sensor taskLevelSensor(final String taskName,
final String sensorName,
final Sensor.RecordingLevel recordingLevel,
final RecordingLevel recordingLevel,
final Sensor... parents) {
final String key = taskSensorPrefix(taskName);
synchronized (taskLevelSensors) {

@@ -235,10 +261,6 @@ public class StreamsMetricsImpl implements StreamsMetrics {
return taskSensorPrefix(taskName) + SENSOR_PREFIX_DELIMITER + "store" + SENSOR_PREFIX_DELIMITER + storeName;
}

public final Sensor skippedRecordsSensor() {
return skippedRecordsSensor;
}

@Override
public Sensor addSensor(final String name, final Sensor.RecordingLevel recordingLevel) {
return metrics.sensor(name, recordingLevel);

@@ -357,6 +379,28 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}

public static void addAvgAndMax(final Sensor sensor,
final String group,
final Map<String, String> tags,
final String operation) {
sensor.add(
new MetricName(
operation + AVG_SUFFIX,
group,
"The average value of " + operation + ".",
tags),
new Avg()
);
sensor.add(
new MetricName(
operation + MAX_SUFFIX,
group,
"The max value of " + operation + ".",
tags),
new Max()
);
}

public static void addAvgMaxLatency(final Sensor sensor,
final String group,
final Map<String, String> tags,

@@ -382,25 +426,39 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public static void addInvocationRateAndCount(final Sensor sensor,
final String group,
final Map<String, String> tags,
final String operation) {
final String operation,
final String descriptionOfInvocation,
final String descriptionOfRate) {
sensor.add(
new MetricName(
operation + "-rate",
operation + TOTAL_SUFFIX,
group,
"The average number of occurrence of " + operation + " operation per second.",
tags
),
new Rate(TimeUnit.SECONDS, new Count())
);
sensor.add(
new MetricName(
operation + "-total",
group,
"The total number of occurrence of " + operation + " operations.",
descriptionOfInvocation,
tags
),
new CumulativeCount()
);
sensor.add(
new MetricName(
operation + RATE_SUFFIX,
group,
descriptionOfRate,
tags
),
new Rate(TimeUnit.SECONDS, new Count())
);
}

public static void addInvocationRateAndCount(final Sensor sensor,
final String group,
final Map<String, String> tags,
final String operation) {
addInvocationRateAndCount(sensor,
group,
tags,
operation,
"The total number of " + operation,
"The average per-second number of " + operation);
}

/**
@ -0,0 +1,179 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals.metrics;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;

import java.util.Map;

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ALL_TASKS;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMax;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;

public class ThreadMetrics {
    private ThreadMetrics() {}

    private static final String COMMIT = "commit";
    private static final String POLL = "poll";
    private static final String PROCESS = "process";
    private static final String PUNCTUATE = "punctuate";
    private static final String CREATE_TASK = "task-created";
    private static final String CLOSE_TASK = "task-closed";
    private static final String SKIP_RECORD = "skipped-records";

    private static final String TOTAL_DESCRIPTION = "The total number of ";
    private static final String RATE_DESCRIPTION = "The average per-second number of ";
    private static final String COMMIT_DESCRIPTION = "commit calls";
    private static final String COMMIT_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_DESCRIPTION;
    private static final String COMMIT_RATE_DESCRIPTION = RATE_DESCRIPTION + COMMIT_DESCRIPTION;
    private static final String CREATE_TASK_DESCRIPTION = "newly created tasks";
    private static final String CREATE_TASK_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + CREATE_TASK_DESCRIPTION;
    private static final String CREATE_TASK_RATE_DESCRIPTION = RATE_DESCRIPTION + CREATE_TASK_DESCRIPTION;
    private static final String CLOSE_TASK_DESCRIPTION = "closed tasks";
    private static final String CLOSE_TASK_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + CLOSE_TASK_DESCRIPTION;
    private static final String CLOSE_TASK_RATE_DESCRIPTION = RATE_DESCRIPTION + CLOSE_TASK_DESCRIPTION;
    private static final String POLL_DESCRIPTION = "poll calls";
    private static final String POLL_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + POLL_DESCRIPTION;
    private static final String POLL_RATE_DESCRIPTION = RATE_DESCRIPTION + POLL_DESCRIPTION;
    private static final String PROCESS_DESCRIPTION = "process calls";
    private static final String PROCESS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + PROCESS_DESCRIPTION;
    private static final String PROCESS_RATE_DESCRIPTION = RATE_DESCRIPTION + PROCESS_DESCRIPTION;
    private static final String PUNCTUATE_DESCRIPTION = "punctuate calls";
    private static final String PUNCTUATE_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + PUNCTUATE_DESCRIPTION;
    private static final String PUNCTUATE_RATE_DESCRIPTION = RATE_DESCRIPTION + PUNCTUATE_DESCRIPTION;
    private static final String SKIP_RECORDS_DESCRIPTION = "skipped records";
    private static final String SKIP_RECORD_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + SKIP_RECORDS_DESCRIPTION;
    private static final String SKIP_RECORD_RATE_DESCRIPTION = RATE_DESCRIPTION + SKIP_RECORDS_DESCRIPTION;
    private static final String COMMIT_OVER_TASKS_DESCRIPTION = "commit calls over all tasks";
    private static final String COMMIT_OVER_TASKS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_OVER_TASKS_DESCRIPTION;
    private static final String COMMIT_OVER_TASKS_RATE_DESCRIPTION = RATE_DESCRIPTION + COMMIT_OVER_TASKS_DESCRIPTION;

    private static final String COMMIT_LATENCY = COMMIT + LATENCY_SUFFIX;
    private static final String POLL_LATENCY = POLL + LATENCY_SUFFIX;
    private static final String PROCESS_LATENCY = PROCESS + LATENCY_SUFFIX;
    private static final String PUNCTUATE_LATENCY = PUNCTUATE + LATENCY_SUFFIX;

    public static Sensor createTaskSensor(final StreamsMetricsImpl streamsMetrics) {
        final Sensor createTaskSensor = streamsMetrics.threadLevelSensor(CREATE_TASK, RecordingLevel.INFO);
        addInvocationRateAndCount(createTaskSensor,
                                  THREAD_LEVEL_GROUP,
                                  streamsMetrics.threadLevelTagMap(),
                                  CREATE_TASK,
                                  CREATE_TASK_TOTAL_DESCRIPTION,
                                  CREATE_TASK_RATE_DESCRIPTION);
        return createTaskSensor;
    }

    public static Sensor closeTaskSensor(final StreamsMetricsImpl streamsMetrics) {
        final Sensor closeTaskSensor = streamsMetrics.threadLevelSensor(CLOSE_TASK, RecordingLevel.INFO);
        addInvocationRateAndCount(closeTaskSensor,
                                  THREAD_LEVEL_GROUP,
                                  streamsMetrics.threadLevelTagMap(),
                                  CLOSE_TASK,
                                  CLOSE_TASK_TOTAL_DESCRIPTION,
                                  CLOSE_TASK_RATE_DESCRIPTION);
        return closeTaskSensor;
    }

    public static Sensor commitSensor(final StreamsMetricsImpl streamsMetrics) {
        final Sensor commitSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.INFO);
        final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
        addAvgAndMax(commitSensor, THREAD_LEVEL_GROUP, tagMap, COMMIT_LATENCY);
        addInvocationRateAndCount(commitSensor,
                                  THREAD_LEVEL_GROUP,
                                  tagMap,
                                  COMMIT,
                                  COMMIT_TOTAL_DESCRIPTION,
                                  COMMIT_RATE_DESCRIPTION);
        return commitSensor;
    }

    public static Sensor pollSensor(final StreamsMetricsImpl streamsMetrics) {
        final Sensor pollSensor = streamsMetrics.threadLevelSensor(POLL, Sensor.RecordingLevel.INFO);
        final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
        addAvgAndMax(pollSensor, THREAD_LEVEL_GROUP, tagMap, POLL_LATENCY);
        addInvocationRateAndCount(pollSensor,
                                  THREAD_LEVEL_GROUP,
                                  tagMap,
                                  POLL,
                                  POLL_TOTAL_DESCRIPTION,
                                  POLL_RATE_DESCRIPTION);
        return pollSensor;
    }

    public static Sensor processSensor(final StreamsMetricsImpl streamsMetrics) {
        final Sensor processSensor = streamsMetrics.threadLevelSensor(PROCESS, Sensor.RecordingLevel.INFO);
        final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
        addAvgAndMax(processSensor, THREAD_LEVEL_GROUP, tagMap, PROCESS_LATENCY);
        addInvocationRateAndCount(processSensor,
                                  THREAD_LEVEL_GROUP,
                                  tagMap,
                                  PROCESS,
                                  PROCESS_TOTAL_DESCRIPTION,
                                  PROCESS_RATE_DESCRIPTION);

        return processSensor;
    }

    public static Sensor punctuateSensor(final StreamsMetricsImpl streamsMetrics) {
        final Sensor punctuateSensor = streamsMetrics.threadLevelSensor(PUNCTUATE, Sensor.RecordingLevel.INFO);
        final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
        addAvgAndMax(punctuateSensor, THREAD_LEVEL_GROUP, tagMap, PUNCTUATE_LATENCY);
        addInvocationRateAndCount(punctuateSensor,
                                  THREAD_LEVEL_GROUP,
                                  tagMap,
                                  PUNCTUATE,
                                  PUNCTUATE_TOTAL_DESCRIPTION,
                                  PUNCTUATE_RATE_DESCRIPTION);

        return punctuateSensor;
    }

    public static Sensor skipRecordSensor(final StreamsMetricsImpl streamsMetrics) {
        final Sensor skippedRecordsSensor = streamsMetrics.threadLevelSensor(SKIP_RECORD, Sensor.RecordingLevel.INFO);
        addInvocationRateAndCount(skippedRecordsSensor,
                                  THREAD_LEVEL_GROUP,
                                  streamsMetrics.threadLevelTagMap(),
                                  SKIP_RECORD,
                                  SKIP_RECORD_TOTAL_DESCRIPTION,
                                  SKIP_RECORD_RATE_DESCRIPTION);

        return skippedRecordsSensor;
    }

    public static Sensor commitOverTasksSensor(final StreamsMetricsImpl streamsMetrics) {
        final Sensor commitOverTasksSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.DEBUG);
        final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS);
        addAvgAndMax(commitOverTasksSensor,
                     TASK_LEVEL_GROUP,
                     tagMap,
                     COMMIT_LATENCY);
        addInvocationRateAndCount(commitOverTasksSensor,
                                  TASK_LEVEL_GROUP,
                                  tagMap,
                                  COMMIT,
                                  COMMIT_OVER_TASKS_TOTAL_DESCRIPTION,
                                  COMMIT_OVER_TASKS_RATE_DESCRIPTION);

        return commitOverTasksSensor;
    }
}
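For orientation (not part of the diff itself): a minimal sketch of how code that holds a StreamsMetricsImpl instance can obtain and record one of the built-in thread-level sensors through the new ThreadMetrics helper. The wrapper class and method names below are illustrative assumptions; only ThreadMetrics.skipRecordSensor() and Sensor.record() come from the code in this commit.

// Illustrative sketch only; SkippedRecordReporter is a hypothetical name, not part of this commit.
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;

public class SkippedRecordReporter {
    private final Sensor skippedRecordsSensor;

    public SkippedRecordReporter(final StreamsMetricsImpl streamsMetrics) {
        // Registers (or looks up) the thread-level skipped-records sensor with its rate and total metrics.
        this.skippedRecordsSensor = ThreadMetrics.skipRecordSensor(streamsMetrics);
    }

    public void recordSkipped() {
        // Each call is reflected in skipped-records-rate and skipped-records-total.
        skippedRecordsSensor.record();
    }
}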
@ -25,11 +25,11 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.SessionStore;

@ -37,9 +37,9 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.processor.internals.ToInternal;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;

@ -92,6 +93,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
    final File stateDir = TestUtils.tempDirectory();
    metrics = new Metrics();
    final MockStreamsMetrics metrics = new MockStreamsMetrics(KStreamSessionWindowAggregateProcessorTest.this.metrics);
    ThreadMetrics.skipRecordSensor(metrics);

    context = new InternalMockProcessorContext(
        stateDir,
        Serdes.String(),
@ -246,7 +246,6 @@ public class StreamTaskTest {
            throw new TimeoutException("test");
        }
    },
    null,
    null
);
fail("Expected an exception");

@ -301,7 +300,6 @@ public class StreamTaskTest {
        }
    }
},
null,
null
);
testTask.initializeTopology();

@ -851,8 +849,7 @@ public class StreamTaskTest {
        public void flush() {
            flushed.set(true);
        }
    },
    metrics.sensor("dummy"));
});
streamTask.flushState();
assertTrue(flushed.get());
}

@ -1427,8 +1424,7 @@ public class StreamTaskTest {
    stateDirectory,
    null,
    time,
    () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer),
    metrics.sensor("dummy"));
    () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer));
task.initializeStateStores();
task.initializeTopology();

@ -1498,8 +1494,7 @@ public class StreamTaskTest {
    stateDirectory,
    null,
    time,
    () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer),
    metrics.sensor("dummy"));
    () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer));
}

private StreamTask createStatefulTaskThatThrowsExceptionOnClose() {

@ -1520,8 +1515,7 @@ public class StreamTaskTest {
    stateDirectory,
    null,
    time,
    () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer),
    metrics.sensor("dummy"));
    () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer));
}

private StreamTask createStatelessTask(final StreamsConfig streamsConfig) {

@ -1546,8 +1540,7 @@ public class StreamTaskTest {
    stateDirectory,
    null,
    time,
    () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer),
    metrics.sensor("dummy"));
    () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer));
}

// this task will throw exception when processing (on partition2), flushing, suspending and closing

@ -1573,8 +1566,7 @@ public class StreamTaskTest {
    stateDirectory,
    null,
    time,
    () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer),
    metrics.sensor("dummy")) {
    () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer)) {
    @Override
    protected void flushState() {
        throw new RuntimeException("KABOOM!");
@ -56,6 +56,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

@ -254,35 +255,70 @@ public class StreamThreadTest {
final StreamThread thread = createStreamThread(clientId, config, false);
final String defaultGroupName = "stream-metrics";
final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.getName());
final String descriptionIsNotVerified = "";

assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-avg", defaultGroupName, "The average commit time in ms", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-max", defaultGroupName, "The maximum commit time in ms", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("commit-rate", defaultGroupName, "The average per-second number of commit calls", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("commit-total", defaultGroupName, "The total number of commit calls", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("poll-latency-avg", defaultGroupName, "The average poll time in ms", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("poll-latency-max", defaultGroupName, "The maximum poll time in ms", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("poll-rate", defaultGroupName, "The average per-second number of record-poll calls", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("poll-total", defaultGroupName, "The total number of record-poll calls", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("process-latency-avg", defaultGroupName, "The average process time in ms", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("process-latency-max", defaultGroupName, "The maximum process time in ms", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("process-rate", defaultGroupName, "The average per-second number of process calls", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("process-total", defaultGroupName, "The total number of process calls", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-latency-avg", defaultGroupName, "The average punctuate time in ms", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-latency-max", defaultGroupName, "The maximum punctuate time in ms", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-rate", defaultGroupName, "The average per-second number of punctuate calls", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-total", defaultGroupName, "The total number of punctuate calls", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("task-created-rate", defaultGroupName, "The average per-second number of newly created tasks", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("task-created-total", defaultGroupName, "The total number of newly created tasks", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("task-closed-rate", defaultGroupName, "The average per-second number of closed tasks", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("task-closed-total", defaultGroupName, "The total number of closed tasks", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("skipped-records-rate", defaultGroupName, "The average per-second number of skipped records.", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("skipped-records-total", defaultGroupName, "The total number of skipped records.", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "commit-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "commit-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "commit-rate", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "commit-total", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "poll-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "poll-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "poll-rate", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "poll-total", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "process-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "process-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "process-rate", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "process-total", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "punctuate-latency-avg", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "punctuate-latency-max", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "punctuate-rate", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "punctuate-total", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "task-created-rate", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "task-created-total", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "task-closed-rate", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "task-closed-total", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "skipped-records-rate", defaultGroupName, descriptionIsNotVerified, defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "skipped-records-total", defaultGroupName, descriptionIsNotVerified, defaultTags)));

final String taskGroupName = "stream-task-metrics";
final Map<String, String> taskTags =
    mkMap(mkEntry("task-id", "all"), mkEntry("client-id", thread.getName()));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "commit-latency-avg", taskGroupName, descriptionIsNotVerified, taskTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "commit-latency-max", taskGroupName, descriptionIsNotVerified, taskTags)));
assertNotNull(metrics.metrics().get(metrics.metricName(
    "commit-rate", taskGroupName, descriptionIsNotVerified, taskTags)));

final JmxReporter reporter = new JmxReporter("kafka.streams");
metrics.addReporter(reporter);
assertEquals(clientId + "-StreamThread-1", thread.getName());
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=%s,client-id=%s",
    defaultGroupName, thread.getName())));
    defaultGroupName,
    thread.getName())));
assertTrue(reporter.containsMbean("kafka.streams:type=stream-task-metrics,client-id=" + thread.getName() + ",task-id=all"));
}

@Test

@ -296,8 +332,7 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1);

final StreamThread.StreamsMetricsThreadImpl streamsMetrics
    = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamThread thread = new StreamThread(
    mockTime,
    config,

@ -423,8 +458,7 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);

final StreamThread.StreamsMetricsThreadImpl streamsMetrics
    = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamThread thread = new StreamThread(
    mockTime,
    config,

@ -459,8 +493,7 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);

final StreamThread.StreamsMetricsThreadImpl streamsMetrics
    = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamThread thread = new StreamThread(
    mockTime,
    config,

@ -610,8 +643,7 @@ public class StreamThreadTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);

final StreamThread.StreamsMetricsThreadImpl streamsMetrics
    = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamThread thread = new StreamThread(
    mockTime,
    config,

@ -644,8 +676,7 @@ public class StreamThreadTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);

final StreamThread.StreamsMetricsThreadImpl streamsMetrics
    = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamThread thread = new StreamThread(
    mockTime,
    config,

@ -672,8 +703,7 @@ public class StreamThreadTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager, consumer);

final StreamThread.StreamsMetricsThreadImpl streamsMetrics
    = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamThread thread = new StreamThread(
    mockTime,
    config,

@ -1449,8 +1479,7 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);

final StreamThread.StreamsMetricsThreadImpl streamsMetrics
    = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamThread thread = new StreamThread(
    mockTime,
    config,

@ -1489,8 +1518,7 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);

final StreamThread.StreamsMetricsThreadImpl streamsMetrics
    = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, clientId);
final StreamThread thread = new StreamThread(
    mockTime,
    config,
@ -22,7 +22,10 @@ import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.utils.MockTime;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Test;

import java.util.Collections;

@ -38,8 +41,34 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

public class StreamsMetricsImplTest {
public class StreamsMetricsImplTest extends EasyMockSupport {

    private final static String SENSOR_PREFIX_DELIMITER = ".";
    private final static String SENSOR_NAME_DELIMITER = ".s.";
    private final static String INTERNAL_PREFIX = "internal";

    @Test
    public void shouldGetThreadLevelSensor() {
        final Metrics metrics = mock(Metrics.class);
        final String threadName = "thread1";
        final String sensorName = "sensor1";
        final String expectedFullSensorName =
            INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + threadName + SENSOR_NAME_DELIMITER + sensorName;
        final RecordingLevel recordingLevel = RecordingLevel.DEBUG;
        final Sensor[] parents = {};
        EasyMock.expect(metrics.sensor(expectedFullSensorName, recordingLevel, parents)).andReturn(null);

        replayAll();

        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName);
        final Sensor sensor = streamsMetrics.threadLevelSensor(sensorName, recordingLevel);

        verifyAll();

        assertNull(sensor);
    }

    @Test(expected = NullPointerException.class)
    public void testNullMetrics() {

@ -92,13 +121,13 @@ public class StreamsMetricsImplTest {

final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", "");

final int numberOfTaskMetrics = registry.metrics().size();

final Sensor sensor1 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1);
addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", "");

assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));

@ -108,13 +137,13 @@ public class StreamsMetricsImplTest {

final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", "");

assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));

final Sensor sensor2 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2);
addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", "");

assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
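A side note on the naming convention exercised by shouldGetThreadLevelSensor() above: thread-level sensors are registered under a fully qualified name built from the internal prefix, the thread name, and the sensor name. A minimal illustration of that expectation, using only the delimiter constants shown in the test (the concrete thread and sensor names are just the test's example values):

// Mirrors the expectedFullSensorName construction in shouldGetThreadLevelSensor(); illustrative only.
final String threadName = "thread1";
final String sensorName = "sensor1";
final String fullSensorName = "internal" + "." + threadName + ".s." + sensorName;
// -> "internal.thread1.s.sensor1"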
@ -0,0 +1,244 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals.metrics;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.Collections;
import java.util.Map;

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ALL_TASKS;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG;
import static org.easymock.EasyMock.expect;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.powermock.api.easymock.PowerMock.createStrictMock;
import static org.powermock.api.easymock.PowerMock.mockStatic;
import static org.powermock.api.easymock.PowerMock.replay;
import static org.powermock.api.easymock.PowerMock.replayAll;
import static org.powermock.api.easymock.PowerMock.verify;
import static org.powermock.api.easymock.PowerMock.verifyAll;

@RunWith(PowerMockRunner.class)
@PrepareForTest(StreamsMetricsImpl.class)
public class ThreadMetricsTest {

    private static final String THREAD_LEVEL_GROUP = "stream-metrics";
    private static final String TASK_LEVEL_GROUP = "stream-task-metrics";

    private final Metrics dummyMetrics = new Metrics();
    private final Sensor dummySensor = dummyMetrics.sensor("dummy");
    private final StreamsMetricsImpl streamsMetrics = createStrictMock(StreamsMetricsImpl.class);
    private final Map<String, String> dummyTagMap = Collections.singletonMap("hello", "world");

    @Test
    public void shouldGetCreateTaskSensor() {
        final String operation = "task-created";
        final String totalDescription = "The total number of newly created tasks";
        final String rateDescription = "The average per-second number of newly created tasks";
        mockStatic(StreamsMetricsImpl.class);
        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
        expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
        StreamsMetricsImpl.addInvocationRateAndCount(
            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);

        replayAll();
        replay(StreamsMetricsImpl.class);

        final Sensor sensor = ThreadMetrics.createTaskSensor(streamsMetrics);

        verifyAll();
        verify(StreamsMetricsImpl.class);

        assertThat(sensor, is(dummySensor));
    }

    @Test
    public void shouldGetCloseTaskSensor() {
        final String operation = "task-closed";
        final String totalDescription = "The total number of closed tasks";
        final String rateDescription = "The average per-second number of closed tasks";
        mockStatic(StreamsMetricsImpl.class);
        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
        expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
        StreamsMetricsImpl.addInvocationRateAndCount(
            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);

        replayAll();
        replay(StreamsMetricsImpl.class);

        final Sensor sensor = ThreadMetrics.closeTaskSensor(streamsMetrics);

        verifyAll();
        verify(StreamsMetricsImpl.class);

        assertThat(sensor, is(dummySensor));
    }

    @Test
    public void shouldGetCommitSensor() {
        final String operation = "commit";
        final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX;
        final String totalDescription = "The total number of commit calls";
        final String rateDescription = "The average per-second number of commit calls";
        mockStatic(StreamsMetricsImpl.class);
        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
        expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
        StreamsMetricsImpl.addInvocationRateAndCount(
            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
        StreamsMetricsImpl.addAvgAndMax(
            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);

        replayAll();
        replay(StreamsMetricsImpl.class);

        final Sensor sensor = ThreadMetrics.commitSensor(streamsMetrics);

        verifyAll();
        verify(StreamsMetricsImpl.class);

        assertThat(sensor, is(dummySensor));
    }

    @Test
    public void shouldGetPollSensor() {
        final String operation = "poll";
        final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX;
        final String totalDescription = "The total number of poll calls";
        final String rateDescription = "The average per-second number of poll calls";
        mockStatic(StreamsMetricsImpl.class);
        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
        expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
        StreamsMetricsImpl.addInvocationRateAndCount(
            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
        StreamsMetricsImpl.addAvgAndMax(
            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);

        replayAll();
        replay(StreamsMetricsImpl.class);

        final Sensor sensor = ThreadMetrics.pollSensor(streamsMetrics);

        verifyAll();
        verify(StreamsMetricsImpl.class);

        assertThat(sensor, is(dummySensor));
    }

    @Test
    public void shouldGetProcessSensor() {
        final String operation = "process";
        final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX;
        final String totalDescription = "The total number of process calls";
        final String rateDescription = "The average per-second number of process calls";
        mockStatic(StreamsMetricsImpl.class);
        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
        expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
        StreamsMetricsImpl.addInvocationRateAndCount(
            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
        StreamsMetricsImpl.addAvgAndMax(
            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);

        replayAll();
        replay(StreamsMetricsImpl.class);

        final Sensor sensor = ThreadMetrics.processSensor(streamsMetrics);

        verifyAll();
        verify(StreamsMetricsImpl.class);

        assertThat(sensor, is(dummySensor));
    }

    @Test
    public void shouldGetPunctuateSensor() {
        final String operation = "punctuate";
        final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX;
        final String totalDescription = "The total number of punctuate calls";
        final String rateDescription = "The average per-second number of punctuate calls";
        mockStatic(StreamsMetricsImpl.class);
        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
        expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
        StreamsMetricsImpl.addInvocationRateAndCount(
            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
        StreamsMetricsImpl.addAvgAndMax(
            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);

        replayAll();
        replay(StreamsMetricsImpl.class);

        final Sensor sensor = ThreadMetrics.punctuateSensor(streamsMetrics);

        verifyAll();
        verify(StreamsMetricsImpl.class);

        assertThat(sensor, is(dummySensor));
    }

    @Test
    public void shouldGetSkipRecordSensor() {
        final String operation = "skipped-records";
        final String totalDescription = "The total number of skipped records";
        final String rateDescription = "The average per-second number of skipped records";
        mockStatic(StreamsMetricsImpl.class);
        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
        expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
        StreamsMetricsImpl.addInvocationRateAndCount(
            dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);

        replayAll();
        replay(StreamsMetricsImpl.class);

        final Sensor sensor = ThreadMetrics.skipRecordSensor(streamsMetrics);

        verifyAll();
        verify(StreamsMetricsImpl.class);

        assertThat(sensor, is(dummySensor));
    }

    @Test
    public void shouldGetCommitOverTasksSensor() {
        final String operation = "commit";
        final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX;
        final String totalDescription = "The total number of commit calls over all tasks";
        final String rateDescription = "The average per-second number of commit calls over all tasks";
        mockStatic(StreamsMetricsImpl.class);
        expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.DEBUG)).andReturn(dummySensor);
        expect(streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS)).andReturn(dummyTagMap);
        StreamsMetricsImpl.addInvocationRateAndCount(
            dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
        StreamsMetricsImpl.addAvgAndMax(
            dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operationLatency);

        replayAll();
        replay(StreamsMetricsImpl.class);

        final Sensor sensor = ThreadMetrics.commitOverTasksSensor(streamsMetrics);

        verifyAll();
        verify(StreamsMetricsImpl.class);

        assertThat(sensor, is(dummySensor));
    }
}
@ -317,8 +317,7 @@ public class StreamThreadStateStoreProviderTest {
    stateDirectory,
    null,
    new MockTime(),
    () -> clientSupplier.getProducer(new HashMap<>()),
    metrics.sensor("dummy")) {
    () -> clientSupplier.getProducer(new HashMap<>())) {
    @Override
    protected void updateOffsetLimits() {}
};
@ -32,6 +32,9 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;

@ -277,10 +280,21 @@ public class TopologyTestDriver implements Closeable {
    .timeWindow(streamsConfig.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);

metrics = new Metrics(metricConfig, mockWallClockTime);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
    metrics,
    "topology-test-driver-virtual-thread"
);

final String threadName = "topology-test-driver-virtual-thread";
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName);
final Sensor skippedRecordsSensor = streamsMetrics.threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO);
final String threadLevelGroup = "stream-metrics";
skippedRecordsSensor.add(new MetricName("skipped-records-rate",
    threadLevelGroup,
    "The average per-second number of skipped records",
    streamsMetrics.tagMap()),
    new Rate(TimeUnit.SECONDS, new Count()));
skippedRecordsSensor.add(new MetricName("skipped-records-total",
    threadLevelGroup,
    "The total number of skipped records",
    streamsMetrics.tagMap()),
    new Total());
final ThreadCache cache = new ThreadCache(
    new LogContext("topology-test-driver "),
    Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),

@ -360,8 +374,7 @@ public class TopologyTestDriver implements Closeable {
    stateDirectory,
    cache,
    mockWallClockTime,
    () -> producer,
    metrics.sensor("dummy"));
    () -> producer);
task.initializeStateStores();
task.initializeTopology();
((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(
@ -33,6 +33,7 @@ import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;

import java.io.File;

@ -214,10 +215,9 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
this.stateDir = stateDir;
final MetricConfig metricConfig = new MetricConfig();
metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
this.metrics = new StreamsMetricsImpl(
    new Metrics(metricConfig),
    "mock-processor-context-virtual-thread"
);
final String threadName = "mock-processor-context-virtual-thread";
this.metrics = new StreamsMetricsImpl(new Metrics(metricConfig), threadName);
ThreadMetrics.skipRecordSensor(metrics);
}

@Override