KAFKA-16955: fix synchronization of streams threadState (#16337)

Each KafkaStreams instance maintains a map from threadId to state
to use to aggregate to a KafkaStreams app state. The map is updated
on every state change, and when a new thread is created. State change
updates are done in a synchronized blocks, however the update that
happens on thread creation is not, which can raise
ConcurrentModificationException. This patch moves this update
into the listener object and protects it using the object's lock.
It also moves ownership of the state map into the listener so that
its less likely that future changes access it without locking

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Rohan 2024-06-14 10:44:36 -07:00 committed by GitHub
parent fc6f8b6591
commit 9a239c6142
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 28 additions and 32 deletions

View File

@ -176,7 +176,6 @@ public class KafkaStreams implements AutoCloseable {
private final long totalCacheSize;
private final StreamStateListener streamStateListener;
private final DelegatingStateRestoreListener delegatingStateRestoreListener;
private final Map<Long, StreamThread.State> threadState;
private final UUID processId;
private final KafkaClientSupplier clientSupplier;
protected final TopologyMetadata topologyMetadata;
@ -633,17 +632,13 @@ public class KafkaStreams implements AutoCloseable {
/**
* Class that handles stream thread transitions
*/
final class StreamStateListener implements StreamThread.StateListener {
private final class StreamStateListener implements StreamThread.StateListener {
private final Map<Long, StreamThread.State> threadState;
private GlobalStreamThread.State globalThreadState;
// this lock should always be held before the state lock
private final Object threadStatesLock;
StreamStateListener(final Map<Long, StreamThread.State> threadState,
final GlobalStreamThread.State globalThreadState) {
this.threadState = threadState;
StreamStateListener(final GlobalStreamThread.State globalThreadState) {
this.threadState = new HashMap<>();
this.globalThreadState = globalThreadState;
this.threadStatesLock = new Object();
}
/**
@ -675,33 +670,35 @@ public class KafkaStreams implements AutoCloseable {
public synchronized void onChange(final Thread thread,
final ThreadStateTransitionValidator abstractNewState,
final ThreadStateTransitionValidator abstractOldState) {
synchronized (threadStatesLock) {
// StreamThreads first
if (thread instanceof StreamThread) {
final StreamThread.State newState = (StreamThread.State) abstractNewState;
threadState.put(thread.getId(), newState);
// StreamThreads first
if (thread instanceof StreamThread) {
final StreamThread.State newState = (StreamThread.State) abstractNewState;
threadState.put(thread.getId(), newState);
if (newState == StreamThread.State.PARTITIONS_REVOKED || newState == StreamThread.State.PARTITIONS_ASSIGNED) {
setState(State.REBALANCING);
} else if (newState == StreamThread.State.RUNNING) {
maybeSetRunning();
}
} else if (thread instanceof GlobalStreamThread) {
// global stream thread has different invariants
final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
globalThreadState = newState;
if (newState == StreamThread.State.PARTITIONS_REVOKED || newState == StreamThread.State.PARTITIONS_ASSIGNED) {
setState(State.REBALANCING);
} else if (newState == StreamThread.State.RUNNING) {
maybeSetRunning();
}
} else if (thread instanceof GlobalStreamThread) {
// global stream thread has different invariants
final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
globalThreadState = newState;
if (newState == GlobalStreamThread.State.RUNNING) {
maybeSetRunning();
} else if (newState == GlobalStreamThread.State.DEAD) {
if (state != State.PENDING_SHUTDOWN) {
log.error("Global thread has died. The streams application or client will now close to ERROR.");
closeToError();
}
if (newState == GlobalStreamThread.State.RUNNING) {
maybeSetRunning();
} else if (newState == GlobalStreamThread.State.DEAD) {
if (state != State.PENDING_SHUTDOWN) {
log.error("Global thread has died. The streams application or client will now close to ERROR.");
closeToError();
}
}
}
}
private synchronized void registerStreamThread(final StreamThread streamThread) {
threadState.put(streamThread.getId(), streamThread.state());
}
}
static final class DelegatingStateRestoreListener implements StateRestoreListener {
@ -1047,8 +1044,7 @@ public class KafkaStreams implements AutoCloseable {
globalThreadState = globalStreamThread.state();
}
threadState = new HashMap<>(numStreamThreads);
streamStateListener = new StreamStateListener(threadState, globalThreadState);
streamStateListener = new StreamStateListener(globalThreadState);
final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(this.topologyMetadata.globalStateStores());
@ -1084,9 +1080,9 @@ public class KafkaStreams implements AutoCloseable {
KafkaStreams.this::closeToError,
streamsUncaughtExceptionHandler
);
streamStateListener.registerStreamThread(streamThread);
streamThread.setStateListener(streamStateListener);
threads.add(streamThread);
threadState.put(streamThread.getId(), streamThread.state());
queryableStoreProvider.addStoreProviderForThread(streamThread.getName(), new StreamThreadStateStoreProvider(streamThread));
return streamThread;
}