From 23476deddc0e5412d9ea0f0d43191f0483464365 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 22 Sep 2025 17:25:02 -0700 Subject: [PATCH] KAFKA-19434: moved the metric initialization to task assignment to have the correct thread. --- .../KafkaStreamsTelemetryIntegrationTest.java | 9 +++++++++ .../kafka/streams/processor/internals/StandbyTask.java | 9 +++++---- .../streams/processor/internals/StateDirectory.java | 2 -- .../kafka/streams/processor/internals/TaskManager.java | 2 ++ 4 files changed, 16 insertions(+), 6 deletions(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java index b479446389a..e297ae24249 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java @@ -281,6 +281,15 @@ public class KafkaStreamsTelemetryIntegrationTest { streamsApplicationProperties = props(groupProtocol); final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology(); + shouldPassMetrics(topology); + + INTERCEPTING_CONSUMERS.clear(); + INTERCEPTING_ADMIN_CLIENTS.clear(); + + shouldPassMetrics(topology); + } + + private void shouldPassMetrics(Topology topology) throws Exception { try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) { IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 4c6e6674bdb..3fc8f1e7d7a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -45,8 +45,8 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric */ public class StandbyTask extends AbstractTask implements Task { private final boolean eosEnabled; - private final Sensor closeTaskSensor; - private final Sensor updateSensor; + private Sensor closeTaskSensor; + private Sensor updateSensor; private final StreamsMetricsImpl streamsMetrics; protected final InternalProcessorContext processorContext; @@ -83,8 +83,6 @@ public class StandbyTask extends AbstractTask implements Task { this.streamsMetrics = streamsMetrics; processorContext.transitionToStandby(cache); - closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics); - updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics); this.eosEnabled = config.eosEnabled; } @@ -109,6 +107,9 @@ public class StandbyTask extends AbstractTask implements Task { @Override public void initializeIfNeeded() { if (state() == State.CREATED) { + closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics); + updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics); + StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext); // with and without EOS we would check for checkpointing at each commit during running, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index a95d20ddae0..d86fe9d3d1b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -251,8 +251,6 @@ public class StateDirectory implements AutoCloseable { ); try { - task.initializeIfNeeded(); - tasksForLocalState.put(id, task); } catch (final TaskCorruptedException e) { // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 67d009b037f..15b27ddcaf2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -343,6 +343,8 @@ public class TaskManager { final TaskId taskId = entry.getKey(); final Task task = stateDirectory.removeStartupTask(taskId); if (task != null) { + task.initializeIfNeeded(); + // replace our dummy values with the real ones, now we know our thread and assignment final Set inputPartitions = entry.getValue(); task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions);