MINOR: Revert initial transactional state store semantics commit (#19956)

Reverts #16922 and #18732, which were part of an incomplete feature.

PRs #16922 and #18732 are part of
[KIP-1035](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets).
In particular, on starting a Kafka Streams instance that has pre-existing
state, the state stores are initialized on the main thread. Part of this
initialization registers the stateful metrics with the JMX thread-id tag of
`main`. This breaks the KIP-1076 implementation, where we need to register
metrics with thread-id tags of `xxxStreamThread-N`. This is necessary because
the `StreamsMetric` is a singleton shared by all `StreamThread` instances, so
we need to make sure we only add metrics for the current `StreamThread`;
otherwise duplicate metrics are registered. This PR reverts the changes until
a fix is implemented that allows the individual `StreamThread`s to register
the metrics.
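
To make the tag clash concrete, here is a small self-contained illustration; the metric name, group, and thread names are hypothetical, chosen only to show why a per-thread filter never matches metrics that were registered under `thread-id=main`:

```java
import java.util.Map;

import org.apache.kafka.common.MetricName;

// Hypothetical illustration of the tag clash described above: stores
// initialized on the main thread produce metrics tagged thread-id=main,
// which a per-thread filter for "app-StreamThread-1" will never pick up.
public class ThreadTagClashSketch {
    public static void main(final String[] args) {
        final MetricName fromMainThread = new MetricName(
            "put-rate", "stream-state-metrics", "example metric",
            Map.of("thread-id", "main", "task-id", "0_0"));
        final MetricName fromStreamThread = new MetricName(
            "put-rate", "stream-state-metrics", "example metric",
            Map.of("thread-id", "app-StreamThread-1", "task-id", "0_0"));

        // A reporter filtering on its own thread-id tag only sees the second:
        System.out.println("main-tagged matches StreamThread-1? "
            + "app-StreamThread-1".equals(fromMainThread.tags().get("thread-id")));   // false
        System.out.println("thread-tagged matches StreamThread-1? "
            + "app-StreamThread-1".equals(fromStreamThread.tags().get("thread-id"))); // true
    }
}
```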

Reviewers: Matthias J. Sax <matthias@confluent.io>
Committed by Bill Bejeck on 2025-06-24 17:31:25 -04:00 via GitHub
commit 019ab2cb11 (parent 75ab2f5d03)
10 changed files with 24 additions and 569 deletions

checkstyle/suppressions.xml

@@ -194,7 +194,7 @@
<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask|TaskManager).java"/>
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>
<suppress checks="MethodLength"
files="KTableImpl.java"/>

KafkaStreams.java

@@ -185,7 +185,6 @@ public class KafkaStreams implements AutoCloseable {
protected final TopologyMetadata topologyMetadata;
private final QueryableStoreProvider queryableStoreProvider;
private final DelegatingStandbyUpdateListener delegatingStandbyUpdateListener;
private final LogContext logContext;
GlobalStreamThread globalStreamThread;
protected StateDirectory stateDirectory = null;
@@ -643,9 +642,6 @@ public class KafkaStreams implements AutoCloseable {
return;
}
// all (alive) threads have received their assignment, close any remaining startup tasks, they're not needed
stateDirectory.closeStartupTasks();
setState(State.RUNNING);
}
@@ -968,7 +964,7 @@ public class KafkaStreams implements AutoCloseable {
} else {
clientId = userClientId;
}
logContext = new LogContext(String.format("stream-client [%s] ", clientId));
final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
this.log = logContext.logger(getClass());
topologyMetadata.setLog(logContext);
@@ -1422,9 +1418,6 @@ public class KafkaStreams implements AutoCloseable {
*/
public synchronized void start() throws IllegalStateException, StreamsException {
if (setState(State.REBALANCING)) {
log.debug("Initializing STANDBY tasks for existing local state");
stateDirectory.initializeStartupTasks(topologyMetadata, streamsMetrics, logContext);
log.debug("Starting Streams client");
if (globalStreamThread != null) {

StreamsThreadMetricsDelegatingReporter.java

@@ -57,7 +57,13 @@ public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter {
consumer.registerMetricForSubscription(metric);
}
}
/*
The KafkaMetric object is a singleton shared by all StreamThread instances.
So we need to make sure we only pass metrics for the current StreamThread that contains this
MetricsReporter instance, which will register metrics with the embedded KafkaConsumer to pass
through the telemetry pipeline.
Otherwise, Kafka Streams would register multiple metrics for all StreamThreads.
*/
private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) {
final Map<String, String> tags = metric.metricName().tags();
final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && (tags.get(THREAD_ID_TAG).equals(threadId) ||

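For reference while the fix is pending, the sketch below restates the filtering idea from the comment above as a standalone class. The class name, constructor, and the `THREAD_ID_TAG` value are assumptions for illustration; `registerMetricForSubscription` and the `MetricsReporter` callbacks are the APIs visible in the diff:

```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

// Standalone sketch of per-thread metric filtering: forward only this
// thread's metrics to the embedded consumer, since the singleton metrics
// registry holds metrics for every StreamThread.
public class PerThreadDelegatingReporterSketch implements MetricsReporter {

    private static final String THREAD_ID_TAG = "thread-id"; // assumed tag key

    private final Consumer<?, ?> consumer;
    private final String threadId;

    public PerThreadDelegatingReporterSketch(final Consumer<?, ?> consumer, final String threadId) {
        this.consumer = consumer;
        this.threadId = threadId;
    }

    @Override
    public void init(final List<KafkaMetric> metrics) {
        metrics.stream().filter(this::ownedByThisThread)
                .forEach(consumer::registerMetricForSubscription);
    }

    @Override
    public void metricChange(final KafkaMetric metric) {
        if (ownedByThisThread(metric)) {
            consumer.registerMetricForSubscription(metric);
        }
    }

    private boolean ownedByThisThread(final KafkaMetric metric) {
        final Map<String, String> tags = metric.metricName().tags();
        return threadId.equals(tags.get(THREAD_ID_TAG));
    }

    @Override
    public void metricRemoval(final KafkaMetric metric) { }

    @Override
    public void close() { }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
```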
ProcessorStateManager.java

@@ -222,39 +222,6 @@ public class ProcessorStateManager implements StateManager {
log.debug("Created state store manager for task {}", taskId);
}
/**
* Special constructor used by {@link StateDirectory} to partially initialize startup tasks for local state, before
* they're assigned to a thread. When the task is assigned to a thread, the initialization of this StateManager is
* completed in {@link #assignToStreamThread(LogContext, ChangelogRegister, Collection)}.
*/
static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId,
final boolean eosEnabled,
final LogContext logContext,
final StateDirectory stateDirectory,
final Map<String, String> storeToChangelogTopic,
final Set<TopicPartition> sourcePartitions,
final boolean stateUpdaterEnabled) {
return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled);
}
/**
* Standby tasks initialized for local state on-startup are only partially initialized, because they are not yet
* assigned to a StreamThread. Once assigned to a StreamThread, we complete their initialization here using the
* assigned StreamThread's context.
*/
void assignToStreamThread(final LogContext logContext,
final ChangelogRegister changelogReader,
final Collection<TopicPartition> sourcePartitions) {
if (this.changelogReader != null) {
throw new IllegalStateException("Attempted to replace an existing changelogReader on a StateManager without closing it.");
}
this.sourcePartitions.clear();
this.log = logContext.logger(ProcessorStateManager.class);
this.logPrefix = logContext.logPrefix();
this.changelogReader = changelogReader;
this.sourcePartitions.addAll(sourcePartitions);
}
void registerStateStores(final List<StateStore> allStores, final InternalProcessorContext<?, ?> processorContext) {
processorContext.uninitialize();
for (final StateStore store : allStores) {
@@ -347,7 +314,7 @@ public class ProcessorStateManager implements StateManager {
}
private void maybeRegisterStoreWithChangelogReader(final String storeName) {
if (isLoggingEnabled(storeName) && changelogReader != null) {
if (isLoggingEnabled(storeName)) {
changelogReader.register(getStorePartition(storeName), this);
}
}
@@ -616,7 +583,7 @@ public class ProcessorStateManager implements StateManager {
public void close() throws ProcessorStateException {
log.debug("Closing its state manager and all the registered state stores: {}", stores);
if (!stateUpdaterEnabled && changelogReader != null) {
if (!stateUpdaterEnabled) {
changelogReader.unregister(getAllChangelogTopicPartitions());
}
@@ -664,7 +631,7 @@ public class ProcessorStateManager implements StateManager {
void recycle() {
log.debug("Recycling state for {} task {}.", taskType, taskId);
if (!stateUpdaterEnabled && changelogReader != null) {
if (!stateUpdaterEnabled) {
final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
changelogReader.unregister(allChangelogs);
}

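The two javadocs above describe a two-phase handoff: the state manager is built on the main thread without any thread-owned collaborators, and the changelog reader is attached exactly once when a `StreamThread` claims the task. A minimal standalone sketch of that pattern, with stub types rather than Kafka's classes:

```java
// Phase 1 creates the manager without thread-specific state; phase 2
// completes the initialization exactly once when a StreamThread claims the
// task. ChangelogRegister here is a stub, not Kafka's interface.
final class TwoPhaseStateManagerSketch {

    interface ChangelogRegister { } // stand-in for the thread-owned collaborator

    private ChangelogRegister changelogReader; // null until assigned to a thread

    private TwoPhaseStateManagerSketch() { }

    // Phase 1: partial initialization, before any StreamThread owns the task.
    static TwoPhaseStateManagerSketch createStartupTaskStateManager() {
        return new TwoPhaseStateManagerSketch();
    }

    // Phase 2: complete initialization with the assigned thread's context.
    void assignToStreamThread(final ChangelogRegister changelogReader) {
        if (this.changelogReader != null) {
            throw new IllegalStateException(
                "Attempted to replace an existing changelogReader without closing it.");
        }
        this.changelogReader = changelogReader;
    }
}
```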
StateDirectory.java

@@ -16,18 +16,12 @@
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -49,18 +43,13 @@ import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -106,14 +95,11 @@ public class StateDirectory implements AutoCloseable {
private final boolean hasPersistentStores;
private final boolean hasNamedTopologies;
private final ConcurrentMap<TaskId, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();
private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();
private FileChannel stateDirLockChannel;
private FileLock stateDirLock;
private final StreamsConfig config;
private final ConcurrentMap<TaskId, Task> tasksForLocalState = new ConcurrentHashMap<>();
/**
* Ensures that the state base directory as well as the application's sub-directory are created.
*
@@ -132,7 +118,6 @@ public class StateDirectory implements AutoCloseable {
this.hasPersistentStores = hasPersistentStores;
this.hasNamedTopologies = hasNamedTopologies;
this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
this.config = config;
final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
final File baseDir = new File(stateDirName);
stateDir = new File(baseDir, appId);
@@ -197,109 +182,6 @@ public class StateDirectory implements AutoCloseable {
return stateDirLock != null;
}
public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
final StreamsMetricsImpl streamsMetrics,
final LogContext logContext) {
final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
// discover all non-empty task directories in StateDirectory
for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
final String dirName = taskDirectory.file().getName();
final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
final ProcessorTopology subTopology = topologyMetadata.buildSubtopology(id);
// we still check if the task's sub-topology is stateful, even though we know its directory contains state,
// because it's possible that the topology has changed since that data was written, and is now stateless
// this therefore prevents us from creating unnecessary Tasks just because of some left-over state
if (subTopology.hasStateWithChangelogs()) {
final Set<TopicPartition> inputPartitions = topologyMetadata.nodeToSourceTopics(id).values().stream()
.flatMap(Collection::stream)
.map(t -> new TopicPartition(t, id.partition()))
.collect(Collectors.toSet());
final ProcessorStateManager stateManager = ProcessorStateManager.createStartupTaskStateManager(
id,
eosEnabled,
logContext,
this,
subTopology.storeToChangelogTopic(),
inputPartitions,
stateUpdaterEnabled
);
final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
id,
config,
stateManager,
streamsMetrics,
dummyCache
);
final Task task = new StandbyTask(
id,
inputPartitions,
subTopology,
topologyMetadata.taskConfig(id),
streamsMetrics,
stateManager,
this,
dummyCache,
context
);
try {
task.initializeIfNeeded();
tasksForLocalState.put(id, task);
} catch (final TaskCorruptedException e) {
// Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
task.suspend();
task.closeDirty();
}
}
}
}
}
public boolean hasStartupTasks() {
return !tasksForLocalState.isEmpty();
}
public Task removeStartupTask(final TaskId taskId) {
final Task task = tasksForLocalState.remove(taskId);
if (task != null) {
lockedTasksToOwner.replace(taskId, Thread.currentThread());
}
return task;
}
public void closeStartupTasks() {
closeStartupTasks(t -> true);
}
private void closeStartupTasks(final Predicate<Task> predicate) {
if (!tasksForLocalState.isEmpty()) {
// "drain" Tasks first to ensure that we don't try to close Tasks that another thread is attempting to close
final Set<Task> drainedTasks = new HashSet<>(tasksForLocalState.size());
for (final Map.Entry<TaskId, Task> entry : tasksForLocalState.entrySet()) {
if (predicate.test(entry.getValue()) && removeStartupTask(entry.getKey()) != null) {
// only add to our list of drained Tasks if we exclusively "claimed" a Task from tasksForLocalState
// to ensure we don't accidentally try to drain the same Task multiple times from concurrent threads
drainedTasks.add(entry.getValue());
}
}
// now that we have exclusive ownership of the drained tasks, close them
for (final Task task : drainedTasks) {
task.suspend();
task.closeClean();
}
}
}
public UUID initializeProcessId() {
if (!hasPersistentStores) {
final UUID processId = UUID.randomUUID();
@@ -496,17 +378,9 @@
}
}
/**
* Expose for tests.
*/
Thread lockOwner(final TaskId taskId) {
return lockedTasksToOwner.get(taskId);
}
@Override
public void close() {
if (hasPersistentStores) {
closeStartupTasks();
try {
stateDirLock.release();
stateDirLockChannel.close();
@@ -624,7 +498,6 @@
);
if (namedTopologyDirs != null) {
for (final File namedTopologyDir : namedTopologyDirs) {
closeStartupTasks(task -> task.id().topologyName().equals(parseNamedTopologyFromDirectory(namedTopologyDir.getName())));
final File[] contents = namedTopologyDir.listFiles();
if (contents != null && contents.length == 0) {
try {
@@ -662,7 +535,6 @@
log.debug("Tried to clear out the local state for NamedTopology {} but none was found", topologyName);
}
try {
closeStartupTasks(task -> task.id().topologyName().equals(topologyName));
Utils.delete(namedTopologyDir);
} catch (final IOException e) {
log.error("Hit an unexpected error while clearing local state for topology " + topologyName, e);

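The "drain" comments in `closeStartupTasks` above hinge on `ConcurrentMap.remove` returning non-null for exactly one caller. A minimal standalone sketch of that claim-then-close pattern (the `Task` interface here is a stand-in, not Kafka's):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// remove() on a ConcurrentMap returns non-null for exactly one caller, so
// each task is claimed, and therefore closed, by at most one thread.
final class StartupTaskDrainSketch {

    interface Task { void close(); }

    private final ConcurrentMap<String, Task> tasksForLocalState = new ConcurrentHashMap<>();

    void register(final String taskId, final Task task) {
        tasksForLocalState.put(taskId, task);
    }

    void closeStartupTasks() {
        // "Drain" first: only tasks we exclusively claimed end up in drainedTasks,
        // so concurrent callers never close the same task twice.
        final Set<Task> drainedTasks = new HashSet<>();
        for (final Map.Entry<String, Task> entry : tasksForLocalState.entrySet()) {
            if (tasksForLocalState.remove(entry.getKey()) != null) {
                drainedTasks.add(entry.getValue());
            }
        }
        // Now that we have exclusive ownership of the drained tasks, close them.
        drainedTasks.forEach(Task::close);
    }
}
```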
TaskManager.java

@@ -333,31 +333,6 @@ public class TaskManager {
}
}
private Map<Task, Set<TopicPartition>> assignStartupTasks(final Map<TaskId, Set<TopicPartition>> tasksToAssign,
final String threadLogPrefix,
final TopologyMetadata topologyMetadata,
final ChangelogRegister changelogReader) {
if (stateDirectory.hasStartupTasks()) {
final Map<Task, Set<TopicPartition>> assignedTasks = new HashMap<>(tasksToAssign.size());
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : tasksToAssign.entrySet()) {
final TaskId taskId = entry.getKey();
final Task task = stateDirectory.removeStartupTask(taskId);
if (task != null) {
// replace our dummy values with the real ones, now we know our thread and assignment
final Set<TopicPartition> inputPartitions = entry.getValue();
task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions);
updateInputPartitionsOfStandbyTaskIfTheyChanged(task, inputPartitions);
assignedTasks.put(task, inputPartitions);
}
}
return assignedTasks;
} else {
return Collections.emptyMap();
}
}
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
* @throws StreamsException fatal error while creating / initializing the task
@@ -487,15 +462,6 @@
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
final Map<Task, Set<TopicPartition>> tasksToRecycle,
final Set<Task> tasksToCloseClean) {
final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle = assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader);
final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse = assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata, changelogReader);
// recycle the startup standbys to active
tasks.addStandbyTasks(startupStandbyTasksToRecycle.keySet());
// use startup Standbys as real Standby tasks
tasks.addStandbyTasks(startupStandbyTasksToUse.keySet());
for (final Task task : tasks.allTasks()) {
final TaskId taskId = task.id();
if (activeTasksToCreate.containsKey(taskId)) {
@@ -550,7 +516,6 @@
final Set<Task> tasksToCloseClean,
final Map<TaskId, RuntimeException> failedTasks) {
handleTasksPendingInitialization();
handleStartupTaskReuse(activeTasksToCreate, standbyTasksToCreate, failedTasks);
handleRestoringAndUpdatingTasks(activeTasksToCreate, standbyTasksToCreate, failedTasks);
handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
}
@@ -568,34 +533,6 @@
}
}
private void handleStartupTaskReuse(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
final Map<TaskId, RuntimeException> failedTasks) {
final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle = assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader);
final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse = assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata, changelogReader);
// recycle the startup standbys to active, and remove them from the set of actives that need to be created
if (!startupStandbyTasksToRecycle.isEmpty()) {
final Set<Task> tasksToCloseDirty = new HashSet<>();
for (final Map.Entry<Task, Set<TopicPartition>> entry : startupStandbyTasksToRecycle.entrySet()) {
final Task task = entry.getKey();
recycleTaskFromStateUpdater(task, entry.getValue(), tasksToCloseDirty, failedTasks);
activeTasksToCreate.remove(task.id());
}
// if any standby tasks failed to recycle, close them dirty
tasksToCloseDirty.forEach(task ->
closeTaskDirty(task, false)
);
}
// use startup Standbys as real Standby tasks
if (!startupStandbyTasksToUse.isEmpty()) {
tasks.addPendingTasksToInit(startupStandbyTasksToUse.keySet());
startupStandbyTasksToUse.keySet().forEach(task -> standbyTasksToCreate.remove(task.id()));
}
}
private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
final Map<Task, Set<TopicPartition>> tasksToRecycle,

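In outline, the reverted `handleStartupTaskReuse` makes a per-task decision: recycle a startup standby into an active task, reuse it directly as a standby, or leave it for `closeStartupTasks` to clean up after the rebalance. A minimal sketch of that decision, with stand-in types:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// For each pre-initialized startup standby, the new assignment either
// recycles it into an active task, reuses it as a standby, or ignores it
// (leaving cleanup to closeStartupTasks()). Types are stand-ins.
final class StartupTaskReuseSketch {

    enum Role { ACTIVE, STANDBY }

    static Map<String, Role> planReuse(final Map<String, Role> newAssignment,
                                       final Set<String> startupStandbyTaskIds) {
        final Map<String, Role> plan = new HashMap<>();
        for (final String taskId : startupStandbyTaskIds) {
            final Role assigned = newAssignment.get(taskId);
            if (assigned != null) {
                plan.put(taskId, assigned); // ACTIVE -> recycle, STANDBY -> reuse directly
            }
        }
        return plan;
    }
}
```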
KafkaStreamsTest.java

@@ -400,44 +400,6 @@ public class KafkaStreamsTest {
}
}
@Test
public void shouldInitializeTasksForLocalStateOnStart() {
prepareStreams();
prepareStreamThread(streamThreadOne, 1);
prepareStreamThread(streamThreadTwo, 2);
try (final MockedConstruction<StateDirectory> constructed = mockConstruction(StateDirectory.class,
(mock, context) -> when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
assertEquals(1, constructed.constructed().size());
final StateDirectory stateDirectory = constructed.constructed().get(0);
verify(stateDirectory, times(0)).initializeStartupTasks(any(), any(), any());
streams.start();
verify(stateDirectory, times(1)).initializeStartupTasks(any(), any(), any());
}
}
}
@Test
public void shouldCloseStartupTasksAfterFirstRebalance() throws Exception {
prepareStreams();
final AtomicReference<StreamThread.State> state1 = prepareStreamThread(streamThreadOne, 1);
final AtomicReference<StreamThread.State> state2 = prepareStreamThread(streamThreadTwo, 2);
prepareThreadState(streamThreadOne, state1);
prepareThreadState(streamThreadTwo, state2);
try (final MockedConstruction<StateDirectory> constructed = mockConstruction(StateDirectory.class,
(mock, context) -> when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
assertEquals(1, constructed.constructed().size());
final StateDirectory stateDirectory = constructed.constructed().get(0);
streams.setStateListener(streamsStateListener);
streams.start();
waitForCondition(() -> streams.state() == State.RUNNING, "Streams never started.");
verify(stateDirectory, times(1)).closeStartupTasks();
}
}
}
@Test
public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws Exception {
prepareStreams();

StateDirectoryTest.java

@@ -17,20 +17,14 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.TestUtils;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -39,8 +33,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.io.BufferedWriter;
import java.io.File;
@@ -79,7 +71,6 @@ import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -96,7 +87,6 @@ public class StateDirectoryTest {
private final MockTime time = new MockTime();
private File stateDir;
private final String applicationId = "applicationId";
private StreamsConfig config;
private StateDirectory directory;
private File appDir;
@@ -105,14 +95,15 @@
if (!createStateDirectory) {
cleanup();
}
config = new StreamsConfig(new Properties() {
directory = new StateDirectory(
new StreamsConfig(new Properties() {
{
put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
}
});
directory = new StateDirectory(config, time, createStateDirectory, hasNamedTopology);
}),
time, createStateDirectory, hasNamedTopology);
appDir = new File(stateDir, applicationId);
}
@@ -821,144 +812,6 @@
assertThat(directory.initializeProcessId(), equalTo(processId));
}
@Test
public void shouldNotInitializeStandbyTasksWhenNoLocalState() {
final TaskId taskId = new TaskId(0, 0);
initializeStartupTasks(new TaskId(0, 0), false);
assertFalse(directory.hasStartupTasks());
assertNull(directory.removeStartupTask(taskId));
assertFalse(directory.hasStartupTasks());
}
@Test
public void shouldInitializeStandbyTasksForLocalState() {
final TaskId taskId = new TaskId(0, 0);
initializeStartupTasks(new TaskId(0, 0), true);
assertTrue(directory.hasStartupTasks());
assertNotNull(directory.removeStartupTask(taskId));
assertFalse(directory.hasStartupTasks());
assertNull(directory.removeStartupTask(taskId));
}
@Test
public void shouldNotAssignStartupTasksWeDontHave() {
final TaskId taskId = new TaskId(0, 0);
initializeStartupTasks(taskId, false);
final Task task = directory.removeStartupTask(taskId);
assertNull(task);
}
private class FakeStreamThread extends Thread {
private final TaskId taskId;
private final AtomicReference<Task> result;
private FakeStreamThread(final TaskId taskId, final AtomicReference<Task> result) {
this.taskId = taskId;
this.result = result;
}
@Override
public void run() {
result.set(directory.removeStartupTask(taskId));
}
}
@Test
public void shouldAssignStartupTaskToStreamThread() throws InterruptedException {
final TaskId taskId = new TaskId(0, 0);
initializeStartupTasks(taskId, true);
// main thread owns the newly initialized tasks
assertThat(directory.lockOwner(taskId), is(Thread.currentThread()));
// spawn off a "fake" StreamThread, so we can verify the lock was updated to the correct thread
final AtomicReference<Task> result = new AtomicReference<>();
final Thread streamThread = new FakeStreamThread(taskId, result);
streamThread.start();
streamThread.join();
final Task task = result.get();
assertNotNull(task);
assertThat(task, instanceOf(StandbyTask.class));
// verify the owner of the task directory lock has been shifted over to our assigned StreamThread
assertThat(directory.lockOwner(taskId), is(instanceOf(FakeStreamThread.class)));
}
@Test
public void shouldUnlockStartupTasksOnClose() {
final TaskId taskId = new TaskId(0, 0);
initializeStartupTasks(taskId, true);
assertEquals(Thread.currentThread(), directory.lockOwner(taskId));
directory.closeStartupTasks();
assertNull(directory.lockOwner(taskId));
}
@Test
public void shouldCloseStartupTasksOnDirectoryClose() {
final StateStore store = initializeStartupTasks(new TaskId(0, 0), true);
assertTrue(directory.hasStartupTasks());
assertTrue(store.isOpen());
directory.close();
assertFalse(directory.hasStartupTasks());
assertFalse(store.isOpen());
}
@Test
public void shouldNotCloseStartupTasksOnAutoCleanUp() {
// we need to set this because the auto-cleanup uses the last-modified time from the filesystem,
// which can't be mocked
time.setCurrentTimeMs(System.currentTimeMillis());
final StateStore store = initializeStartupTasks(new TaskId(0, 0), true);
assertTrue(directory.hasStartupTasks());
assertTrue(store.isOpen());
time.sleep(10000);
directory.cleanRemovedTasks(1000);
assertTrue(directory.hasStartupTasks());
assertTrue(store.isOpen());
}
private StateStore initializeStartupTasks(final TaskId taskId, final boolean createTaskDir) {
directory.initializeProcessId();
final TopologyMetadata metadata = Mockito.mock(TopologyMetadata.class);
final TopologyConfig topologyConfig = new TopologyConfig(config);
final StateStore store = new MockKeyValueStore("test", true);
if (createTaskDir) {
final File taskDir = directory.getOrCreateDirectoryForTask(taskId);
final File storeDir = new File(taskDir, store.name());
storeDir.mkdir();
}
final ProcessorTopology processorTopology = new ProcessorTopology(
Collections.emptyList(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonList(store),
Collections.emptyList(),
Collections.singletonMap(store.name(), store.name() + "-changelog"),
Collections.emptySet(),
Collections.emptyMap()
);
Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology);
Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig());
directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", "processId", time), new LogContext("test"));
return store;
}
private static class FutureStateDirectoryProcessFile {
@JsonProperty

StreamThreadTest.java

@@ -1163,7 +1163,6 @@ public class StreamThreadTest {
new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
stateDirectory = new StateDirectory(config, mockTime, true, false);
final TaskManager taskManager = new TaskManager(
new MockTime(),
@@ -1175,7 +1174,7 @@
new Tasks(new LogContext()),
topologyMetadata,
null,
stateDirectory,
null,
stateUpdater,
schedulingTaskManager
) {

TaskManagerTest.java

@@ -61,8 +61,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -4719,138 +4717,6 @@ public class TaskManagerTest {
assertEquals(taskManager.notPausedTasks().size(), 0);
}
@Test
public void shouldRecycleStartupTasksFromStateDirectoryAsActive() {
final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
final StreamTask activeTask = statefulTask(taskId00, taskId00ChangelogPartitions).build();
when(activeTaskCreator.createActiveTaskFromStandby(eq(startupTask), eq(taskId00Partitions), any()))
.thenReturn(activeTask);
when(stateDirectory.hasStartupTasks()).thenReturn(true, false);
when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) null);
taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
// ensure we recycled our existing startup Standby into an Active task
verify(activeTaskCreator).createActiveTaskFromStandby(eq(startupTask), eq(taskId00Partitions), any());
// ensure we didn't construct any new Tasks
verify(activeTaskCreator).createTasks(any(), eq(Collections.emptyMap()));
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
verifyNoMoreInteractions(activeTaskCreator);
verifyNoMoreInteractions(standbyTaskCreator);
// verify the recycled task is now being used as an assigned Active
assertEquals(Collections.singletonMap(taskId00, activeTask), taskManager.activeTaskMap());
assertEquals(Collections.emptyMap(), taskManager.standbyTaskMap());
}
@Test
public void shouldUseStartupTasksFromStateDirectoryAsStandby() {
final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) null);
taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);
// ensure we used our existing startup Task directly as a Standby
verify(startupTask).resume();
// ensure we didn't construct any new Tasks, or recycle an existing Task; we only used the one we already have
verify(activeTaskCreator).createTasks(any(), eq(Collections.emptyMap()));
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
verifyNoMoreInteractions(activeTaskCreator);
verifyNoMoreInteractions(standbyTaskCreator);
// verify the startup Standby is now being used as an assigned Standby
assertEquals(Collections.emptyMap(), taskManager.activeTaskMap());
assertEquals(Collections.singletonMap(taskId00, startupTask), taskManager.standbyTaskMap());
}
@Test
public void shouldRecycleStartupTasksFromStateDirectoryAsActiveWithStateUpdater() {
final Tasks taskRegistry = new Tasks(new LogContext());
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);
final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
final StreamTask activeTask = statefulTask(taskId00, taskId00ChangelogPartitions).build();
when(activeTaskCreator.createActiveTaskFromStandby(eq(startupTask), eq(taskId00Partitions), any()))
.thenReturn(activeTask);
when(stateDirectory.hasStartupTasks()).thenReturn(true, false);
when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) null);
taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
// ensure we used our existing startup Task directly as a Standby
assertTrue(taskRegistry.hasPendingTasksToInit());
assertEquals(Collections.singleton(activeTask), taskRegistry.drainPendingTasksToInit());
// we're using a mock StateUpdater here, so now that we've drained the task from the queue of startup tasks to init
// let's "add" it to our mock StateUpdater
when(stateUpdater.tasks()).thenReturn(Collections.singleton(activeTask));
when(stateUpdater.standbyTasks()).thenReturn(Collections.emptySet());
// ensure we recycled our existing startup Standby into an Active task
verify(activeTaskCreator).createActiveTaskFromStandby(eq(startupTask), eq(taskId00Partitions), any());
// ensure we didn't construct any new Tasks
verify(activeTaskCreator).createTasks(any(), eq(Collections.emptyMap()));
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
verifyNoMoreInteractions(activeTaskCreator);
verifyNoMoreInteractions(standbyTaskCreator);
// verify the recycled task is now being used as an assigned Active
assertEquals(Collections.singletonMap(taskId00, activeTask), taskManager.activeTaskMap());
assertEquals(Collections.emptyMap(), taskManager.standbyTaskMap());
}
@Test
public void shouldUseStartupTasksFromStateDirectoryAsStandbyWithStateUpdater() {
final Tasks taskRegistry = new Tasks(new LogContext());
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);
final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) null);
assertFalse(taskRegistry.hasPendingTasksToInit());
taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);
// ensure we used our existing startup Task directly as a Standby
assertTrue(taskRegistry.hasPendingTasksToInit());
assertEquals(Collections.singleton(startupTask), taskRegistry.drainPendingTasksToInit());
// we're using a mock StateUpdater here, so now that we've drained the task from the queue of startup tasks to init
// let's "add" it to our mock StateUpdater
when(stateUpdater.tasks()).thenReturn(Collections.singleton(startupTask));
when(stateUpdater.standbyTasks()).thenReturn(Collections.singleton(startupTask));
// ensure we didn't construct any new Tasks, or recycle an existing Task; we only used the one we already have
verify(activeTaskCreator).createTasks(any(), eq(Collections.emptyMap()));
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
verifyNoMoreInteractions(activeTaskCreator);
verifyNoMoreInteractions(standbyTaskCreator);
// verify the startup Standby is now being used as an assigned Standby
assertEquals(Collections.emptyMap(), taskManager.activeTaskMap());
assertEquals(Collections.singletonMap(taskId00, startupTask), taskManager.standbyTaskMap());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldStartStateUpdaterOnInit(final boolean stateUpdaterEnabled) {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, stateUpdaterEnabled);
taskManager.init();
if (stateUpdaterEnabled) {
verify(stateUpdater).start();
} else {
verify(stateUpdater, never()).start();
}
}
private static KafkaFutureImpl<DeletedRecords> completedFuture() {
final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
futureDeletedRecords.complete(null);