KAFKA-14133: Migrate various mocks in TaskManagerTest to Mockito (#13874)

Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Christo Lolov 2023-07-20 17:16:18 +01:00 committed by GitHub
parent 01a16ca301
commit 8f313eaed4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 41 additions and 56 deletions

View File

@ -2014,10 +2014,7 @@ public class TaskManagerTest {
@Test
public void shouldReviveCorruptTasks() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
EasyMock.expectLastCall().once();
replay(stateManager);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final AtomicBoolean enforcedCheckpoint = new AtomicBoolean(false);
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@ -2050,15 +2047,13 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
verify(stateManager);
verify(consumer);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldReviveCorruptTasksEvenIfTheyCannotCloseClean() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
replay(stateManager);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@Override
@ -2085,15 +2080,13 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
verify(stateManager);
verify(consumer);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
replay(stateManager);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
@ -2125,13 +2118,12 @@ public class TaskManagerTest {
assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
verify(consumer);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldNotCommitNonRunningNonCorruptedTasks() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
replay(stateManager);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask nonRunningNonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
@ -2158,13 +2150,12 @@ public class TaskManagerTest {
assertFalse(nonRunningNonCorruptedTask.commitPrepared);
verify(consumer);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
replay(stateManager);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final StateMachineTask corruptedStandby = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
final StateMachineTask runningNonCorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@ -2199,11 +2190,12 @@ public class TaskManagerTest {
assertThat(corruptedStandby.commitPrepared, is(true));
assertThat(corruptedStandby.state(), is(Task.State.CREATED));
verify(consumer);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
final ProcessorStateManager stateManager = EasyMock.createNiceMock(ProcessorStateManager.class);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
expect(stateDirectory.listNonEmptyTaskDirectories()).andStubReturn(new ArrayList<>());
final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
@ -2224,7 +2216,7 @@ public class TaskManagerTest {
expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
replay(consumer, stateDirectory, stateManager);
replay(consumer, stateDirectory);
uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
@ -2250,10 +2242,8 @@ public class TaskManagerTest {
}
@Test
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALSO() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
replay(stateManager);
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() {
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask uncorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@ -2310,6 +2300,7 @@ public class TaskManagerTest {
assertThat(corruptedActive.state(), is(Task.State.CREATED));
assertThat(uncorruptedActive.state(), is(Task.State.CREATED));
verify(consumer);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
@ -2317,7 +2308,7 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
final ProcessorStateManager stateManager = EasyMock.createMock(ProcessorStateManager.class);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
final StateMachineTask corruptedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@ -2327,7 +2318,6 @@ public class TaskManagerTest {
corruptedTaskChangelogMarkedAsCorrupted.set(true);
}
};
stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions);
final AtomicBoolean uncorruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
final StateMachineTask uncorruptedActiveTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@ -2339,7 +2329,6 @@ public class TaskManagerTest {
};
final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
uncorruptedActiveTask.setCommittableOffsetsAndMetadata(offsets);
stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions);
// handleAssignment
final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
@ -2357,7 +2346,7 @@ public class TaskManagerTest {
expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
replay(consumer, stateManager);
replay(consumer);
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@ -2394,6 +2383,8 @@ public class TaskManagerTest {
assertThat(corruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
assertThat(uncorruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
verify(consumer);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
}
@Test
@ -2454,7 +2445,7 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
final ProcessorStateManager stateManager = EasyMock.createMock(ProcessorStateManager.class);
final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final Map<TopicPartition, OffsetAndMetadata> revokedActiveTaskOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
@ -2479,9 +2470,6 @@ public class TaskManagerTest {
expectedCommittedOffsets.putAll(revokedActiveTaskOffsets);
expectedCommittedOffsets.putAll(unrevokedTaskOffsets);
stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions);
stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions);
final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
mkEntry(taskId00, taskId00Partitions),
mkEntry(taskId01, taskId01Partitions),
@ -2500,7 +2488,7 @@ public class TaskManagerTest {
expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
replay(consumer, stateManager);
replay(consumer);
taskManager.handleAssignment(assignmentActive, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@ -2519,6 +2507,8 @@ public class TaskManagerTest {
assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
assertThat(unrevokedActiveTask.state(), is(State.CREATED));
assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
Mockito.verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
}
@Test
@ -3594,24 +3584,23 @@ public class TaskManagerTest {
@Test
public void shouldCommitViaProducerIfEosAlphaEnabled() {
final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
final StreamsProducer producer = Mockito.mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
.thenReturn(producer);
final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
final Map<TopicPartition, OffsetAndMetadata> offsetsT02 = singletonMap(t1p2, new OffsetAndMetadata(1L, null));
producer.commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId"));
expectLastCall();
producer.commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId"));
expectLastCall();
shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_ALPHA, offsetsT01, offsetsT02);
shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_ALPHA, producer, offsetsT01, offsetsT02);
Mockito.verify(producer).commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId"));
Mockito.verify(producer).commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId"));
Mockito.verifyNoMoreInteractions(producer);
}
@Test
public void shouldCommitViaProducerIfEosV2Enabled() {
final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
final StreamsProducer producer = Mockito.mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
@ -3620,14 +3609,13 @@ public class TaskManagerTest {
allOffsets.putAll(offsetsT01);
allOffsets.putAll(offsetsT02);
producer.commitTransaction(allOffsets, new ConsumerGroupMetadata("appId"));
expectLastCall();
shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, offsetsT01, offsetsT02);
shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, producer, offsetsT01, offsetsT02);
Mockito.verify(producer).commitTransaction(allOffsets, new ConsumerGroupMetadata("appId"));
Mockito.verifyNoMoreInteractions(producer);
}
private void shouldCommitViaProducerIfEosEnabled(final ProcessingMode processingMode,
final StreamsProducer producer,
final Map<TopicPartition, OffsetAndMetadata> offsetsT01,
final Map<TopicPartition, OffsetAndMetadata> offsetsT02) {
final TaskManager taskManager = setUpTaskManager(processingMode, false);
@ -3643,11 +3631,11 @@ public class TaskManagerTest {
reset(consumer);
expect(consumer.groupMetadata()).andStubReturn(new ConsumerGroupMetadata("appId"));
replay(consumer, producer);
replay(consumer);
taskManager.commitAll();
verify(producer, consumer);
verify(consumer);
}
@Test
@ -4533,21 +4521,18 @@ public class TaskManagerTest {
@Test
public void shouldConvertActiveTaskToStandbyTask() {
final StreamTask activeTask = EasyMock.mock(StreamTask.class);
expect(activeTask.id()).andStubReturn(taskId00);
expect(activeTask.inputPartitions()).andStubReturn(taskId00Partitions);
expect(activeTask.isActive()).andStubReturn(true);
expect(activeTask.prepareCommit()).andStubReturn(Collections.emptyMap());
final StreamTask activeTask = Mockito.mock(StreamTask.class);
when(activeTask.id()).thenReturn(taskId00);
when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
when(activeTask.isActive()).thenReturn(true);
final StandbyTask standbyTask = EasyMock.mock(StandbyTask.class);
expect(standbyTask.id()).andStubReturn(taskId00);
final StandbyTask standbyTask = Mockito.mock(StandbyTask.class);
when(standbyTask.id()).thenReturn(taskId00);
when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
activeTask.prepareRecycle();
expectLastCall().once();
when(standbyTaskCreator.createStandbyTaskFromActive(Mockito.any(), Mockito.eq(taskId00Partitions))).thenReturn(standbyTask);
replay(activeTask, standbyTask, consumer);
replay(consumer);
taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);