KAFKA-19434: moved the metric initialization to task assignment to have the correct thread.

This commit is contained in:
Nikita 2025-09-22 17:25:02 -07:00
parent 639492cc0a
commit 23476deddc
No known key found for this signature in database
4 changed files with 16 additions and 6 deletions

View File

@ -281,6 +281,15 @@ public class KafkaStreamsTelemetryIntegrationTest {
streamsApplicationProperties = props(groupProtocol);
final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology();
shouldPassMetrics(topology);
INTERCEPTING_CONSUMERS.clear();
INTERCEPTING_ADMIN_CLIENTS.clear();
shouldPassMetrics(topology);
}
private void shouldPassMetrics(Topology topology) throws Exception {
try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);

View File

@ -45,8 +45,8 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
*/
public class StandbyTask extends AbstractTask implements Task {
private final boolean eosEnabled;
private final Sensor closeTaskSensor;
private final Sensor updateSensor;
private Sensor closeTaskSensor;
private Sensor updateSensor;
private final StreamsMetricsImpl streamsMetrics;
protected final InternalProcessorContext<?, ?> processorContext;
@ -83,8 +83,6 @@ public class StandbyTask extends AbstractTask implements Task {
this.streamsMetrics = streamsMetrics;
processorContext.transitionToStandby(cache);
closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics);
this.eosEnabled = config.eosEnabled;
}
@ -109,6 +107,9 @@ public class StandbyTask extends AbstractTask implements Task {
@Override
public void initializeIfNeeded() {
if (state() == State.CREATED) {
closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics);
StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext);
// with and without EOS we would check for checkpointing at each commit during running,

View File

@ -251,8 +251,6 @@ public class StateDirectory implements AutoCloseable {
);
try {
task.initializeIfNeeded();
tasksForLocalState.put(id, task);
} catch (final TaskCorruptedException e) {
// Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it

View File

@ -343,6 +343,8 @@ public class TaskManager {
final TaskId taskId = entry.getKey();
final Task task = stateDirectory.removeStartupTask(taskId);
if (task != null) {
task.initializeIfNeeded();
// replace our dummy values with the real ones, now we know our thread and assignment
final Set<TopicPartition> inputPartitions = entry.getValue();
task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions);