diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2a93275083d..ca1c3d65763 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -194,7 +194,7 @@
+ files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b3723ef447d..3c6f4975cb8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -185,7 +185,6 @@ public class KafkaStreams implements AutoCloseable {
protected final TopologyMetadata topologyMetadata;
private final QueryableStoreProvider queryableStoreProvider;
private final DelegatingStandbyUpdateListener delegatingStandbyUpdateListener;
- private final LogContext logContext;
GlobalStreamThread globalStreamThread;
protected StateDirectory stateDirectory = null;
@@ -643,9 +642,6 @@ public class KafkaStreams implements AutoCloseable {
return;
}
- // all (alive) threads have received their assignment, close any remaining startup tasks, they're not needed
- stateDirectory.closeStartupTasks();
-
setState(State.RUNNING);
}
@@ -968,7 +964,7 @@ public class KafkaStreams implements AutoCloseable {
} else {
clientId = userClientId;
}
- logContext = new LogContext(String.format("stream-client [%s] ", clientId));
+ final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
this.log = logContext.logger(getClass());
topologyMetadata.setLog(logContext);
@@ -1422,9 +1418,6 @@ public class KafkaStreams implements AutoCloseable {
*/
public synchronized void start() throws IllegalStateException, StreamsException {
if (setState(State.REBALANCING)) {
- log.debug("Initializing STANDBY tasks for existing local state");
- stateDirectory.initializeStartupTasks(topologyMetadata, streamsMetrics, logContext);
-
log.debug("Starting Streams client");
if (globalStreamThread != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
index 65b7990dfe0..3f45a257b72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
@@ -57,7 +57,13 @@ public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter {
consumer.registerMetricForSubscription(metric);
}
}
-
+ /*
+ The KafkaMetric object is a singleton shared by all StreamThread instances.
+ We must therefore pass along only the metrics that belong to the StreamThread owning this
+ MetricsReporter instance; the reporter registers them with the embedded KafkaConsumer so that
+ they flow through the telemetry pipeline.
+ Otherwise, each reporter would also register metrics that belong to other StreamThreads.
+ */
private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) {
final Map tags = metric.metricName().tags();
final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && (tags.get(THREAD_ID_TAG).equals(threadId) ||
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3506845d288..7c5cc947c41 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -222,39 +222,6 @@ public class ProcessorStateManager implements StateManager {
log.debug("Created state store manager for task {}", taskId);
}
- /**
- * Special constructor used by {@link StateDirectory} to partially initialize startup tasks for local state, before
- * they're assigned to a thread. When the task is assigned to a thread, the initialization of this StateManager is
- * completed in {@link #assignToStreamThread(LogContext, ChangelogRegister, Collection)}.
- */
- static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId,
- final boolean eosEnabled,
- final LogContext logContext,
- final StateDirectory stateDirectory,
- final Map storeToChangelogTopic,
- final Set sourcePartitions,
- final boolean stateUpdaterEnabled) {
- return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled);
- }
-
- /**
- * Standby tasks initialized for local state on-startup are only partially initialized, because they are not yet
- * assigned to a StreamThread. Once assigned to a StreamThread, we complete their initialization here using the
- * assigned StreamThread's context.
- */
- void assignToStreamThread(final LogContext logContext,
- final ChangelogRegister changelogReader,
- final Collection sourcePartitions) {
- if (this.changelogReader != null) {
- throw new IllegalStateException("Attempted to replace an existing changelogReader on a StateManager without closing it.");
- }
- this.sourcePartitions.clear();
- this.log = logContext.logger(ProcessorStateManager.class);
- this.logPrefix = logContext.logPrefix();
- this.changelogReader = changelogReader;
- this.sourcePartitions.addAll(sourcePartitions);
- }
-
void registerStateStores(final List allStores, final InternalProcessorContext, ?> processorContext) {
processorContext.uninitialize();
for (final StateStore store : allStores) {
@@ -347,7 +314,7 @@ public class ProcessorStateManager implements StateManager {
}
private void maybeRegisterStoreWithChangelogReader(final String storeName) {
- if (isLoggingEnabled(storeName) && changelogReader != null) {
+ if (isLoggingEnabled(storeName)) {
changelogReader.register(getStorePartition(storeName), this);
}
}
@@ -616,7 +583,7 @@ public class ProcessorStateManager implements StateManager {
public void close() throws ProcessorStateException {
log.debug("Closing its state manager and all the registered state stores: {}", stores);
- if (!stateUpdaterEnabled && changelogReader != null) {
+ if (!stateUpdaterEnabled) {
changelogReader.unregister(getAllChangelogTopicPartitions());
}
@@ -664,7 +631,7 @@ public class ProcessorStateManager implements StateManager {
void recycle() {
log.debug("Recycling state for {} task {}.", taskType, taskId);
- if (!stateUpdaterEnabled && changelogReader != null) {
+ if (!stateUpdaterEnabled) {
final List allChangelogs = getAllChangelogTopicPartitions();
changelogReader.unregister(allChangelogs);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index a95d20ddae0..070f732f4e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -16,18 +16,12 @@
*/
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TaskCorruptedException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.internals.ThreadCache;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -49,18 +43,13 @@ import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -106,14 +95,11 @@ public class StateDirectory implements AutoCloseable {
private final boolean hasPersistentStores;
private final boolean hasNamedTopologies;
- private final ConcurrentMap lockedTasksToOwner = new ConcurrentHashMap<>();
+ private final HashMap lockedTasksToOwner = new HashMap<>();
private FileChannel stateDirLockChannel;
private FileLock stateDirLock;
- private final StreamsConfig config;
- private final ConcurrentMap tasksForLocalState = new ConcurrentHashMap<>();
-
/**
* Ensures that the state base directory as well as the application's sub-directory are created.
*
@@ -132,7 +118,6 @@ public class StateDirectory implements AutoCloseable {
this.hasPersistentStores = hasPersistentStores;
this.hasNamedTopologies = hasNamedTopologies;
this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
- this.config = config;
final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
final File baseDir = new File(stateDirName);
stateDir = new File(baseDir, appId);
@@ -197,109 +182,6 @@ public class StateDirectory implements AutoCloseable {
return stateDirLock != null;
}
- public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
- final StreamsMetricsImpl streamsMetrics,
- final LogContext logContext) {
- final List nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
- if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
- final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
- final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
- final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
-
- // discover all non-empty task directories in StateDirectory
- for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
- final String dirName = taskDirectory.file().getName();
- final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
- final ProcessorTopology subTopology = topologyMetadata.buildSubtopology(id);
-
- // we still check if the task's sub-topology is stateful, even though we know its directory contains state,
- // because it's possible that the topology has changed since that data was written, and is now stateless
- // this therefore prevents us from creating unnecessary Tasks just because of some left-over state
- if (subTopology.hasStateWithChangelogs()) {
- final Set inputPartitions = topologyMetadata.nodeToSourceTopics(id).values().stream()
- .flatMap(Collection::stream)
- .map(t -> new TopicPartition(t, id.partition()))
- .collect(Collectors.toSet());
- final ProcessorStateManager stateManager = ProcessorStateManager.createStartupTaskStateManager(
- id,
- eosEnabled,
- logContext,
- this,
- subTopology.storeToChangelogTopic(),
- inputPartitions,
- stateUpdaterEnabled
- );
-
- final InternalProcessorContext