diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 149c0243566..457508cd20e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -44,17 +44,11 @@ import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.test.MockStandbyUpdateListener; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.StreamsTestUtils; -import org.easymock.EasyMock; -import org.easymock.EasyMockRule; -import org.easymock.EasyMockSupport; -import org.easymock.Mock; -import org.easymock.MockType; -import org.junit.After; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; import org.mockito.junit.jupiter.MockitoSettings; @@ -82,13 +76,6 @@ import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START; import static org.apache.kafka.test.MockStandbyUpdateListener.UPDATE_SUSPENDED; import static org.apache.kafka.test.MockStandbyUpdateListener.UPDATE_START; import static org.apache.kafka.test.MockStandbyUpdateListener.UPDATE_BATCH; -import static org.easymock.EasyMock.anyBoolean; -import static org.easymock.EasyMock.anyLong; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.resetToDefault; -import static org.easymock.EasyMock.verify; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; @@ -97,15 +84,17 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; @MockitoSettings(strictness = Strictness.STRICT_STUBS) @RunWith(Parameterized.class) @SuppressWarnings("this-escape") -public class StoreChangelogReaderTest extends EasyMockSupport { - - @Rule - public EasyMockRule rule = new EasyMockRule(this); +public class StoreChangelogReaderTest { @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); @@ -118,11 +107,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport { private ProcessorStateManager standbyStateManager; @org.mockito.Mock private StateStoreMetadata storeMetadata; - @Mock(type = MockType.NICE) + @org.mockito.Mock private StateStoreMetadata storeMetadataOne; - @Mock(type = MockType.NICE) + @org.mockito.Mock private StateStoreMetadata storeMetadataTwo; - @Mock(type = MockType.NICE) + @org.mockito.Mock private StateStore store; @Parameterized.Parameters @@ -187,18 +176,8 @@ public class StoreChangelogReaderTest extends EasyMockSupport { when(storeMetadata.store()).thenReturn(store); } - @Before - public void setUp() { - EasyMock.expect(store.name()).andReturn(storeName).anyTimes(); - } - - @After - public void tearDown() { - EasyMock.reset( - storeMetadataOne, - storeMetadataTwo, - store - ); + private void setupStore() { + when(store.name()).thenReturn(storeName); } @Test @@ -223,9 +202,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldSupportUnregisterChangelogBeforeInitialization() { setupStateManagerMock(); - final Map mockTasks = mock(Map.class); - EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); - EasyMock.replay(mockTasks, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L)); @@ -256,15 +232,16 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldSupportUnregisterChangelogBeforeCompletion() { setupStateManagerMock(); setupStoreMetadata(); + setupStore(); + @SuppressWarnings("unchecked") final Map mockTasks = mock(Map.class); - EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); - EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); + when(mockTasks.get(null)).thenReturn(mock(Task.class)); + when(mockTasks.containsKey(null)).thenReturn(true); when(storeMetadata.offset()).thenReturn(9L); if (type == STANDBY) { when(storeMetadata.endOffset()).thenReturn(10L); when(stateManager.changelogAsSource(tp)).thenReturn(true); } - EasyMock.replay(mockTasks, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L)); @@ -307,15 +284,16 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldSupportUnregisterChangelogAfterCompletion() { setupStateManagerMock(); setupStoreMetadata(); + setupStore(); + @SuppressWarnings("unchecked") final Map mockTasks = mock(Map.class); - EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); - EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); + when(mockTasks.get(null)).thenReturn(mock(Task.class)); + when(mockTasks.containsKey(null)).thenReturn(true); when(storeMetadata.offset()).thenReturn(9L); if (type == STANDBY) { when(storeMetadata.endOffset()).thenReturn(10L); when(stateManager.changelogAsSource(tp)).thenReturn(true); } - EasyMock.replay(mockTasks, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); @@ -364,11 +342,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldInitializeChangelogAndCheckForCompletion() { setupStateManagerMock(); setupStoreMetadata(); + setupStore(); + @SuppressWarnings("unchecked") final Map mockTasks = mock(Map.class); - EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); - EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); + when(mockTasks.get(null)).thenReturn(mock(Task.class)); + when(mockTasks.containsKey(null)).thenReturn(true); when(storeMetadata.offset()).thenReturn(9L); - EasyMock.replay(mockTasks, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); @@ -407,11 +386,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport { if (type == ACTIVE) { setupStateManagerMock(); setupStoreMetadata(); + @SuppressWarnings("unchecked") final Map mockTasks = mock(Map.class); - EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); - EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); + when(mockTasks.get(null)).thenReturn(mock(Task.class)); + when(mockTasks.containsKey(null)).thenReturn(true); when(stateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L)); - EasyMock.replay(mockTasks, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); @@ -437,6 +416,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldPollWithRightTimeoutWithStateUpdater() { setupStateManagerMock(); setupStoreMetadata(); + setupStore(); shouldPollWithRightTimeout(true); } @@ -444,6 +424,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldPollWithRightTimeoutWithoutStateUpdater() { setupStateManagerMock(); setupStoreMetadata(); + setupStore(); shouldPollWithRightTimeout(false); } @@ -457,6 +438,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldPollWithRightTimeoutWithStateUpdaterDefault() { setupStateManagerMock(); setupStoreMetadata(); + setupStore(); final Properties properties = new Properties(); shouldPollWithRightTimeout(properties); } @@ -466,7 +448,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { when(storeMetadata.offset()).thenReturn(null).thenReturn(9L); when(stateManager.taskId()).thenReturn(taskId); - EasyMock.replay(store); consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L)); @@ -499,6 +480,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldRestoreFromPositionAndCheckForCompletion() { setupStateManagerMock(); setupStoreMetadata(); + setupStore(); final TaskId taskId = new TaskId(0, 0); when(storeMetadata.offset()).thenReturn(5L); @@ -506,7 +488,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { when(storeMetadata.endOffset()).thenReturn(10L); } when(stateManager.taskId()).thenReturn(taskId); - EasyMock.replay(store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); @@ -572,6 +553,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldRestoreFromBeginningAndCheckCompletion() { setupStateManagerMock(); setupStoreMetadata(); + setupStore(); final TaskId taskId = new TaskId(0, 0); if (type == STANDBY && logContext.logger(StoreChangelogReader.class).isDebugEnabled()) { @@ -581,7 +563,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { when(storeMetadata.offset()).thenReturn(null).thenReturn(9L); } when(stateManager.taskId()).thenReturn(taskId); - EasyMock.replay(store); consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L)); @@ -653,11 +634,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldCheckCompletionIfPositionLargerThanEndOffset() { setupActiveStateManager(); setupStoreMetadata(); + setupStore(); + @SuppressWarnings("unchecked") final Map mockTasks = mock(Map.class); - EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); - EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); + when(mockTasks.get(null)).thenReturn(mock(Task.class)); + when(mockTasks.containsKey(null)).thenReturn(true); when(storeMetadata.offset()).thenReturn(5L); - EasyMock.replay(mockTasks, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 0L)); @@ -683,18 +665,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldRequestPositionAndHandleTimeoutException() { setupActiveStateManager(); setupStoreMetadata(); + setupStore(); final TaskId taskId = new TaskId(0, 0); final Task mockTask = mock(Task.class); - mockTask.clearTaskTimeout(); - mockTask.maybeInitTaskTimeoutOrThrow(anyLong(), anyObject()); - EasyMock.expectLastCall(); - mockTask.recordRestoration(anyObject(), anyLong(), anyBoolean()); - EasyMock.expectLastCall(); when(storeMetadata.offset()).thenReturn(10L); when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 10L)); when(activeStateManager.taskId()).thenReturn(taskId); - EasyMock.replay(mockTask, store); final AtomicBoolean clearException = new AtomicBoolean(false); final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @@ -719,30 +696,29 @@ public class StoreChangelogReaderTest extends EasyMockSupport { assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state()); assertTrue(changelogReader.completedChangelogs().isEmpty()); assertEquals(10L, (long) changelogReader.changelogMetadata(tp).endOffset()); - verify(mockTask); + Mockito.verify(mockTask).clearTaskTimeout(); + Mockito.verify(mockTask).maybeInitTaskTimeoutOrThrow(anyLong(), any()); + Mockito.verify(mockTask).recordRestoration(any(), anyLong(), anyBoolean()); clearException.set(true); - resetToDefault(mockTask); - mockTask.clearTaskTimeout(); - EasyMock.expectLastCall(); - EasyMock.replay(mockTask); + Mockito.reset(mockTask); changelogReader.restore(Collections.singletonMap(taskId, mockTask)); assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, changelogReader.changelogMetadata(tp).state()); assertEquals(10L, (long) changelogReader.changelogMetadata(tp).endOffset()); assertEquals(Collections.singleton(tp), changelogReader.completedChangelogs()); assertEquals(10L, consumer.position(tp)); - verify(mockTask); + Mockito.verify(mockTask).clearTaskTimeout(); } @Test public void shouldThrowIfPositionFail() { setupActiveStateManager(); setupStoreMetadata(); + setupStore(); final TaskId taskId = new TaskId(0, 0); when(activeStateManager.taskId()).thenReturn(taskId); when(storeMetadata.offset()).thenReturn(10L); - EasyMock.replay(store); final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override @@ -769,16 +745,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldRequestEndOffsetsAndHandleTimeoutException() { setupActiveStateManager(); setupStoreMetadata(); + setupStore(); final TaskId taskId = new TaskId(0, 0); - final Task mockTask = niceMock(Task.class); - mockTask.maybeInitTaskTimeoutOrThrow(anyLong(), anyObject()); - EasyMock.expectLastCall(); + final Task mockTask = mock(Task.class); when(storeMetadata.offset()).thenReturn(5L); when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L)); when(activeStateManager.taskId()).thenReturn(taskId); - EasyMock.replay(mockTask, store); final AtomicBoolean functionCalled = new AtomicBoolean(false); @@ -812,20 +786,17 @@ public class StoreChangelogReaderTest extends EasyMockSupport { assertEquals(StoreChangelogReader.ChangelogState.REGISTERED, changelogReader.changelogMetadata(tp).state()); assertNull(changelogReader.changelogMetadata(tp).endOffset()); assertTrue(functionCalled.get()); - verify(mockTask); + Mockito.verify(mockTask).maybeInitTaskTimeoutOrThrow(anyLong(), any()); - EasyMock.resetToDefault(mockTask); - mockTask.clearTaskTimeout(); - mockTask.recordRestoration(anyObject(), anyLong(), anyBoolean()); - EasyMock.expectLastCall(); - EasyMock.replay(mockTask); + Mockito.reset(mockTask); changelogReader.restore(Collections.singletonMap(taskId, mockTask)); assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state()); assertEquals(10L, (long) changelogReader.changelogMetadata(tp).endOffset()); assertEquals(6L, consumer.position(tp)); - verify(mockTask); + Mockito.verify(mockTask).clearTaskTimeout(); + Mockito.verify(mockTask).recordRestoration(any(), anyLong(), anyBoolean()); } @Test @@ -835,7 +806,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { final TaskId taskId = new TaskId(0, 0); when(activeStateManager.taskId()).thenReturn(taskId); - EasyMock.replay(store); final MockAdminClient adminClient = new MockAdminClient() { @Override @@ -862,6 +832,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldRequestCommittedOffsetsAndHandleTimeoutException() { setupStateManagerMock(); setupStoreMetadata(); + setupStore(); final TaskId taskId = new TaskId(0, 0); @@ -869,13 +840,10 @@ public class StoreChangelogReaderTest extends EasyMockSupport { if (type == ACTIVE) { mockTask.clearTaskTimeout(); } - mockTask.maybeInitTaskTimeoutOrThrow(anyLong(), anyObject()); - EasyMock.expectLastCall(); when(stateManager.changelogAsSource(tp)).thenReturn(true); when(storeMetadata.offset()).thenReturn(5L); when(stateManager.taskId()).thenReturn(taskId); - EasyMock.replay(mockTask, store); final AtomicBoolean functionCalled = new AtomicBoolean(false); final MockAdminClient adminClient = new MockAdminClient() { @@ -911,24 +879,19 @@ public class StoreChangelogReaderTest extends EasyMockSupport { assertEquals(0L, (long) changelogReader.changelogMetadata(tp).endOffset()); } assertTrue(functionCalled.get()); - verify(mockTask); + Mockito.verify(mockTask).maybeInitTaskTimeoutOrThrow(anyLong(), any()); - resetToDefault(mockTask); - if (type == ACTIVE) { - mockTask.clearTaskTimeout(); - mockTask.clearTaskTimeout(); - expectLastCall(); - mockTask.recordRestoration(anyObject(), anyLong(), anyBoolean()); - expectLastCall(); - } - replay(mockTask); + Mockito.reset(mockTask); changelogReader.restore(Collections.singletonMap(taskId, mockTask)); assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state()); assertEquals(type == ACTIVE ? 10L : 0L, (long) changelogReader.changelogMetadata(tp).endOffset()); assertEquals(6L, consumer.position(tp)); - verify(mockTask); + if (type == ACTIVE) { + Mockito.verify(mockTask, times(2)).clearTaskTimeout(); + Mockito.verify(mockTask).recordRestoration(any(), anyLong(), anyBoolean()); + } } @Test @@ -940,7 +903,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.changelogAsSource(tp)).thenReturn(true); - EasyMock.replay(store); final MockAdminClient adminClient = new MockAdminClient() { @Override @@ -964,8 +926,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldThrowIfUnsubscribeFail() { - EasyMock.replay(store); - final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override public void unsubscribe() { @@ -983,12 +943,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldOnlyRestoreStandbyChangelogInUpdateStandbyState() { setupStandbyStateManager(); setupStoreMetadata(); + setupStore(); + @SuppressWarnings("unchecked") final Map mockTasks = mock(Map.class); - EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); - EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); + when(mockTasks.get(null)).thenReturn(mock(Task.class)); + when(mockTasks.containsKey(null)).thenReturn(true); when(storeMetadata.offset()).thenReturn(3L); when(storeMetadata.endOffset()).thenReturn(20L); - EasyMock.replay(mockTasks, store); consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L)); changelogReader.register(tp, standbyStateManager); @@ -1026,13 +987,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldNotUpdateLimitForNonSourceStandbyChangelog() { setupStandbyStateManager(); setupStoreMetadata(); + setupStore(); + @SuppressWarnings("unchecked") final Map mockTasks = mock(Map.class); - EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); - EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); + when(mockTasks.get(null)).thenReturn(mock(Task.class)); + when(mockTasks.containsKey(null)).thenReturn(true); when(storeMetadata.offset()).thenReturn(3L); when(storeMetadata.endOffset()).thenReturn(20L); when(standbyStateManager.changelogAsSource(tp)).thenReturn(false); - EasyMock.replay(mockTasks, store); final MockAdminClient adminClient = new MockAdminClient() { @Override @@ -1084,13 +1046,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldRestoreToLimitInStandbyState() { setupStandbyStateManager(); setupStoreMetadata(); + setupStore(); + @SuppressWarnings("unchecked") final Map mockTasks = mock(Map.class); - EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); - EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); + when(mockTasks.get(null)).thenReturn(mock(Task.class)); + when(mockTasks.containsKey(null)).thenReturn(true); when(standbyStateManager.changelogAsSource(tp)).thenReturn(true); when(storeMetadata.offset()).thenReturn(3L); when(storeMetadata.endOffset()).thenReturn(20L); - EasyMock.replay(mockTasks, store); final long now = time.milliseconds(); final Properties properties = new Properties(); @@ -1201,16 +1164,18 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldRestoreMultipleChangelogs() { setupActiveStateManager(); setupStoreMetadata(); + setupStore(); + @SuppressWarnings("unchecked") final Map mockTasks = mock(Map.class); - EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); - EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); - EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes(); - EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes(); - EasyMock.expect(storeMetadataTwo.changelogPartition()).andReturn(tp2).anyTimes(); - EasyMock.expect(storeMetadataTwo.store()).andReturn(store).anyTimes(); + when(mockTasks.get(null)).thenReturn(mock(Task.class)); + when(mockTasks.containsKey(null)).thenReturn(true); + when(storeMetadataOne.changelogPartition()).thenReturn(tp1); + when(storeMetadataOne.store()).thenReturn(store); + when(storeMetadataTwo.changelogPartition()).thenReturn(tp2); + when(storeMetadataTwo.store()).thenReturn(store); when(storeMetadata.offset()).thenReturn(0L); - EasyMock.expect(storeMetadataOne.offset()).andReturn(0L).anyTimes(); - EasyMock.expect(storeMetadataTwo.offset()).andReturn(0L).anyTimes(); + when(storeMetadataOne.offset()).thenReturn(0L); + when(storeMetadataTwo.offset()).thenReturn(0L); when(activeStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne); when(activeStateManager.storeMetadata(tp2)).thenReturn(storeMetadataTwo); when(activeStateManager.changelogOffsets()).thenReturn(mkMap( @@ -1218,7 +1183,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { mkEntry(tp1, 5L), mkEntry(tp2, 5L) )); - EasyMock.replay(mockTasks, store, storeMetadataOne, storeMetadataTwo); setupConsumer(10, tp); setupConsumer(5, tp1); @@ -1252,21 +1216,21 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldTransitState() { setupActiveStateManager(); setupStoreMetadata(); + setupStore(); when(standbyStateManager.taskType()).thenReturn(STANDBY); final TaskId taskId = new TaskId(0, 0); - EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes(); - EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes(); - EasyMock.expect(storeMetadataTwo.changelogPartition()).andReturn(tp2).anyTimes(); - EasyMock.expect(storeMetadataTwo.store()).andReturn(store).anyTimes(); + when(storeMetadataOne.changelogPartition()).thenReturn(tp1); + when(storeMetadataOne.store()).thenReturn(store); + when(storeMetadataTwo.changelogPartition()).thenReturn(tp2); + when(storeMetadataTwo.store()).thenReturn(store); when(storeMetadata.offset()).thenReturn(5L); - EasyMock.expect(storeMetadataOne.offset()).andReturn(5L).anyTimes(); - EasyMock.expect(storeMetadataTwo.offset()).andReturn(5L).anyTimes(); + when(storeMetadataOne.offset()).thenReturn(5L); + when(storeMetadataTwo.offset()).thenReturn(5L); when(standbyStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne); when(standbyStateManager.storeMetadata(tp2)).thenReturn(storeMetadataTwo); when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L)); when(activeStateManager.taskId()).thenReturn(taskId); when(standbyStateManager.taskId()).thenReturn(taskId); - EasyMock.replay(store, storeMetadataOne, storeMetadataTwo); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); adminClient.updateEndOffsets(Collections.singletonMap(tp1, 10L)); @@ -1335,9 +1299,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { when(standbyStateManager.taskType()).thenReturn(STANDBY); final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener); when(standbyStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne); - EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes(); - EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes(); - EasyMock.replay(store, storeMetadataOne); changelogReader.register(tp1, standbyStateManager); changelogReader.transitToUpdateStandby(); @@ -1350,11 +1311,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldThrowIfRestoreCallbackThrows() { setupActiveStateManager(); setupStoreMetadata(); + setupStore(); final TaskId taskId = new TaskId(0, 0); when(storeMetadata.offset()).thenReturn(5L); when(activeStateManager.taskId()).thenReturn(taskId); - EasyMock.replay(store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));