diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 9cf91f163e2..fcc999e92af 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -176,7 +176,6 @@ public class KafkaStreams implements AutoCloseable { private final long totalCacheSize; private final StreamStateListener streamStateListener; private final DelegatingStateRestoreListener delegatingStateRestoreListener; - private final Map threadState; private final UUID processId; private final KafkaClientSupplier clientSupplier; protected final TopologyMetadata topologyMetadata; @@ -633,17 +632,13 @@ public class KafkaStreams implements AutoCloseable { /** * Class that handles stream thread transitions */ - final class StreamStateListener implements StreamThread.StateListener { + private final class StreamStateListener implements StreamThread.StateListener { private final Map threadState; private GlobalStreamThread.State globalThreadState; - // this lock should always be held before the state lock - private final Object threadStatesLock; - StreamStateListener(final Map threadState, - final GlobalStreamThread.State globalThreadState) { - this.threadState = threadState; + StreamStateListener(final GlobalStreamThread.State globalThreadState) { + this.threadState = new HashMap<>(); this.globalThreadState = globalThreadState; - this.threadStatesLock = new Object(); } /** @@ -675,33 +670,35 @@ public class KafkaStreams implements AutoCloseable { public synchronized void onChange(final Thread thread, final ThreadStateTransitionValidator abstractNewState, final ThreadStateTransitionValidator abstractOldState) { - synchronized (threadStatesLock) { - // StreamThreads first - if (thread instanceof StreamThread) { - final StreamThread.State newState = (StreamThread.State) abstractNewState; - threadState.put(thread.getId(), newState); + // StreamThreads first + if (thread instanceof StreamThread) { + final StreamThread.State newState = (StreamThread.State) abstractNewState; + threadState.put(thread.getId(), newState); - if (newState == StreamThread.State.PARTITIONS_REVOKED || newState == StreamThread.State.PARTITIONS_ASSIGNED) { - setState(State.REBALANCING); - } else if (newState == StreamThread.State.RUNNING) { - maybeSetRunning(); - } - } else if (thread instanceof GlobalStreamThread) { - // global stream thread has different invariants - final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState; - globalThreadState = newState; + if (newState == StreamThread.State.PARTITIONS_REVOKED || newState == StreamThread.State.PARTITIONS_ASSIGNED) { + setState(State.REBALANCING); + } else if (newState == StreamThread.State.RUNNING) { + maybeSetRunning(); + } + } else if (thread instanceof GlobalStreamThread) { + // global stream thread has different invariants + final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState; + globalThreadState = newState; - if (newState == GlobalStreamThread.State.RUNNING) { - maybeSetRunning(); - } else if (newState == GlobalStreamThread.State.DEAD) { - if (state != State.PENDING_SHUTDOWN) { - log.error("Global thread has died. The streams application or client will now close to ERROR."); - closeToError(); - } + if (newState == GlobalStreamThread.State.RUNNING) { + maybeSetRunning(); + } else if (newState == GlobalStreamThread.State.DEAD) { + if (state != State.PENDING_SHUTDOWN) { + log.error("Global thread has died. The streams application or client will now close to ERROR."); + closeToError(); } } } } + + private synchronized void registerStreamThread(final StreamThread streamThread) { + threadState.put(streamThread.getId(), streamThread.state()); + } } static final class DelegatingStateRestoreListener implements StateRestoreListener { @@ -1047,8 +1044,7 @@ public class KafkaStreams implements AutoCloseable { globalThreadState = globalStreamThread.state(); } - threadState = new HashMap<>(numStreamThreads); - streamStateListener = new StreamStateListener(threadState, globalThreadState); + streamStateListener = new StreamStateListener(globalThreadState); final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(this.topologyMetadata.globalStateStores()); @@ -1084,9 +1080,9 @@ public class KafkaStreams implements AutoCloseable { KafkaStreams.this::closeToError, streamsUncaughtExceptionHandler ); + streamStateListener.registerStreamThread(streamThread); streamThread.setStateListener(streamStateListener); threads.add(streamThread); - threadState.put(streamThread.getId(), streamThread.state()); queryableStoreProvider.addStoreProviderForThread(streamThread.getName(), new StreamThreadStateStoreProvider(streamThread)); return streamThread; }