KAFKA-17561: add processId tag to thread-state metric (#18581)

Part of KIP-1091.

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-01-21 08:09:43 -08:00 committed by GitHub
parent 3d49159c84
commit faff2de6a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 82 additions and 13 deletions

View File

@ -2826,7 +2826,7 @@ All the following metrics have a recording level of <code>info</code>:
<tr>
<td>thread-state</td>
<td>The state of the thread as a number (<code>ordinal()</code> of the corresponding enum).</td>
<td>kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)</td>
<td>kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+),process-id=([-.\w]+)</td>
</tr>
<tr>
<td>commit-latency-avg</td>

View File

@ -497,6 +497,7 @@ public class StreamThread extends Thread implements ProcessingThread {
stateUpdater,
streamsMetrics,
topologyMetadata,
processId,
threadId,
logContext,
referenceContainer.assignmentErrorCode,
@ -574,6 +575,7 @@ public class StreamThread extends Thread implements ProcessingThread {
final StateUpdater stateUpdater,
final StreamsMetricsImpl streamsMetrics,
final TopologyMetadata topologyMetadata,
final UUID processId,
final String threadId,
final LogContext logContext,
final AtomicInteger assignmentErrorCode,
@ -618,6 +620,7 @@ public class StreamThread extends Thread implements ProcessingThread {
time.milliseconds()
);
ThreadMetrics.addThreadStateTelemetryMetric(
processId.toString(),
threadId,
streamsMetrics,
(metricConfig, now) -> this.state().ordinal());

View File

@ -232,8 +232,15 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final String description,
final String threadId,
final Gauge<T> valueProvider) {
addThreadLevelMutableMetric(name, description, threadId, Collections.emptyMap(), valueProvider);
}
public <T> void addThreadLevelMutableMetric(final String name,
final String description,
final String threadId,
final Map<String, String> additionalTags,
final Gauge<T> valueProvider) {
final MetricName metricName = metrics.metricName(
name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId, additionalTags));
synchronized (threadLevelMetrics) {
threadLevelMetrics.computeIfAbsent(
threadSensorPrefix(threadId),
@ -279,7 +286,11 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
public Map<String, String> threadLevelTagMap(final String threadId) {
final Map<String, String> tagMap = new LinkedHashMap<>();
return threadLevelTagMap(threadId, Collections.emptyMap());
}
public Map<String, String> threadLevelTagMap(final String threadId, final Map<String, String> additionalTags) {
final Map<String, String> tagMap = new LinkedHashMap<>(additionalTags);
tagMap.put(THREAD_ID_TAG, threadId);
return tagMap;
}
@ -325,32 +336,40 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
public Map<String, String> taskLevelTagMap(final String threadId, final String taskId) {
final Map<String, String> tagMap = threadLevelTagMap(threadId);
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(THREAD_ID_TAG, threadId);
tagMap.put(TASK_ID_TAG, taskId);
return tagMap;
}
public Map<String, String> nodeLevelTagMap(final String threadId,
final String taskName,
final String taskId,
final String processorNodeName) {
final Map<String, String> tagMap = taskLevelTagMap(threadId, taskName);
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(THREAD_ID_TAG, threadId);
tagMap.put(TASK_ID_TAG, taskId);
tagMap.put(PROCESSOR_NODE_ID_TAG, processorNodeName);
return tagMap;
}
public Map<String, String> topicLevelTagMap(final String threadId,
final String taskName,
final String taskId,
final String processorNodeName,
final String topicName) {
final Map<String, String> tagMap = nodeLevelTagMap(threadId, taskName, processorNodeName);
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(THREAD_ID_TAG, threadId);
tagMap.put(TASK_ID_TAG, taskId);
tagMap.put(PROCESSOR_NODE_ID_TAG, processorNodeName);
tagMap.put(TOPIC_NAME_TAG, topicName);
return tagMap;
}
public Map<String, String> storeLevelTagMap(final String taskName,
public Map<String, String> storeLevelTagMap(final String taskId,
final String storeType,
final String storeName) {
final Map<String, String> tagMap = taskLevelTagMap(Thread.currentThread().getName(), taskName);
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(THREAD_ID_TAG, Thread.currentThread().getName());
tagMap.put(TASK_ID_TAG, taskId);
tagMap.put(storeType + "-" + STORE_ID_TAG, storeName);
return tagMap;
}

View File

@ -22,9 +22,11 @@ import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
import java.util.Collections;
import java.util.Map;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESS_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX;
@ -296,13 +298,15 @@ public class ThreadMetrics {
);
}
public static void addThreadStateTelemetryMetric(final String threadId,
public static void addThreadStateTelemetryMetric(final String processId,
final String threadId,
final StreamsMetricsImpl streamsMetrics,
final Gauge<Integer> threadStateProvider) {
streamsMetrics.addThreadLevelMutableMetric(
THREAD_STATE,
THREAD_STATE_DESCRIPTION,
threadId,
Collections.singletonMap(PROCESS_ID_TAG, processId),
threadStateProvider
);
}

View File

@ -1441,6 +1441,7 @@ public class StreamThreadTest {
null,
streamsMetrics,
new TopologyMetadata(internalTopologyBuilder, config),
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
new AtomicInteger(),
@ -2667,6 +2668,7 @@ public class StreamThreadTest {
null,
streamsMetrics,
topologyMetadata,
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
new AtomicInteger(),
@ -2725,6 +2727,7 @@ public class StreamThreadTest {
null,
streamsMetrics,
topologyMetadata,
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
new AtomicInteger(),
@ -2792,6 +2795,7 @@ public class StreamThreadTest {
null,
streamsMetrics,
topologyMetadata,
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
new AtomicInteger(),
@ -2855,6 +2859,7 @@ public class StreamThreadTest {
null,
streamsMetrics,
topologyMetadata,
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
new AtomicInteger(),
@ -2915,6 +2920,7 @@ public class StreamThreadTest {
null,
streamsMetrics,
topologyMetadata,
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
new AtomicInteger(),
@ -3148,6 +3154,7 @@ public class StreamThreadTest {
null,
streamsMetrics,
topologyMetadata,
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
new AtomicInteger(),
@ -3204,6 +3211,7 @@ public class StreamThreadTest {
null,
streamsMetrics,
topologyMetadata,
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
new AtomicInteger(),
@ -3583,6 +3591,7 @@ public class StreamThreadTest {
null,
new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime),
topologyMetadata,
PROCESS_ID,
"thread-id",
new LogContext(),
null,
@ -3704,6 +3713,7 @@ public class StreamThreadTest {
null,
streamsMetrics,
topologyMetadata,
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
new AtomicInteger(),

View File

@ -1281,7 +1281,35 @@ public class StreamsMetricsImplTest {
final MetricName name = metrics.metricName(
"foobar",
THREAD_LEVEL_GROUP,
Collections.singletonMap("thread-id", "t1")
mkMap(
mkEntry("thread-id", "t1")
)
);
assertThat(metrics.metric(name), notNullValue());
assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
}
@Test
public void shouldAddThreadLevelMutableMetricWithAdditionalTags() {
final int measuredValue = 123;
final StreamsMetricsImpl streamsMetrics
= new StreamsMetricsImpl(metrics, THREAD_ID1, PROCESS_ID, time);
streamsMetrics.addThreadLevelMutableMetric(
"foobar",
"test metric",
"t1",
Collections.singletonMap("additional-tag", "additional-value"),
(c, t) -> measuredValue
);
final MetricName name = metrics.metricName(
"foobar",
THREAD_LEVEL_GROUP,
mkMap(
mkEntry("thread-id", "t1"),
mkEntry("additional-tag", "additional-value")
)
);
assertThat(metrics.metric(name), notNullValue());
assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));
@ -1325,7 +1353,9 @@ public class StreamsMetricsImplTest {
final MetricName name = metrics.metricName(
"foobar",
THREAD_LEVEL_GROUP,
Collections.singletonMap("thread-id", "t1")
mkMap(
mkEntry("thread-id", "t1")
)
);
assertThat(metrics.metric(name), notNullValue());
assertThat(metrics.metric(name).metricValue(), equalTo(measuredValue));

View File

@ -41,6 +41,7 @@ import static org.mockito.Mockito.when;
public class ThreadMetricsTest {
private static final String PROCESS_ID = "process-id";
private static final String THREAD_ID = "thread-id";
private static final String THREAD_LEVEL_GROUP = "stream-thread-metrics";
@ -418,6 +419,7 @@ public class ThreadMetricsTest {
public void shouldAddThreadStateTelemetryMetric() {
final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.ordinal();
ThreadMetrics.addThreadStateTelemetryMetric(
PROCESS_ID,
THREAD_ID,
streamsMetrics,
threadStateProvider
@ -426,6 +428,7 @@ public class ThreadMetricsTest {
"thread-state",
"The current state of the thread",
THREAD_ID,
Collections.singletonMap("process-id", PROCESS_ID),
threadStateProvider
);
}