(Cherry-pick) KAFKA-9274: handle TimeoutException on task reset (#10000) (#10372)

This PR was removed by accident in trunk and 2.8, bringing it back.

Co-authored-by: Matthias J. Sax <matthias@confluent.io>
Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Boyang Chen 2021-03-22 13:39:29 -07:00 committed by GitHub
parent 7c45976c8d
commit 80f373d34f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 224 additions and 167 deletions

View File

@ -44,13 +44,13 @@ public class StandbyTask extends AbstractTask implements Task {
private final StreamsMetricsImpl streamsMetrics;
/**
* @param id the ID of this task
* @param inputPartitions input topic partitions, used for thread metadata only
* @param topology the instance of {@link ProcessorTopology}
* @param config the {@link StreamsConfig} specified by the user
* @param streamsMetrics the {@link StreamsMetrics} created by the thread
* @param stateMgr the {@link ProcessorStateManager} for this task
* @param stateDirectory the {@link StateDirectory} created by the thread
* @param id the ID of this task
* @param inputPartitions input topic partitions, used for thread metadata only
* @param topology the instance of {@link ProcessorTopology}
* @param config the {@link StreamsConfig} specified by the user
* @param streamsMetrics the {@link StreamsMetrics} created by the thread
* @param stateMgr the {@link ProcessorStateManager} for this task
* @param stateDirectory the {@link StateDirectory} created by the thread
*/
StandbyTask(final TaskId id,
final Set<TopicPartition> inputPartitions,
@ -111,7 +111,7 @@ public class StandbyTask extends AbstractTask implements Task {
}
@Override
public void completeRestoration() {
public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
throw new IllegalStateException("Standby task " + id + " should never be completing restoration");
}

View File

@ -88,6 +88,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private final Map<TopicPartition, Long> consumedOffsets;
private final Map<TopicPartition, Long> committedOffsets;
private final Map<TopicPartition, Long> highWatermark;
private final Set<TopicPartition> resetOffsetsForPartitions;
private Optional<Long> timeCurrentIdlingStarted;
private final PunctuationQueue streamTimePunctuationQueue;
private final PunctuationQueue systemTimePunctuationQueue;
@ -171,6 +172,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
// initialize the consumed and committed offset cache
consumedOffsets = new HashMap<>();
resetOffsetsForPartitions = new HashSet<>();
recordQueueCreator = new RecordQueueCreator(this.logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler());
@ -236,17 +238,22 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
}
public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
mainConsumer.pause(partitionsForOffsetReset);
resetOffsetsForPartitions.addAll(partitionsForOffsetReset);
}
/**
* @throws TimeoutException if fetching committed offsets timed out
*/
@Override
public void completeRestoration() {
public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
switch (state()) {
case RUNNING:
return;
case RESTORING:
initializeMetadata();
resetOffsetsIfNeededAndInitializeMetadata(offsetResetter);
initializeTopology();
processorContext.initialize();
@ -837,12 +844,27 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
return checkpointableOffsets;
}
private void initializeMetadata() {
private void resetOffsetsIfNeededAndInitializeMetadata(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
try {
final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = mainConsumer.committed(inputPartitions()).entrySet().stream()
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
initializeTaskTime(offsetsAndMetadata);
final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = mainConsumer.committed(inputPartitions());
for (final Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : offsetsAndMetadata.entrySet()) {
if (resetOffsetsForPartitions.contains(committedEntry.getKey())) {
final OffsetAndMetadata offsetAndMetadata = committedEntry.getValue();
if (offsetAndMetadata != null) {
mainConsumer.seek(committedEntry.getKey(), offsetAndMetadata);
resetOffsetsForPartitions.remove(committedEntry.getKey());
}
}
}
offsetResetter.accept(resetOffsetsForPartitions);
resetOffsetsForPartitions.clear();
initializeTaskTime(offsetsAndMetadata.entrySet().stream()
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
);
} catch (final TimeoutException timeoutException) {
log.warn(
"Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +

View File

@ -422,8 +422,6 @@ public class StreamThread extends Thread {
cache::resize
);
taskManager.setPartitionResetter(partitions -> streamThread.resetOffsets(partitions, null));
return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId));
}
@ -848,7 +846,7 @@ public class StreamThread extends Thread {
// transit to restore active is idempotent so we can call it multiple times
changelogReader.enforceRestoreActive();
if (taskManager.tryToCompleteRestoration(now)) {
if (taskManager.tryToCompleteRestoration(now, partitions -> resetOffsets(partitions, null))) {
changelogReader.transitToUpdateStandby();
log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs,
taskManager.tasks().keySet());

View File

@ -108,10 +108,14 @@ public interface Task {
*/
void initializeIfNeeded();
default void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
throw new UnsupportedOperationException();
}
/**
* @throws StreamsException fatal error, should close the thread
*/
void completeRestoration();
void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter);
void suspend();

View File

@ -90,7 +90,6 @@ public class TaskManager {
// includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance
private final Set<TaskId> lockedTaskDirectories = new HashSet<>();
private java.util.function.Consumer<Set<TopicPartition>> resetter;
TaskManager(final Time time,
final ChangelogReader changelogReader,
@ -226,18 +225,7 @@ public class TaskManager {
);
}
mainConsumer.pause(assignedToPauseAndReset);
// TODO: KIP-572 need to handle `TimeoutException`
final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer.committed(assignedToPauseAndReset);
for (final Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : committed.entrySet()) {
final OffsetAndMetadata offsetAndMetadata = committedEntry.getValue();
if (offsetAndMetadata != null) {
mainConsumer.seek(committedEntry.getKey(), offsetAndMetadata);
assignedToPauseAndReset.remove(committedEntry.getKey());
}
}
// throws if anything has no configured reset policy
resetter.accept(assignedToPauseAndReset);
task.addPartitionsForOffsetReset(assignedToPauseAndReset);
}
task.revive();
}
@ -418,7 +406,7 @@ public class TaskManager {
* @throws StreamsException if the store's change log does not contain the partition
* @return {@code true} if all tasks are fully restored
*/
boolean tryToCompleteRestoration(final long now) {
boolean tryToCompleteRestoration(final long now, final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
boolean allRunning = true;
final List<Task> activeTasks = new LinkedList<>();
@ -449,7 +437,7 @@ public class TaskManager {
for (final Task task : activeTasks) {
if (restored.containsAll(task.changelogPartitions())) {
try {
task.completeRestoration();
task.completeRestoration(offsetResetter);
task.clearTaskTimeout();
} catch (final TimeoutException timeoutException) {
task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
@ -1288,10 +1276,6 @@ public class TaskManager {
return tasks().values().stream().anyMatch(Task::needsInitializationOrRestoration);
}
public void setPartitionResetter(final java.util.function.Consumer<Set<TopicPartition>> resetter) {
this.resetter = resetter;
}
// for testing only
void addTask(final Task task) {
tasks.addTask(task);

View File

@ -167,7 +167,7 @@ public class StandbyTaskTest {
task = createStandbyTask();
assertThrows(LockException.class, task::initializeIfNeeded);
assertThrows(LockException.class, () -> task.initializeIfNeeded());
task = null;
}

View File

@ -82,6 +82,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -276,7 +277,7 @@ public class StreamTaskTest {
task = createStatefulTask(createConfig("100"), false);
assertThrows(LockException.class, task::initializeIfNeeded);
assertThrows(LockException.class, () -> task.initializeIfNeeded());
}
@Test
@ -326,6 +327,69 @@ public class StreamTaskTest {
ctrl.verify();
}
@Test
public void shouldResetOffsetsToLastCommittedForSpecifiedPartitions() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.addPartitionsForOffsetReset(Collections.singleton(partition1));
consumer.seek(partition1, 5L);
consumer.commitSync();
consumer.seek(partition1, 10L);
consumer.seek(partition2, 15L);
final java.util.function.Consumer<Set<TopicPartition>> resetter =
EasyMock.mock(java.util.function.Consumer.class);
resetter.accept(Collections.emptySet());
EasyMock.expectLastCall();
EasyMock.replay(resetter);
task.initializeIfNeeded();
task.completeRestoration(resetter);
assertThat(consumer.position(partition1), equalTo(5L));
assertThat(consumer.position(partition2), equalTo(15L));
}
@Test
public void shouldAutoOffsetResetIfNoCommittedOffsetFound() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.addPartitionsForOffsetReset(Collections.singleton(partition1));
final AtomicReference<AssertionError> shouldNotSeek = new AtomicReference<>();
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public void seek(final TopicPartition partition, final long offset) {
final AssertionError error = shouldNotSeek.get();
if (error != null) {
throw error;
}
super.seek(partition, offset);
}
};
consumer.assign(asList(partition1, partition2));
consumer.updateBeginningOffsets(mkMap(mkEntry(partition1, 0L), mkEntry(partition2, 0L)));
consumer.seek(partition1, 5L);
consumer.seek(partition2, 15L);
shouldNotSeek.set(new AssertionError("Should not seek"));
final java.util.function.Consumer<Set<TopicPartition>> resetter =
EasyMock.mock(java.util.function.Consumer.class);
resetter.accept(Collections.singleton(partition1));
EasyMock.expectLastCall();
EasyMock.replay(resetter);
task.initializeIfNeeded();
task.completeRestoration(resetter);
// because we mocked the `resetter` positions don't change
assertThat(consumer.position(partition1), equalTo(5L));
assertThat(consumer.position(partition2), equalTo(15L));
EasyMock.verify(resetter);
}
@Test
public void shouldReadCommittedStreamTimeOnInitialize() {
stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
@ -339,7 +403,7 @@ public class StreamTaskTest {
assertEquals(RecordQueue.UNKNOWN, task.streamTime());
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
assertEquals(10L, task.streamTime());
}
@ -370,7 +434,7 @@ public class StreamTaskTest {
assertEquals(RESTORING, task.state());
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
assertEquals(RUNNING, task.state());
assertTrue(source1.initialized);
@ -574,7 +638,7 @@ public class StreamTaskTest {
task = createStatelessTaskWithForwardingTopology(evenKeyForwardingSourceNode);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
final String sourceNodeName = evenKeyForwardingSourceNode.name();
final String terminalNodeName = processorStreamTime.name();
@ -890,7 +954,7 @@ public class StreamTaskTest {
public void shouldPunctuateOnceStreamTimeAfterGap() {
task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.addRecords(partition1, asList(
getConsumerRecordWithOffsetAsTimestamp(partition1, 20),
@ -975,7 +1039,7 @@ public class StreamTaskTest {
public void shouldRespectPunctuateCancellationStreamTime() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.addRecords(partition1, asList(
getConsumerRecordWithOffsetAsTimestamp(partition1, 20),
@ -1015,7 +1079,7 @@ public class StreamTaskTest {
public void shouldRespectPunctuateCancellationSystemTime() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
final long now = time.milliseconds();
time.sleep(10);
assertTrue(task.maybePunctuateSystemTime());
@ -1029,7 +1093,7 @@ public class StreamTaskTest {
public void shouldRespectCommitNeeded() {
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
assertFalse(task.commitNeeded());
@ -1067,7 +1131,7 @@ public class StreamTaskTest {
public void shouldCommitNextOffsetFromQueueIfAvailable() {
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.addRecords(partition1, asList(
getConsumerRecordWithOffsetAsTimestamp(partition1, 0L),
@ -1086,7 +1150,7 @@ public class StreamTaskTest {
public void shouldCommitConsumerPositionIfRecordQueueIsEmpty() {
task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(partition1, 0L));
consumer.addRecord(getConsumerRecordWithOffsetAsTimestamp(partition1, 1L));
@ -1120,7 +1184,7 @@ public class StreamTaskTest {
public void shouldRespectCommitRequested() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.requestCommit();
assertTrue(task.commitRequested());
@ -1152,7 +1216,7 @@ public class StreamTaskTest {
public void shouldBeProcessableIfAllPartitionsBuffered() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
assertFalse(task.process(0L));
@ -1170,7 +1234,7 @@ public class StreamTaskTest {
public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
final long now = time.milliseconds();
time.sleep(10);
assertTrue(task.maybePunctuateSystemTime());
@ -1190,7 +1254,7 @@ public class StreamTaskTest {
public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
assertFalse(task.maybePunctuateSystemTime());
time.sleep(9);
assertFalse(task.maybePunctuateSystemTime());
@ -1201,7 +1265,7 @@ public class StreamTaskTest {
public void shouldPunctuateOnceSystemTimeAfterGap() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
final long now = time.milliseconds();
time.sleep(100);
assertTrue(task.maybePunctuateSystemTime());
@ -1227,7 +1291,7 @@ public class StreamTaskTest {
public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
try {
task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
@ -1245,7 +1309,7 @@ public class StreamTaskTest {
public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
try {
task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
@ -1263,7 +1327,7 @@ public class StreamTaskTest {
public void shouldNotShareHeadersBetweenPunctuateIterations() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.punctuate(
processorSystemTime,
@ -1289,7 +1353,7 @@ public class StreamTaskTest {
task = createFaultyStatefulTask(createConfig("100"));
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.addRecords(partition1, asList(
getConsumerRecordWithOffsetAsTimestamp(partition1, 10),
@ -1317,9 +1381,9 @@ public class StreamTaskTest {
task = createDisconnectedTask(createConfig("100"));
task.initializeIfNeeded();
task.transitionTo(RESTORING);
assertThrows(TimeoutException.class, task::completeRestoration);
assertThrows(TimeoutException.class, () -> task.completeRestoration(noOpResetter -> { }));
}
@Test
@ -1347,7 +1411,7 @@ public class StreamTaskTest {
assertFalse(source1.initialized);
assertFalse(source2.initialized);
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
assertEquals(RUNNING, task.state());
assertTrue(source1.initialized);
@ -1377,7 +1441,7 @@ public class StreamTaskTest {
task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.prepareCommit();
task.postCommit(true); // should checkpoint
@ -1408,7 +1472,7 @@ public class StreamTaskTest {
task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.prepareCommit();
task.postCommit(true);
@ -1430,7 +1494,7 @@ public class StreamTaskTest {
task = createStatefulTask(createConfig(StreamsConfig.EXACTLY_ONCE, "100"), true);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.prepareCommit();
task.postCommit(false);
final File checkpointFile = new File(
@ -1445,7 +1509,7 @@ public class StreamTaskTest {
public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.processorContext().setCurrentNode(processorStreamTime);
try {
task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
@ -1459,7 +1523,7 @@ public class StreamTaskTest {
public void shouldCallPunctuateOnPassedInProcessorNode() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
assertThat(punctuatedAt, equalTo(5L));
task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
@ -1470,7 +1534,7 @@ public class StreamTaskTest {
public void shouldSetProcessorNodeOnContextBackToNullAfterSuccessfulPunctuate() {
task = createStatelessTask(createConfig("100"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
assertThat(task.processorContext().currentNode(), nullValue());
}
@ -1500,7 +1564,7 @@ public class StreamTaskTest {
task = createFaultyStatefulTask(createConfig("100"));
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
assertThrows(RuntimeException.class, () -> task.suspend());
task.closeDirty();
@ -1549,7 +1613,7 @@ public class StreamTaskTest {
logContext);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 5L)));
task.addRecords(repartition, singletonList(getConsumerRecordWithOffsetAsTimestamp(repartition, 10L)));
@ -1578,9 +1642,9 @@ public class StreamTaskTest {
task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.initializeIfNeeded();
task.transitionTo(RESTORING);
assertThrows(StreamsException.class, task::completeRestoration);
assertThrows(StreamsException.class, () -> task.completeRestoration(noOpResetter -> { }));
}
@Test
@ -1638,7 +1702,7 @@ public class StreamTaskTest {
task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.prepareCommit();
task.postCommit(false);
@ -1659,7 +1723,7 @@ public class StreamTaskTest {
task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.prepareCommit();
task.postCommit(false);
@ -1683,7 +1747,7 @@ public class StreamTaskTest {
task = createStatefulTask(createConfig(), true);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.addRecords(partition1, singleton(getConsumerRecordWithOffsetAsTimestamp(partition1, 10)));
task.addRecords(partition2, singleton(getConsumerRecordWithOffsetAsTimestamp(partition2, 10)));
task.process(100L);
@ -1707,7 +1771,7 @@ public class StreamTaskTest {
assertEquals(singletonMap(partition1, 50L), task.changelogOffsets());
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
assertEquals(singletonMap(partition1, Task.LATEST_OFFSET), task.changelogOffsets());
}
@ -1752,7 +1816,7 @@ public class StreamTaskTest {
task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.suspend();
task.prepareCommit();
task.postCommit(true);
@ -1779,7 +1843,7 @@ public class StreamTaskTest {
task = createOptimizedStatefulTask(createConfig(), consumer);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, consumedOffset)));
task.process(100L);
@ -1810,7 +1874,7 @@ public class StreamTaskTest {
task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, offset)));
task.process(100L);
@ -1852,7 +1916,7 @@ public class StreamTaskTest {
task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
// process one record to make commit needed
task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, offset)));
@ -2004,7 +2068,7 @@ public class StreamTaskTest {
public void shouldThrowIfCleanClosingDirtyTask() {
task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
assertTrue(task.process(0L));
@ -2017,7 +2081,7 @@ public class StreamTaskTest {
public void shouldThrowIfRecyclingDirtyTask() {
task = createStatelessTask(createConfig(), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
task.addRecords(partition2, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition2, 0)));
@ -2040,7 +2104,7 @@ public class StreamTaskTest {
task.initializeIfNeeded();
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RESTORING
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING
task.suspend();
@ -2075,7 +2139,7 @@ public class StreamTaskTest {
EasyMock.replay(stateManager);
task = createFaultyStatefulTask(createConfig("100"));
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
assertThat(task.state(), equalTo(RUNNING));
assertThrows(RuntimeException.class, () -> task.suspend());
assertThat(task.state(), equalTo(SUSPENDED));

View File

@ -1739,7 +1739,7 @@ public class StreamThreadTest {
standbyTasks.put(task3, Collections.singleton(t2p1));
thread.taskManager().handleAssignment(emptyMap(), standbyTasks);
thread.taskManager().tryToCompleteRestoration(mockTime.milliseconds());
thread.taskManager().tryToCompleteRestoration(mockTime.milliseconds(), null);
thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());

View File

@ -468,7 +468,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
taskManager.handleRevocation(taskId00Partitions);
@ -547,7 +547,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(task01.state(), is(Task.State.RUNNING));
@ -601,7 +601,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
taskManager.handleRevocation(taskId00Partitions);
@ -642,18 +642,11 @@ public class TaskManagerTest {
expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expect(consumer.assignment()).andReturn(taskId00Partitions);
consumer.pause(taskId00Partitions);
expectLastCall();
final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
consumer.seek(t1p0, offsetAndMetadata);
expectLastCall();
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
task00.setChangelogOffsets(singletonMap(t1p0, 0L));
@ -661,6 +654,7 @@ public class TaskManagerTest {
assertThat(task00.commitPrepared, is(true));
assertThat(task00.state(), is(Task.State.CREATED));
assertThat(task00.partitionsForOffsetReset, equalTo(taskId00Partitions));
assertThat(enforcedCheckpoint.get(), is(true));
assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
@ -688,23 +682,17 @@ public class TaskManagerTest {
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expect(consumer.assignment()).andReturn(taskId00Partitions);
consumer.pause(taskId00Partitions);
expectLastCall();
final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
consumer.seek(t1p0, offsetAndMetadata);
expectLastCall();
taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
task00.setChangelogOffsets(singletonMap(t1p0, 0L));
taskManager.handleCorruption(singleton(taskId00));
assertThat(task00.commitPrepared, is(true));
assertThat(task00.state(), is(Task.State.CREATED));
assertThat(task00.partitionsForOffsetReset, equalTo(taskId00Partitions));
assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
@ -729,20 +717,13 @@ public class TaskManagerTest {
.andStubReturn(asList(corruptedTask, nonCorruptedTask));
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expectRestoreToBeCompleted(consumer, changeLogReader);
consumer.commitSync(eq(emptyMap()));
expect(consumer.assignment()).andReturn(taskId00Partitions);
consumer.pause(taskId00Partitions);
expectLastCall();
final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
consumer.seek(t1p0, offsetAndMetadata);
expectLastCall();
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
assertThat(nonCorruptedTask.state(), is(Task.State.RUNNING));
nonCorruptedTask.setCommitNeeded();
@ -751,6 +732,9 @@ public class TaskManagerTest {
taskManager.handleCorruption(singleton(taskId00));
assertTrue(nonCorruptedTask.commitPrepared);
assertThat(nonCorruptedTask.partitionsForOffsetReset, equalTo(Collections.emptySet()));
assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
verify(consumer);
}
@ -773,23 +757,18 @@ public class TaskManagerTest {
.andStubReturn(asList(corruptedTask, nonRunningNonCorruptedTask));
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expect(consumer.assignment()).andReturn(taskId00Partitions);
consumer.pause(taskId00Partitions);
expectLastCall();
final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
consumer.seek(t1p0, offsetAndMetadata);
expectLastCall();
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
taskManager.handleAssignment(assignment, emptyMap());
assertThat(nonRunningNonCorruptedTask.state(), is(Task.State.CREATED));
corruptedTask.setChangelogOffsets(singletonMap(t1p0, 0L));
taskManager.handleCorruption(singleton(taskId00));
assertThat(nonRunningNonCorruptedTask.state(), is(Task.State.CREATED));
assertThat(nonRunningNonCorruptedTask.partitionsForOffsetReset, equalTo(Collections.emptySet()));
assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
verify(activeTaskCreator);
assertFalse(nonRunningNonCorruptedTask.commitPrepared);
verify(consumer);
@ -820,7 +799,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.handleAssignment(taskId01Assignment, taskId00Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
// make sure this will be committed and throw
assertThat(runningNonCorruptedActive.state(), is(Task.State.RUNNING));
@ -848,7 +827,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(emptyMap(), taskId00Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
taskManager.handleAssignment(emptyMap(), emptyMap());
@ -870,12 +849,12 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(task01.state(), is(Task.State.RUNNING));
taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(task01.state(), is(Task.State.RUNNING));
@ -894,13 +873,13 @@ public class TaskManagerTest {
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
final Set<TopicPartition> newPartitionsSet = mkSet(t1p1);
final Map<TaskId, Set<TopicPartition>> taskIdSetMap = singletonMap(taskId00, newPartitionsSet);
taskManager.handleAssignment(taskIdSetMap, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertEquals(newPartitionsSet, task00.inputPartitions());
verify(activeTaskCreator, consumer, changeLogReader);
@ -925,7 +904,7 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.CREATED));
taskManager.tryToCompleteRestoration(time.milliseconds());
taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter -> { });
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(taskManager.activeTaskMap(), Matchers.equalTo(singletonMap(taskId00, task00)));
@ -969,7 +948,7 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.CREATED));
assertThat(task01.state(), is(Task.State.CREATED));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(false));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false));
assertThat(task00.state(), is(Task.State.CREATED));
assertThat(task01.state(), is(Task.State.CREATED));
@ -988,7 +967,7 @@ public class TaskManagerTest {
);
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true) {
@Override
public void completeRestoration() {
public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
throw new TimeoutException("timeout!");
}
};
@ -1009,7 +988,7 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.CREATED));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(false));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false));
assertThat(task00.state(), is(Task.State.RESTORING));
assertThat(
@ -1034,7 +1013,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
taskManager.handleRevocation(taskId00Partitions);
@ -1101,7 +1080,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(task01.state(), is(Task.State.RUNNING));
assertThat(task02.state(), is(Task.State.RUNNING));
@ -1160,7 +1139,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(task01.state(), is(Task.State.RUNNING));
assertThat(task02.state(), is(Task.State.RUNNING));
@ -1196,7 +1175,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(task10.state(), is(Task.State.RUNNING));
@ -1226,7 +1205,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(task10.state(), is(Task.State.RUNNING));
@ -1268,7 +1247,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThrows(RuntimeException.class, () -> taskManager.handleRevocation(taskId00Partitions));
@ -1359,7 +1338,7 @@ public class TaskManagerTest {
assertThat(task02.state(), is(Task.State.CREATED));
assertThat(task03.state(), is(Task.State.CREATED));
taskManager.tryToCompleteRestoration(time.milliseconds());
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
assertThat(task00.state(), is(Task.State.RESTORING));
assertThat(task01.state(), is(Task.State.RUNNING));
@ -1428,7 +1407,7 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.CREATED));
taskManager.tryToCompleteRestoration(time.milliseconds());
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
assertThat(task00.state(), is(Task.State.RESTORING));
assertThat(
@ -1479,7 +1458,7 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.CREATED));
taskManager.tryToCompleteRestoration(time.milliseconds());
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
assertThat(task00.state(), is(Task.State.RESTORING));
assertThat(
@ -1614,7 +1593,7 @@ public class TaskManagerTest {
assertThat(task01.state(), is(Task.State.CREATED));
assertThat(task02.state(), is(Task.State.CREATED));
taskManager.tryToCompleteRestoration(time.milliseconds());
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
assertThat(task00.state(), is(Task.State.RESTORING));
assertThat(task01.state(), is(Task.State.RUNNING));
@ -1667,7 +1646,7 @@ public class TaskManagerTest {
taskManager.handleAssignment(emptyMap(), assignment);
assertThat(task00.state(), is(Task.State.CREATED));
taskManager.tryToCompleteRestoration(time.milliseconds());
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.equalTo(singletonMap(taskId00, task00)));
@ -1689,7 +1668,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(taskManager.activeTaskMap(), Matchers.equalTo(singletonMap(taskId00, task00)));
@ -1709,7 +1688,7 @@ public class TaskManagerTest {
replay(standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(emptyMap(), taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task01.state(), is(Task.State.RUNNING));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
@ -1749,7 +1728,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(task01.state(), is(Task.State.RUNNING));
@ -1793,7 +1772,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(task01.state(), is(Task.State.RUNNING));
@ -1824,7 +1803,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
@ -1850,7 +1829,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, stateDirectory, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(task01.state(), is(Task.State.RUNNING));
@ -1963,7 +1942,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
@ -1990,7 +1969,7 @@ public class TaskManagerTest {
replay(standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(emptyMap(), taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task01.state(), is(Task.State.RUNNING));
@ -2025,7 +2004,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
@ -2061,7 +2040,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
@ -2093,7 +2072,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, adminClient, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
@ -2145,7 +2124,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(task01.state(), is(Task.State.RUNNING));
@ -2185,7 +2164,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(task01.state(), is(Task.State.RUNNING));
@ -2299,7 +2278,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
@ -2325,7 +2304,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
@ -2352,7 +2331,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
@ -2375,7 +2354,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
@ -2403,7 +2382,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
@ -2427,7 +2406,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, changeLogReader, consumer);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(false));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false));
assertThat(task00.state(), is(Task.State.RESTORING));
// this could be a bit mysterious; we're verifying _no_ interactions on the consumer,
// since the taskManager should _not_ resume the assignment while we're still in RESTORING
@ -2449,7 +2428,7 @@ public class TaskManagerTest {
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(TaskManager.class)) {
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds()), is(true));
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
taskManager.handleRevocation(mkSet(t1p0, new TopicPartition("unknown", 0)));
@ -2605,7 +2584,7 @@ public class TaskManagerTest {
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(allActiveTasksAssignment, standbyAssignment);
taskManager.tryToCompleteRestoration(time.milliseconds());
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
final Map<TaskId, StateMachineTask> allTasks = new HashMap<>();
@ -2964,6 +2943,7 @@ public class TaskManagerTest {
private Map<TopicPartition, OffsetAndMetadata> committableOffsets = Collections.emptyMap();
private Map<TopicPartition, Long> purgeableOffsets;
private Map<TopicPartition, Long> changelogOffsets = Collections.emptyMap();
private Set<TopicPartition> partitionsForOffsetReset = Collections.emptySet();
private Long timeout = null;
private final Map<TopicPartition, LinkedList<ConsumerRecord<byte[], byte[]>>> queue = new HashMap<>();
@ -2993,7 +2973,12 @@ public class TaskManagerTest {
}
@Override
public void completeRestoration() {
public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
this.partitionsForOffsetReset = partitionsForOffsetReset;
}
@Override
public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
if (state() == State.RUNNING) {
return;
}

View File

@ -528,7 +528,7 @@ public class TopologyTestDriver implements Closeable {
context,
logContext);
task.initializeIfNeeded();
task.completeRestoration();
task.completeRestoration(noOpResetter -> { });
task.processorContext().setRecordContext(null);
} else {