mirror of https://github.com/apache/kafka.git
KAFKA-12375: don't reuse thread.id until a thread has fully shut down (#10215)
Always grab a new thread.id and verify that a thread has fully shut down to DEAD before removing it from the `threads` list and making that id available again. Reviewers: Walker Carlson <wcarlson@confluent.io>, Bruno Cadonna <cadonna@confluent.io>
This commit is contained in:
parent
36d61650f4
commit
23b61ba383
|
@ -120,6 +120,7 @@
|
||||||
<module name="ClassDataAbstractionCoupling">
|
<module name="ClassDataAbstractionCoupling">
|
||||||
<!-- default is 7 -->
|
<!-- default is 7 -->
|
||||||
<property name="max" value="25"/>
|
<property name="max" value="25"/>
|
||||||
|
<property name="excludeClassesRegexps" value="AtomicInteger"/>
|
||||||
</module>
|
</module>
|
||||||
<module name="BooleanExpressionComplexity">
|
<module name="BooleanExpressionComplexity">
|
||||||
<!-- default is 3 -->
|
<!-- default is 3 -->
|
||||||
|
|
|
@ -92,6 +92,7 @@ import java.util.Properties;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
@ -463,9 +464,8 @@ public class KafkaStreams implements AutoCloseable {
|
||||||
closeToError();
|
closeToError();
|
||||||
}
|
}
|
||||||
final StreamThread deadThread = (StreamThread) Thread.currentThread();
|
final StreamThread deadThread = (StreamThread) Thread.currentThread();
|
||||||
threads.remove(deadThread);
|
|
||||||
addStreamThread();
|
|
||||||
deadThread.shutdown();
|
deadThread.shutdown();
|
||||||
|
addStreamThread();
|
||||||
if (throwable instanceof RuntimeException) {
|
if (throwable instanceof RuntimeException) {
|
||||||
throw (RuntimeException) throwable;
|
throw (RuntimeException) throwable;
|
||||||
} else if (throwable instanceof Error) {
|
} else if (throwable instanceof Error) {
|
||||||
|
@ -970,7 +970,7 @@ public class KafkaStreams implements AutoCloseable {
|
||||||
final StreamThread streamThread;
|
final StreamThread streamThread;
|
||||||
synchronized (changeThreadCount) {
|
synchronized (changeThreadCount) {
|
||||||
threadIdx = getNextThreadIndex();
|
threadIdx = getNextThreadIndex();
|
||||||
cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
|
cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads() + 1);
|
||||||
resizeThreadCache(cacheSizePerThread);
|
resizeThreadCache(cacheSizePerThread);
|
||||||
// Creating thread should hold the lock in order to avoid duplicate thread index.
|
// Creating thread should hold the lock in order to avoid duplicate thread index.
|
||||||
// If the duplicate index happen, the metadata of thread may be duplicate too.
|
// If the duplicate index happen, the metadata of thread may be duplicate too.
|
||||||
|
@ -984,7 +984,7 @@ public class KafkaStreams implements AutoCloseable {
|
||||||
} else {
|
} else {
|
||||||
streamThread.shutdown();
|
streamThread.shutdown();
|
||||||
threads.remove(streamThread);
|
threads.remove(streamThread);
|
||||||
resizeThreadCache(getCacheSizePerThread(threads.size()));
|
resizeThreadCache(getCacheSizePerThread(getNumLiveStreamThreads()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1038,7 +1038,7 @@ public class KafkaStreams implements AutoCloseable {
|
||||||
// make a copy of threads to avoid holding lock
|
// make a copy of threads to avoid holding lock
|
||||||
for (final StreamThread streamThread : new ArrayList<>(threads)) {
|
for (final StreamThread streamThread : new ArrayList<>(threads)) {
|
||||||
final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
|
final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
|
||||||
if (streamThread.isAlive() && (callingThreadIsNotCurrentStreamThread || threads.size() == 1)) {
|
if (streamThread.isAlive() && (callingThreadIsNotCurrentStreamThread || getNumLiveStreamThreads() == 1)) {
|
||||||
log.info("Removing StreamThread " + streamThread.getName());
|
log.info("Removing StreamThread " + streamThread.getName());
|
||||||
final Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
|
final Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
|
||||||
streamThread.requestLeaveGroupDuringShutdown();
|
streamThread.requestLeaveGroupDuringShutdown();
|
||||||
|
@ -1047,10 +1047,15 @@ public class KafkaStreams implements AutoCloseable {
|
||||||
if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
|
if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
|
||||||
log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time");
|
log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time");
|
||||||
timeout = true;
|
timeout = true;
|
||||||
}
|
// Don't remove from threads until shutdown is complete. We will trim it from the
|
||||||
}
|
// list once it reaches DEAD, and if for some reason it's hanging indefinitely in the
|
||||||
|
// shutdown then we should just consider this thread.id to be burned
|
||||||
|
} else {
|
||||||
threads.remove(streamThread);
|
threads.remove(streamThread);
|
||||||
final long cacheSizePerThread = getCacheSizePerThread(threads.size());
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
|
||||||
resizeThreadCache(cacheSizePerThread);
|
resizeThreadCache(cacheSizePerThread);
|
||||||
if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) {
|
if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) {
|
||||||
final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get());
|
final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get());
|
||||||
|
@ -1093,18 +1098,52 @@ public class KafkaStreams implements AutoCloseable {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns the number of threads that are not in the DEAD state -- use this over threads.size()
|
||||||
|
private int getNumLiveStreamThreads() {
|
||||||
|
final AtomicInteger numLiveThreads = new AtomicInteger(0);
|
||||||
|
synchronized (threads) {
|
||||||
|
processStreamThread(thread -> {
|
||||||
|
if (thread.state() == StreamThread.State.DEAD) {
|
||||||
|
threads.remove(thread);
|
||||||
|
} else {
|
||||||
|
numLiveThreads.incrementAndGet();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return numLiveThreads.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private int getNextThreadIndex() {
|
private int getNextThreadIndex() {
|
||||||
final HashSet<String> names = new HashSet<>();
|
final HashSet<String> allLiveThreadNames = new HashSet<>();
|
||||||
processStreamThread(thread -> names.add(thread.getName()));
|
final AtomicInteger maxThreadId = new AtomicInteger(1);
|
||||||
|
synchronized (threads) {
|
||||||
|
processStreamThread(thread -> {
|
||||||
|
// trim any DEAD threads from the list so we can reuse the thread.id
|
||||||
|
// this is only safe to do once the thread has fully completed shutdown
|
||||||
|
if (thread.state() == StreamThread.State.DEAD) {
|
||||||
|
threads.remove(thread);
|
||||||
|
} else {
|
||||||
|
allLiveThreadNames.add(thread.getName());
|
||||||
|
// Assume threads are always named with the "-StreamThread-<threadId>" suffix
|
||||||
|
final int threadId = Integer.parseInt(thread.getName().substring(thread.getName().lastIndexOf("-") + 1));
|
||||||
|
if (threadId > maxThreadId.get()) {
|
||||||
|
maxThreadId.set(threadId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
final String baseName = clientId + "-StreamThread-";
|
final String baseName = clientId + "-StreamThread-";
|
||||||
for (int i = 1; i <= threads.size(); i++) {
|
for (int i = 1; i <= maxThreadId.get(); i++) {
|
||||||
final String name = baseName + i;
|
final String name = baseName + i;
|
||||||
if (!names.contains(name)) {
|
if (!allLiveThreadNames.contains(name)) {
|
||||||
return i;
|
return i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// It's safe to use threads.size() rather than getNumLiveStreamThreads() to infer the number of threads
|
||||||
|
// here since we trimmed any DEAD threads earlier in this method while holding the lock
|
||||||
return threads.size() + 1;
|
return threads.size() + 1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private long getCacheSizePerThread(final int numStreamThreads) {
|
private long getCacheSizePerThread(final int numStreamThreads) {
|
||||||
if (numStreamThreads == 0) {
|
if (numStreamThreads == 0) {
|
||||||
|
|
|
@ -232,8 +232,8 @@ public class KafkaStreamsTest {
|
||||||
EasyMock.expect(StreamThread.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE).anyTimes();
|
EasyMock.expect(StreamThread.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE).anyTimes();
|
||||||
EasyMock.expect(streamThreadOne.getId()).andReturn(0L).anyTimes();
|
EasyMock.expect(streamThreadOne.getId()).andReturn(0L).anyTimes();
|
||||||
EasyMock.expect(streamThreadTwo.getId()).andReturn(1L).anyTimes();
|
EasyMock.expect(streamThreadTwo.getId()).andReturn(1L).anyTimes();
|
||||||
prepareStreamThread(streamThreadOne, true);
|
prepareStreamThread(streamThreadOne, 1, true);
|
||||||
prepareStreamThread(streamThreadTwo, false);
|
prepareStreamThread(streamThreadTwo, 2, false);
|
||||||
|
|
||||||
// setup global threads
|
// setup global threads
|
||||||
final AtomicReference<GlobalStreamThread.State> globalThreadState = new AtomicReference<>(GlobalStreamThread.State.CREATED);
|
final AtomicReference<GlobalStreamThread.State> globalThreadState = new AtomicReference<>(GlobalStreamThread.State.CREATED);
|
||||||
|
@ -293,7 +293,7 @@ public class KafkaStreamsTest {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void prepareStreamThread(final StreamThread thread, final boolean terminable) throws Exception {
|
private void prepareStreamThread(final StreamThread thread, final int threadId, final boolean terminable) throws Exception {
|
||||||
final AtomicReference<StreamThread.State> state = new AtomicReference<>(StreamThread.State.CREATED);
|
final AtomicReference<StreamThread.State> state = new AtomicReference<>(StreamThread.State.CREATED);
|
||||||
EasyMock.expect(thread.state()).andAnswer(state::get).anyTimes();
|
EasyMock.expect(thread.state()).andAnswer(state::get).anyTimes();
|
||||||
|
|
||||||
|
@ -321,7 +321,7 @@ public class KafkaStreamsTest {
|
||||||
}).anyTimes();
|
}).anyTimes();
|
||||||
EasyMock.expect(thread.getGroupInstanceID()).andStubReturn(Optional.empty());
|
EasyMock.expect(thread.getGroupInstanceID()).andStubReturn(Optional.empty());
|
||||||
EasyMock.expect(thread.threadMetadata()).andReturn(new ThreadMetadata(
|
EasyMock.expect(thread.threadMetadata()).andReturn(new ThreadMetadata(
|
||||||
"newThead",
|
"processId-StreamThread-" + threadId,
|
||||||
"DEAD",
|
"DEAD",
|
||||||
"",
|
"",
|
||||||
"",
|
"",
|
||||||
|
@ -337,7 +337,7 @@ public class KafkaStreamsTest {
|
||||||
EasyMock.expectLastCall().anyTimes();
|
EasyMock.expectLastCall().anyTimes();
|
||||||
thread.requestLeaveGroupDuringShutdown();
|
thread.requestLeaveGroupDuringShutdown();
|
||||||
EasyMock.expectLastCall().anyTimes();
|
EasyMock.expectLastCall().anyTimes();
|
||||||
EasyMock.expect(thread.getName()).andStubReturn("newThread");
|
EasyMock.expect(thread.getName()).andStubReturn("processId-StreamThread-" + threadId);
|
||||||
thread.shutdown();
|
thread.shutdown();
|
||||||
EasyMock.expectLastCall().andAnswer(() -> {
|
EasyMock.expectLastCall().andAnswer(() -> {
|
||||||
supplier.consumer.close();
|
supplier.consumer.close();
|
||||||
|
@ -564,7 +564,7 @@ public class KafkaStreamsTest {
|
||||||
streams.start();
|
streams.start();
|
||||||
final int oldSize = streams.threads.size();
|
final int oldSize = streams.threads.size();
|
||||||
TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
|
TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
|
||||||
assertThat(streams.addStreamThread(), equalTo(Optional.of("newThread")));
|
assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
|
||||||
assertThat(streams.threads.size(), equalTo(oldSize + 1));
|
assertThat(streams.threads.size(), equalTo(oldSize + 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -613,7 +613,7 @@ public class KafkaStreamsTest {
|
||||||
final int oldSize = streams.threads.size();
|
final int oldSize = streams.threads.size();
|
||||||
TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L,
|
TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L,
|
||||||
"Kafka Streams client did not reach state RUNNING");
|
"Kafka Streams client did not reach state RUNNING");
|
||||||
assertThat(streams.removeStreamThread(), equalTo(Optional.of("newThread")));
|
assertThat(streams.removeStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 1)));
|
||||||
assertThat(streams.threads.size(), equalTo(oldSize - 1));
|
assertThat(streams.threads.size(), equalTo(oldSize - 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue