KAFKA-8285: enable localized thread IDs in Kafka Streams (#6632)

Details in the JIRA: https://issues.apache.org/jira/browse/KAFKA-8285

Basically we want to avoid sharing of atomic updates for thread id with multiple stream instances on one JVM.

Reviewers: Raoul de Haard, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Boyang Chen 2019-05-02 17:16:17 -07:00 committed by Guozhang Wang
parent b074173ea2
commit a4f7675db1
3 changed files with 9 additions and 6 deletions

View File

@ -726,7 +726,8 @@ public class KafkaStreams implements AutoCloseable {
streamsMetadataState,
cacheSizePerThread,
stateDirectory,
delegatingStateRestoreListener);
delegatingStateRestoreListener,
i + 1);
threadState.put(threads[i].getId(), threads[i].state());
storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
}

View File

@ -69,8 +69,6 @@ import static java.util.Collections.singleton;
public class StreamThread extends Thread {
private final static AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
/**
* Stream thread states are the possible states that a stream thread can be in.
* A thread must only be in one state at a time
@ -600,8 +598,9 @@ public class StreamThread extends Thread {
final StreamsMetadataState streamsMetadataState,
final long cacheSizeBytes,
final StateDirectory stateDirectory,
final StateRestoreListener userStateRestoreListener) {
final String threadClientId = clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement();
final StateRestoreListener userStateRestoreListener,
final int threadIdx) {
final String threadClientId = clientId + "-StreamThread-" + threadIdx;
final String logPrefix = String.format("stream-thread [%s] ", threadClientId);
final LogContext logContext = new LogContext(logPrefix);

View File

@ -104,6 +104,7 @@ public class StreamThreadTest {
private final String clientId = "clientId";
private final String applicationId = "stream-thread-test";
private final int threadIdx = 1;
private final MockTime mockTime = new MockTime();
private final Metrics metrics = new Metrics();
private final MockClientSupplier clientSupplier = new MockClientSupplier();
@ -244,7 +245,8 @@ public class StreamThreadTest {
streamsMetadataState,
0,
stateDirectory,
new MockStateRestoreListener());
new MockStateRestoreListener(),
threadIdx);
}
@Test
@ -278,6 +280,7 @@ public class StreamThreadTest {
final JmxReporter reporter = new JmxReporter("kafka.streams");
metrics.addReporter(reporter);
assertEquals(clientId + "-StreamThread-1", thread.getName());
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=%s,client-id=%s",
defaultGroupName, thread.getName())));
}