KAFKA-19275 client-state and thread-state metrics are always "Unavailable" (#19712)
CI / build (push) Has been cancelled Details

Fix the issue where JMC is unable to correctly display client-state and
thread-state metrics. The root cause is that these two metrics directly
return the `State` class to JMX. If the user has not set up the RMI
server, JMC or other monitoring tools will be unable to interpret the
`State` class. To resolve this, we should return a string representation
of the state instead of the State class in these two metrics.

Reviewers: Luke Chen <showuon@gmail.com>, Ken Huang
 <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kuan-Po Tseng 2025-05-14 14:07:32 +08:00 committed by Chia-Ping Tsai
parent d64a97099d
commit f99db0804e
7 changed files with 13 additions and 15 deletions

View File

@ -272,7 +272,7 @@ public class MetricsIntegrationTest {
kafkaStreams = new KafkaStreams(topology, streamsConfiguration); kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
verifyAliveStreamThreadsMetric(); verifyAliveStreamThreadsMetric();
verifyStateMetric(State.CREATED); verifyStateMetric(State.CREATED.name());
verifyTopologyDescriptionMetric(topology.describe().toString()); verifyTopologyDescriptionMetric(topology.describe().toString());
verifyApplicationIdMetric(); verifyApplicationIdMetric();
@ -283,7 +283,7 @@ public class MetricsIntegrationTest {
() -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms"); () -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms");
verifyAliveStreamThreadsMetric(); verifyAliveStreamThreadsMetric();
verifyStateMetric(State.RUNNING); verifyStateMetric(State.RUNNING.name());
} }
private void produceRecordsForTwoSegments(final Duration segmentInterval) { private void produceRecordsForTwoSegments(final Duration segmentInterval) {
@ -357,7 +357,7 @@ public class MetricsIntegrationTest {
.to(STREAM_OUTPUT_4); .to(STREAM_OUTPUT_4);
startApplication(); startApplication();
verifyStateMetric(State.RUNNING); verifyStateMetric(State.RUNNING.name());
checkClientLevelMetrics(); checkClientLevelMetrics();
checkThreadLevelMetrics(); checkThreadLevelMetrics();
checkTaskLevelMetrics(); checkTaskLevelMetrics();
@ -392,7 +392,7 @@ public class MetricsIntegrationTest {
produceRecordsForClosingWindow(windowSize); produceRecordsForClosingWindow(windowSize);
startApplication(); startApplication();
verifyStateMetric(State.RUNNING); verifyStateMetric(State.RUNNING.name());
checkWindowStoreAndSuppressionBufferMetrics(); checkWindowStoreAndSuppressionBufferMetrics();
@ -421,7 +421,7 @@ public class MetricsIntegrationTest {
startApplication(); startApplication();
verifyStateMetric(State.RUNNING); verifyStateMetric(State.RUNNING.name());
checkSessionStoreMetrics(); checkSessionStoreMetrics();
@ -439,14 +439,14 @@ public class MetricsIntegrationTest {
assertThat(metricsList.get(0).metricValue(), is(NUM_THREADS)); assertThat(metricsList.get(0).metricValue(), is(NUM_THREADS));
} }
private void verifyStateMetric(final State state) { private void verifyStateMetric(final String state) {
final List<Metric> metricsList = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream() final List<Metric> metricsList = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().name().equals(STATE) && .filter(m -> m.metricName().name().equals(STATE) &&
m.metricName().group().equals(STREAM_CLIENT_NODE_METRICS)) m.metricName().group().equals(STREAM_CLIENT_NODE_METRICS))
.collect(Collectors.toList()); .collect(Collectors.toList());
assertThat(metricsList.size(), is(1)); assertThat(metricsList.size(), is(1));
assertThat(metricsList.get(0).metricValue(), is(state)); assertThat(metricsList.get(0).metricValue(), is(state));
assertThat(metricsList.get(0).metricValue().toString(), is(state.toString())); assertThat(metricsList.get(0).metricValue().toString(), is(state));
} }
private void verifyTopologyDescriptionMetric(final String topologyDescription) { private void verifyTopologyDescriptionMetric(final String topologyDescription) {

View File

@ -983,7 +983,7 @@ public class KafkaStreams implements AutoCloseable {
ClientMetrics.addCommitIdMetric(streamsMetrics); ClientMetrics.addCommitIdMetric(streamsMetrics);
ClientMetrics.addApplicationIdMetric(streamsMetrics, applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)); ClientMetrics.addApplicationIdMetric(streamsMetrics, applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, (metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString()); ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, (metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state); ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state.name());
ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, (metricsConfig, now) -> state.ordinal()); ClientMetrics.addClientStateTelemetryMetric(streamsMetrics, (metricsConfig, now) -> state.ordinal());
ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, calculateMetricsRecordingLevel()); ClientMetrics.addClientRecordingLevelMetric(streamsMetrics, calculateMetricsRecordingLevel());
threads = Collections.synchronizedList(new LinkedList<>()); threads = Collections.synchronizedList(new LinkedList<>());

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams.internals.metrics;
import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -118,7 +117,7 @@ public class ClientMetrics {
} }
public static void addStateMetric(final StreamsMetricsImpl streamsMetrics, public static void addStateMetric(final StreamsMetricsImpl streamsMetrics,
final Gauge<State> stateProvider) { final Gauge<String> stateProvider) {
streamsMetrics.addClientLevelMutableMetric( streamsMetrics.addClientLevelMutableMetric(
STATE, STATE,
STATE_DESCRIPTION, STATE_DESCRIPTION,

View File

@ -628,7 +628,7 @@ public class StreamThread extends Thread implements ProcessingThread {
ThreadMetrics.addThreadStateMetric( ThreadMetrics.addThreadStateMetric(
threadId, threadId,
streamsMetrics, streamsMetrics,
(metricConfig, now) -> this.state()); (metricConfig, now) -> this.state().name());
ThreadMetrics.addThreadBlockedTimeMetric( ThreadMetrics.addThreadBlockedTimeMetric(
threadId, threadId,
new StreamThreadTotalBlockedTime( new StreamThreadTotalBlockedTime(

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams.processor.internals.metrics;
import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime; import org.apache.kafka.streams.processor.internals.StreamThreadTotalBlockedTime;
import java.util.Collections; import java.util.Collections;
@ -313,7 +312,7 @@ public class ThreadMetrics {
public static void addThreadStateMetric(final String threadId, public static void addThreadStateMetric(final String threadId,
final StreamsMetricsImpl streamsMetrics, final StreamsMetricsImpl streamsMetrics,
final Gauge<StreamThread.State> threadStateProvider) { final Gauge<String> threadStateProvider) {
streamsMetrics.addThreadLevelMutableMetric( streamsMetrics.addThreadLevelMutableMetric(
STATE, STATE,
THREAD_STATE_DESCRIPTION, THREAD_STATE_DESCRIPTION,

View File

@ -89,7 +89,7 @@ public class ClientMetricsTest {
public void shouldAddStateMetric() { public void shouldAddStateMetric() {
final String name = "state"; final String name = "state";
final String description = "The state of the Kafka Streams client"; final String description = "The state of the Kafka Streams client";
final Gauge<State> stateProvider = (config, now) -> State.RUNNING; final Gauge<String> stateProvider = (config, now) -> State.RUNNING.name();
setUpAndVerifyMutableMetric( setUpAndVerifyMutableMetric(
name, name,
description, description,

View File

@ -435,7 +435,7 @@ public class ThreadMetricsTest {
@Test @Test
public void shouldAddThreadStateJmxMetric() { public void shouldAddThreadStateJmxMetric() {
final Gauge<StreamThread.State> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING; final Gauge<String> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.name();
ThreadMetrics.addThreadStateMetric( ThreadMetrics.addThreadStateMetric(
THREAD_ID, THREAD_ID,
streamsMetrics, streamsMetrics,