KAFKA-10199: Register and unregister changelog topics in state updater (#12638)

Registering and unregistering the changelog topics in the
changelog reader outside of the state updater leads to
race conditions between the stream thread and the state
updater thread. Thus, this PR moves registering and
unregistering of changelog topics in the changelog
reader into the state updater if the state updater
is enabled.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Hao Li <1127478+lihaosky@users.noreply.github.com>
This commit is contained in:
Bruno Cadonna 2022-09-16 09:05:11 +02:00 committed by GitHub
parent fdcde1fb78
commit a1f3c6d160
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 154 additions and 55 deletions

View File

@ -132,6 +132,11 @@ public abstract class AbstractTask implements Task {
return state;
}
@Override
public ProcessorStateManager stateManager() {
return stateMgr;
}
@Override
public void revive() {
if (state == CLOSED) {

View File

@ -65,6 +65,7 @@ class ActiveTaskCreator {
private final StreamsProducer threadProducer;
private final Map<TaskId, StreamsProducer> taskProducers;
private final ProcessingMode processingMode;
private final boolean stateUpdaterEnabled;
ActiveTaskCreator(final TopologyMetadata topologyMetadata,
final StreamsConfig applicationConfig,
@ -76,7 +77,8 @@ class ActiveTaskCreator {
final KafkaClientSupplier clientSupplier,
final String threadId,
final UUID processId,
final Logger log) {
final Logger log,
final boolean stateUpdaterEnabled) {
this.topologyMetadata = topologyMetadata;
this.applicationConfig = applicationConfig;
this.streamsMetrics = streamsMetrics;
@ -87,6 +89,7 @@ class ActiveTaskCreator {
this.clientSupplier = clientSupplier;
this.threadId = threadId;
this.log = log;
this.stateUpdaterEnabled = stateUpdaterEnabled;
createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
processingMode = processingMode(applicationConfig);
@ -154,8 +157,8 @@ class ActiveTaskCreator {
stateDirectory,
storeChangelogReader,
topology.storeToChangelogTopic(),
partitions
);
partitions,
stateUpdaterEnabled);
final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
taskId,

View File

@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor.internals;
import java.util.Collection;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
/**
@ -31,6 +33,8 @@ public interface ChangelogRegister {
*/
void register(final TopicPartition partition, final ProcessorStateManager stateManager);
void register(final Set<TopicPartition> partitions, final ProcessorStateManager stateManager);
/**
* Unregisters and removes the passed in partitions from the set of changelogs
* @param removedPartitions the set of partitions to remove

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
@ -67,7 +68,7 @@ public class DefaultStateUpdater implements StateUpdater {
super(name);
this.changelogReader = changelogReader;
final String logPrefix = String.format("%s ", name);
final String logPrefix = String.format("state-updater [%s] ", name);
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(DefaultStateUpdater.class);
}
@ -98,7 +99,7 @@ public class DefaultStateUpdater implements StateUpdater {
while (isRunning.get()) {
try {
runOnce();
} catch (final InterruptedException interruptedException) {
} catch (final InterruptedException | InterruptException interruptedException) {
return;
}
}
@ -262,19 +263,19 @@ public class DefaultStateUpdater implements StateUpdater {
private void addTask(final Task task) {
if (isStateless(task)) {
addToRestoredTasks((StreamTask) task);
log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
} else {
final Task existingTask = updatingTasks.putIfAbsent(task.id(), task);
if (existingTask != null) {
throw new IllegalStateException((existingTask.isActive() ? "Active" : "Standby") + " task " + task.id() + " already exist, " +
"should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
}
changelogReader.register(task.changelogPartitions(), task.stateManager());
if (task.isActive()) {
log.debug("Stateful active task " + task.id() + " was added to the updating tasks of the state updater");
log.info("Stateful active task " + task.id() + " was added to the state updater");
changelogReader.enforceRestoreActive();
} else {
log.debug("Standby task " + task.id() + " was added to the updating tasks of the state updater");
log.info("Standby task " + task.id() + " was added to the state updater");
if (updatingTasks.size() == 1) {
changelogReader.transitToUpdateStandby();
}
@ -287,19 +288,23 @@ public class DefaultStateUpdater implements StateUpdater {
if (updatingTasks.containsKey(taskId)) {
task = updatingTasks.get(taskId);
task.maybeCheckpoint(true);
final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
changelogReader.unregister(changelogPartitions);
removedTasks.add(task);
updatingTasks.remove(taskId);
transitToUpdateStandbysIfOnlyStandbysLeft();
log.debug((task.isActive() ? "Active" : "Standby")
log.info((task.isActive() ? "Active" : "Standby")
+ " task " + task.id() + " was removed from the updating tasks and added to the removed tasks.");
} else if (pausedTasks.containsKey(taskId)) {
task = pausedTasks.get(taskId);
final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
changelogReader.unregister(changelogPartitions);
removedTasks.add(task);
pausedTasks.remove(taskId);
log.debug((task.isActive() ? "Active" : "Standby")
log.info((task.isActive() ? "Active" : "Standby")
+ " task " + task.id() + " was removed from the paused tasks and added to the removed tasks.");
} else {
log.debug("Task " + taskId + " was not removed since it is not updating or paused.");
log.info("Task " + taskId + " was not removed since it is not updating or paused.");
}
}
@ -344,12 +349,13 @@ public class DefaultStateUpdater implements StateUpdater {
private void maybeCompleteRestoration(final StreamTask task,
final Set<TopicPartition> restoredChangelogs) {
final Collection<TopicPartition> taskChangelogPartitions = task.changelogPartitions();
if (restoredChangelogs.containsAll(taskChangelogPartitions)) {
final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
if (restoredChangelogs.containsAll(changelogPartitions)) {
task.maybeCheckpoint(true);
changelogReader.unregister(changelogPartitions);
addToRestoredTasks(task);
updatingTasks.remove(task.id());
log.debug("Stateful active task " + task.id() + " completed restoration");
log.info("Stateful active task " + task.id() + " completed restoration");
transitToUpdateStandbysIfOnlyStandbysLeft();
}
}
@ -406,17 +412,21 @@ public class DefaultStateUpdater implements StateUpdater {
private StateUpdaterThread stateUpdaterThread = null;
private CountDownLatch shutdownGate;
public DefaultStateUpdater(final StreamsConfig config,
private String name;
public DefaultStateUpdater(final String name,
final StreamsConfig config,
final ChangelogReader changelogReader,
final Time time) {
this.changelogReader = changelogReader;
this.time = time;
this.commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
this.name = name;
}
public void start() {
if (stateUpdaterThread == null) {
stateUpdaterThread = new StateUpdaterThread("state-updater", changelogReader);
stateUpdaterThread = new StateUpdaterThread(name, changelogReader);
stateUpdaterThread.start();
shutdownGate = new CountDownLatch(1);

View File

@ -170,6 +170,7 @@ public class ProcessorStateManager implements StateManager {
private final OffsetCheckpoint checkpointFile;
private TaskType taskType;
private final boolean stateUpdaterEnabled;
public static String storeChangelogTopic(final String prefix, final String storeName, final String namedTopology) {
if (namedTopology == null) {
@ -189,7 +190,8 @@ public class ProcessorStateManager implements StateManager {
final StateDirectory stateDirectory,
final ChangelogRegister changelogReader,
final Map<String, String> storeToChangelogTopic,
final Collection<TopicPartition> sourcePartitions) throws ProcessorStateException {
final Collection<TopicPartition> sourcePartitions,
final boolean stateUpdaterEnabled) throws ProcessorStateException {
this.storeToChangelogTopic = storeToChangelogTopic;
this.log = logContext.logger(ProcessorStateManager.class);
this.logPrefix = logContext.logPrefix();
@ -198,6 +200,7 @@ public class ProcessorStateManager implements StateManager {
this.eosEnabled = eosEnabled;
this.changelogReader = changelogReader;
this.sourcePartitions = sourcePartitions;
this.stateUpdaterEnabled = stateUpdaterEnabled;
this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);
this.checkpointFile = new OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));
@ -209,7 +212,9 @@ public class ProcessorStateManager implements StateManager {
processorContext.uninitialize();
for (final StateStore store : allStores) {
if (stores.containsKey(store.name())) {
maybeRegisterStoreWithChangelogReader(store.name());
if (!stateUpdaterEnabled) {
maybeRegisterStoreWithChangelogReader(store.name());
}
} else {
store.init((StateStoreContext) processorContext, store);
}
@ -352,7 +357,9 @@ public class ProcessorStateManager implements StateManager {
// on the state manager this state store would be closed as well
stores.put(storeName, storeMetadata);
maybeRegisterStoreWithChangelogReader(storeName);
if (!stateUpdaterEnabled) {
maybeRegisterStoreWithChangelogReader(storeName);
}
log.debug("Registered state store {} to its state manager", storeName);
}
@ -536,7 +543,9 @@ public class ProcessorStateManager implements StateManager {
public void close() throws ProcessorStateException {
log.debug("Closing its state manager and all the registered state stores: {}", stores);
changelogReader.unregister(getAllChangelogTopicPartitions());
if (!stateUpdaterEnabled) {
changelogReader.unregister(getAllChangelogTopicPartitions());
}
RuntimeException firstException = null;
// attempting to close the stores, just in case they
@ -575,8 +584,10 @@ public class ProcessorStateManager implements StateManager {
void recycle() {
log.debug("Recycling state for {} task {}.", taskType, taskId);
final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
changelogReader.unregister(allChangelogs);
if (!stateUpdaterEnabled) {
final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
changelogReader.unregister(allChangelogs);
}
}
void transitionTaskType(final TaskType newType) {

View File

@ -217,4 +217,9 @@ public class ReadOnlyTask implements Task {
public Optional<Long> timeCurrentIdlingStarted() {
throw new UnsupportedOperationException("This task is read-only");
}
@Override
public ProcessorStateManager stateManager() {
throw new UnsupportedOperationException("This task is read-only");
}
}

View File

@ -43,6 +43,7 @@ class StandbyTaskCreator {
private final ThreadCache dummyCache;
private final Logger log;
private final Sensor createTaskSensor;
private final boolean stateUpdaterEnabled;
StandbyTaskCreator(final TopologyMetadata topologyMetadata,
final StreamsConfig applicationConfig,
@ -50,13 +51,15 @@ class StandbyTaskCreator {
final StateDirectory stateDirectory,
final ChangelogReader storeChangelogReader,
final String threadId,
final Logger log) {
final Logger log,
final boolean stateUpdaterEnabled) {
this.topologyMetadata = topologyMetadata;
this.applicationConfig = applicationConfig;
this.streamsMetrics = streamsMetrics;
this.stateDirectory = stateDirectory;
this.storeChangelogReader = storeChangelogReader;
this.log = log;
this.stateUpdaterEnabled = stateUpdaterEnabled;
createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
@ -85,8 +88,8 @@ class StandbyTaskCreator {
stateDirectory,
storeChangelogReader,
topology.storeToChangelogTopic(),
partitions
);
partitions,
stateUpdaterEnabled);
final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
taskId,

View File

@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@ -347,6 +348,13 @@ public class StoreChangelogReader implements ChangelogReader {
}
}
@Override
public void register(final Set<TopicPartition> changelogPartitions, final ProcessorStateManager stateManager) {
for (final TopicPartition changelogPartition : changelogPartitions) {
register(changelogPartition, stateManager);
}
}
private ChangelogMetadata restoringChangelogByPartition(final TopicPartition partition) {
final ChangelogMetadata changelogMetadata = changelogs.get(partition);
if (changelogMetadata == null) {
@ -444,6 +452,8 @@ public class StoreChangelogReader implements ChangelogReader {
final Set<TaskId> corruptedTasks = new HashSet<>();
e.partitions().forEach(partition -> corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
throw new TaskCorruptedException(corruptedTasks, e);
} catch (final InterruptException interruptException) {
throw interruptException;
} catch (final KafkaException e) {
throw new StreamsException("Restore consumer get unexpected error polling records.", e);
}

View File

@ -364,6 +364,8 @@ public class StreamThread extends Thread {
final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
final boolean stateUpdaterEnabled =
InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
topologyMetadata,
config,
@ -375,8 +377,8 @@ public class StreamThread extends Thread {
clientSupplier,
threadId,
processId,
log
);
log,
stateUpdaterEnabled);
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
topologyMetadata,
config,
@ -384,8 +386,8 @@ public class StreamThread extends Thread {
stateDirectory,
changelogReader,
threadId,
log
);
log,
stateUpdaterEnabled);
final TaskManager taskManager = new TaskManager(
time,
@ -398,7 +400,7 @@ public class StreamThread extends Thread {
topologyMetadata,
adminClient,
stateDirectory,
maybeCreateAndStartStateUpdater(config, changelogReader, time)
maybeCreateAndStartStateUpdater(stateUpdaterEnabled, config, changelogReader, time, clientId, threadIdx)
);
referenceContainer.taskManager = taskManager;
@ -441,13 +443,15 @@ public class StreamThread extends Thread {
return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId));
}
private static StateUpdater maybeCreateAndStartStateUpdater(final StreamsConfig streamsConfig,
private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateUpdaterEnabled,
final StreamsConfig streamsConfig,
final ChangelogReader changelogReader,
final Time time) {
final boolean stateUpdaterEnabled =
InternalConfig.getBoolean(streamsConfig.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
final Time time,
final String clientId,
final int threadIdx) {
if (stateUpdaterEnabled) {
final StateUpdater stateUpdater = new DefaultStateUpdater(streamsConfig, changelogReader, time);
final String name = clientId + "-StateUpdater-" + threadIdx;
final StateUpdater stateUpdater = new DefaultStateUpdater(name, streamsConfig, changelogReader, time);
stateUpdater.start();
return stateUpdater;
} else {

View File

@ -215,6 +215,8 @@ public interface Task {
State state();
ProcessorStateManager stateManager();
default boolean needsInitializationOrRestoration() {
return state() == State.CREATED || state() == State.RESTORING;
}

View File

@ -169,7 +169,7 @@ class Tasks implements TasksRegistry {
private boolean containsTaskIdWithAction(final TaskId taskId, final Action action) {
final PendingUpdateAction pendingUpdateAction = pendingUpdateActions.get(taskId);
return !(pendingUpdateAction == null || pendingUpdateAction.getAction() != action);
return pendingUpdateAction != null && pendingUpdateAction.getAction() == action;
}
@Override

View File

@ -504,8 +504,8 @@ public class ActiveTaskCreatorTest {
mockClientSupplier,
"clientId-StreamThread-0",
uuid,
new LogContext().logger(ActiveTaskCreator.class)
);
new LogContext().logger(ActiveTaskCreator.class),
false);
assertThat(
activeTaskCreator.createTasks(

View File

@ -87,7 +87,7 @@ class DefaultStateUpdaterTest {
private final Time time = new MockTime(1L);
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
private DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time);
private DefaultStateUpdater stateUpdater = new DefaultStateUpdater("test-state-updater", config, changelogReader, time);
@AfterEach
public void tearDown() {
@ -147,7 +147,7 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRemoveUpdatingTasksOnShutdown() throws Exception {
stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
stateUpdater = new DefaultStateUpdater(new StreamsConfig(configProps(Integer.MAX_VALUE)), changelogReader, time);
stateUpdater = new DefaultStateUpdater("test-state-updater", new StreamsConfig(configProps(Integer.MAX_VALUE)), changelogReader, time);
final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
@ -314,7 +314,9 @@ class DefaultStateUpdaterTest {
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
verify(changelogReader, times(1)).enforceRestoreActive();
verify(changelogReader).register(task.changelogPartitions(), task.stateManager());
verify(changelogReader).unregister(task.changelogPartitions());
verify(changelogReader).enforceRestoreActive();
verify(changelogReader, atLeast(3)).restore(anyMap());
verify(changelogReader, never()).transitToUpdateStandby();
}
@ -346,6 +348,12 @@ class DefaultStateUpdaterTest {
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
verify(changelogReader).register(task1.changelogPartitions(), task1.stateManager());
verify(changelogReader).register(task2.changelogPartitions(), task2.stateManager());
verify(changelogReader).register(task3.changelogPartitions(), task3.stateManager());
verify(changelogReader).unregister(task1.changelogPartitions());
verify(changelogReader).unregister(task2.changelogPartitions());
verify(changelogReader).unregister(task3.changelogPartitions());
verify(changelogReader, times(3)).enforceRestoreActive();
verify(changelogReader, atLeast(4)).restore(anyMap());
verify(changelogReader, never()).transitToUpdateStandby();
@ -547,7 +555,10 @@ class DefaultStateUpdaterTest {
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
verify(changelogReader, times(1)).transitToUpdateStandby();
for (final StandbyTask task : tasks) {
verify(changelogReader).register(task.changelogPartitions(), task.stateManager());
}
verify(changelogReader).transitToUpdateStandby();
verify(changelogReader, timeout(VERIFICATION_TIMEOUT).atLeast(1)).restore(anyMap());
verify(changelogReader, never()).enforceRestoreActive();
}
@ -576,10 +587,14 @@ class DefaultStateUpdaterTest {
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
verify(changelogReader).register(task1.changelogPartitions(), task1.stateManager());
verify(changelogReader).register(task2.changelogPartitions(), task2.stateManager());
verify(changelogReader).register(task3.changelogPartitions(), task3.stateManager());
verify(changelogReader).register(task4.changelogPartitions(), task4.stateManager());
verify(changelogReader, atLeast(3)).restore(anyMap());
final InOrder orderVerifier = inOrder(changelogReader, task1, task2);
orderVerifier.verify(changelogReader, times(2)).enforceRestoreActive();
orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
orderVerifier.verify(changelogReader).transitToUpdateStandby();
}
@Test
@ -637,7 +652,7 @@ class DefaultStateUpdaterTest {
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
final InOrder orderVerifier = inOrder(changelogReader);
orderVerifier.verify(changelogReader, atLeast(1)).enforceRestoreActive();
orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
orderVerifier.verify(changelogReader).transitToUpdateStandby();
}
@Test
@ -688,6 +703,7 @@ class DefaultStateUpdaterTest {
verifyUpdatingTasks();
verifyPausedTasks();
verifyExceptionsAndFailedTasks();
verify(changelogReader).unregister(task.changelogPartitions());
}
@Test
@ -714,6 +730,8 @@ class DefaultStateUpdaterTest {
verifyCheckpointTasks(true, task1, task2);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verify(changelogReader).unregister(task1.changelogPartitions());
verify(changelogReader).unregister(task2.changelogPartitions());
}
@Test
@ -1288,7 +1306,7 @@ class DefaultStateUpdaterTest {
public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() {
// we need to use a non auto-ticking timer here to control how much time elapsed exactly
final Time time = new MockTime();
final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time);
final DefaultStateUpdater stateUpdater = new DefaultStateUpdater("test-state-updater", config, changelogReader, time);
try {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();

View File

@ -38,6 +38,13 @@ public class MockChangelogReader implements ChangelogReader {
restoringPartitions.add(partition);
}
@Override
public void register(final Set<TopicPartition> changelogPartitions, final ProcessorStateManager stateManager) {
for (final TopicPartition changelogPartition : changelogPartitions) {
register(changelogPartition, stateManager);
}
}
@Override
public void restore(final Map<TaskId, Task> tasks) {
// do nothing

View File

@ -210,7 +210,8 @@ public class ProcessorStateManagerTest {
mkEntry(persistentStoreTwoName, persistentStoreTwoTopicName),
mkEntry(nonPersistentStoreName, nonPersistentStoreTopicName)
),
mkSet(persistentStorePartition, nonPersistentStorePartition));
mkSet(persistentStorePartition, nonPersistentStorePartition),
false);
assertTrue(stateMgr.changelogAsSource(persistentStorePartition));
assertTrue(stateMgr.changelogAsSource(nonPersistentStorePartition));
@ -229,7 +230,8 @@ public class ProcessorStateManagerTest {
mkEntry(persistentStoreName, persistentStoreTopicName),
mkEntry(persistentStoreTwoName, persistentStoreTopicName)
),
Collections.emptySet());
Collections.emptySet(),
false);
stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback, null);
stateMgr.registerStore(persistentStoreTwo, persistentStore.stateRestoreCallback, null);
@ -403,7 +405,8 @@ public class ProcessorStateManagerTest {
stateDirectory,
changelogReader,
emptyMap(),
emptySet());
emptySet(),
false);
try {
stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback, null);
@ -675,7 +678,8 @@ public class ProcessorStateManagerTest {
stateDirectory,
changelogReader,
emptyMap(),
emptySet());
emptySet(),
false);
try {
stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback, null);
@ -1179,7 +1183,8 @@ public class ProcessorStateManagerTest {
mkEntry(persistentStoreTwoName, persistentStoreTwoTopicName),
mkEntry(nonPersistentStoreName, nonPersistentStoreTopicName)
),
emptySet());
emptySet(),
false);
}
private ProcessorStateManager getStateManager(final Task.TaskType taskType) {

View File

@ -3020,7 +3020,8 @@ public class StreamThreadTest {
stateDirectory,
new MockChangelogReader(),
CLIENT_ID,
log);
log,
false);
return standbyTaskCreator.createTasks(singletonMap(new TaskId(1, 2), emptySet()));
}

View File

@ -416,7 +416,9 @@ public class StreamThreadStateStoreProviderTest {
clientSupplier.adminClient,
clientSupplier.restoreConsumer,
new MockStateRestoreListener()),
topology.storeToChangelogTopic(), partitions);
topology.storeToChangelogTopic(),
partitions,
false);
final RecordCollector recordCollector = new RecordCollectorImpl(
logContext,
taskId,

View File

@ -27,6 +27,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
@ -318,6 +319,7 @@ public final class StreamsTestUtils {
final Set<TopicPartition> changelogPartitions) {
when(task.changelogPartitions()).thenReturn(changelogPartitions);
when(task.id()).thenReturn(taskId);
when(task.stateManager()).thenReturn(mock(ProcessorStateManager.class));
}
public TaskBuilder<T> inState(final Task.State state) {

View File

@ -489,8 +489,8 @@ public class TopologyTestDriver implements Closeable {
stateDirectory,
new MockChangelogRegister(),
processorTopology.storeToChangelogTopic(),
new HashSet<>(partitionsByInputTopic.values())
);
new HashSet<>(partitionsByInputTopic.values()),
false);
final RecordCollector recordCollector = new RecordCollectorImpl(
logContext,
TASK_ID,
@ -1128,6 +1128,13 @@ public class TopologyTestDriver implements Closeable {
restoringPartitions.add(partition);
}
@Override
public void register(final Set<TopicPartition> changelogPartitions, final ProcessorStateManager stateManager) {
for (final TopicPartition changelogPartition : changelogPartitions) {
register(changelogPartition, stateManager);
}
}
@Override
public void unregister(final Collection<TopicPartition> partitions) {
restoringPartitions.removeAll(partitions);