diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 560da2928f7..3bb6a32fd6d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -952,11 +952,7 @@ public class StreamThread extends Thread implements ProcessingThread { final long pollLatency; taskManager.resumePollingForPartitionsWithAvailableSpace(); - try { - pollLatency = pollPhase(); - } finally { - taskManager.updateLags(); - } + pollLatency = pollPhase(); // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests(). // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned(). @@ -981,6 +977,9 @@ public class StreamThread extends Thread implements ProcessingThread { long totalPunctuateLatency = 0L; if (state == State.RUNNING || (stateUpdaterEnabled && isStartingRunningOrPartitionAssigned())) { + + taskManager.updateLags(); + /* * Within an iteration, after processing up to N (N initialized as 1 upon start up) records for each applicable tasks, check the current time: * 1. If it is time to punctuate, do it; @@ -1101,11 +1100,7 @@ public class StreamThread extends Thread implements ProcessingThread { final long pollLatency; taskManager.resumePollingForPartitionsWithAvailableSpace(); - try { - pollLatency = pollPhase(); - } finally { - taskManager.updateLags(); - } + pollLatency = pollPhase(); // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests(). // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned(). @@ -1119,6 +1114,8 @@ public class StreamThread extends Thread implements ProcessingThread { long totalCommitLatency = 0L; if (isRunning()) { + taskManager.updateLags(); + checkStateUpdater(); taskManager.maybeThrowTaskExceptionsFromProcessingThreads(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 46cf86008d5..8bdc66ea665 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -3265,6 +3265,8 @@ public class StreamThreadTest { thread = setUpThread(streamsConfigProps); thread.setState(State.STARTING); thread.setState(State.PARTITIONS_ASSIGNED); + thread.updateThreadMetadata("metadata"); + thread.setState(State.RUNNING); runOnce();