From 65a28246ad7fa827ed146aa59b58c420b47a1037 Mon Sep 17 00:00:00 2001 From: Christo Lolov Date: Tue, 2 Jan 2024 14:36:52 +0200 Subject: [PATCH] KAFKA-14133: Migrate stateManager mock in StoreChangelogReaderTest to Mockito (#14929) Reviewers: Divij Vaidya --- .../internals/StoreChangelogReaderTest.java | 91 +++++++++++-------- 1 file changed, 54 insertions(+), 37 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 0e69a9482b7..40de78f519b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -55,6 +55,10 @@ import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import java.time.Duration; import java.util.Collections; @@ -93,7 +97,9 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; +@MockitoSettings(strictness = Strictness.STRICT_STUBS) @RunWith(Parameterized.class) @SuppressWarnings("this-escape") public class StoreChangelogReaderTest extends EasyMockSupport { @@ -101,7 +107,10 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Rule public EasyMockRule rule = new EasyMockRule(this); - @Mock(type = MockType.NICE) + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); + + @org.mockito.Mock private ProcessorStateManager stateManager; @Mock(type = MockType.NICE) private ProcessorStateManager activeStateManager; @@ -158,10 +167,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport { private final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener); + private void setupStateManagerMock() { + when(stateManager.storeMetadata(tp)).thenReturn(storeMetadata); + when(stateManager.taskType()).thenReturn(type); + } + @Before public void setUp() { - EasyMock.expect(stateManager.storeMetadata(tp)).andReturn(storeMetadata).anyTimes(); - EasyMock.expect(stateManager.taskType()).andReturn(type).anyTimes(); EasyMock.expect(activeStateManager.storeMetadata(tp)).andReturn(storeMetadata).anyTimes(); EasyMock.expect(activeStateManager.taskType()).andReturn(ACTIVE).anyTimes(); EasyMock.expect(standbyStateManager.storeMetadata(tp)).andReturn(storeMetadata).anyTimes(); @@ -175,7 +187,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @After public void tearDown() { EasyMock.reset( - stateManager, activeStateManager, standbyStateManager, storeMetadata, @@ -187,7 +198,8 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldNotRegisterSameStoreMultipleTimes() { - EasyMock.replay(stateManager, storeMetadata); + setupStateManagerMock(); + EasyMock.replay(storeMetadata); changelogReader.register(tp, stateManager); @@ -200,7 +212,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldNotRegisterStoreWithoutMetadata() { - EasyMock.replay(stateManager, storeMetadata); + EasyMock.replay(storeMetadata); assertThrows(IllegalStateException.class, () -> changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager)); @@ -208,11 +220,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldSupportUnregisterChangelogBeforeInitialization() { + setupStateManagerMock(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); - EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L)); - EasyMock.replay(mockTasks, stateManager, storeMetadata, store); + EasyMock.replay(mockTasks, storeMetadata, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L)); @@ -241,16 +253,17 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldSupportUnregisterChangelogBeforeCompletion() { + setupStateManagerMock(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); - EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L)); if (type == STANDBY) { - EasyMock.expect(stateManager.changelogAsSource(tp)).andReturn(true).anyTimes(); + when(stateManager.changelogAsSource(tp)).thenReturn(true); } - EasyMock.replay(mockTasks, stateManager, storeMetadata, store); + EasyMock.replay(mockTasks, storeMetadata, store); + adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L)); final StoreChangelogReader changelogReader = @@ -290,16 +303,16 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldSupportUnregisterChangelogAfterCompletion() { + setupStateManagerMock(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); - EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L)); if (type == STANDBY) { - EasyMock.expect(stateManager.changelogAsSource(tp)).andReturn(true).anyTimes(); + when(stateManager.changelogAsSource(tp)).thenReturn(true); } - EasyMock.replay(mockTasks, stateManager, storeMetadata, store); + EasyMock.replay(mockTasks, storeMetadata, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); @@ -346,12 +359,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldInitializeChangelogAndCheckForCompletion() { + setupStateManagerMock(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); - EasyMock.replay(mockTasks, stateManager, storeMetadata, store); + EasyMock.replay(mockTasks, storeMetadata, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); @@ -388,11 +402,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport { public void shouldTriggerRestoreListenerWithOffsetZeroIfPositionThrowsTimeoutException() { // restore listener is only triggered for active tasks if (type == ACTIVE) { + setupStateManagerMock(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); - EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L)); - EasyMock.replay(mockTasks, stateManager, storeMetadata, store); + when(stateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L)); + EasyMock.replay(mockTasks, storeMetadata, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); @@ -416,11 +431,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldPollWithRightTimeoutWithStateUpdater() { + setupStateManagerMock(); shouldPollWithRightTimeout(true); } @Test public void shouldPollWithRightTimeoutWithoutStateUpdater() { + setupStateManagerMock(); shouldPollWithRightTimeout(false); } @@ -432,6 +449,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldPollWithRightTimeoutWithStateUpdaterDefault() { + setupStateManagerMock(); final Properties properties = new Properties(); shouldPollWithRightTimeout(properties); } @@ -441,9 +459,8 @@ public class StoreChangelogReaderTest extends EasyMockSupport { EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); - EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L)); - EasyMock.expect(stateManager.taskId()).andReturn(taskId).anyTimes(); - EasyMock.replay(stateManager, storeMetadata, store); + when(stateManager.taskId()).thenReturn(taskId); + EasyMock.replay(storeMetadata, store); consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L)); @@ -474,13 +491,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldRestoreFromPositionAndCheckForCompletion() { + setupStateManagerMock(); final TaskId taskId = new TaskId(0, 0); EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); - EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L)); - EasyMock.expect(stateManager.taskId()).andReturn(taskId).anyTimes(); - EasyMock.replay(stateManager, storeMetadata, store); + when(stateManager.taskId()).thenReturn(taskId); + EasyMock.replay(storeMetadata, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); @@ -544,6 +561,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldRestoreFromBeginningAndCheckCompletion() { + setupStateManagerMock(); final TaskId taskId = new TaskId(0, 0); if (type == STANDBY && logContext.logger(StoreChangelogReader.class).isDebugEnabled()) { @@ -553,9 +571,8 @@ public class StoreChangelogReaderTest extends EasyMockSupport { EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); } - EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L)); - EasyMock.expect(stateManager.taskId()).andReturn(taskId).anyTimes(); - EasyMock.replay(stateManager, storeMetadata, store); + when(stateManager.taskId()).thenReturn(taskId); + EasyMock.replay(storeMetadata, store); consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L)); @@ -825,6 +842,8 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldRequestCommittedOffsetsAndHandleTimeoutException() { + setupStateManagerMock(); + final TaskId taskId = new TaskId(0, 0); final Task mockTask = mock(Task.class); @@ -834,12 +853,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport { mockTask.maybeInitTaskTimeoutOrThrow(anyLong(), anyObject()); EasyMock.expectLastCall(); - EasyMock.expect(stateManager.changelogAsSource(tp)).andReturn(true).anyTimes(); + when(stateManager.changelogAsSource(tp)).thenReturn(true); EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); - EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L)); - EasyMock.expect(stateManager.taskId()).andReturn(taskId).anyTimes(); - EasyMock.replay(mockTask, stateManager, storeMetadata, store); + when(stateManager.taskId()).thenReturn(taskId); + EasyMock.replay(mockTask, storeMetadata, store); final AtomicBoolean functionCalled = new AtomicBoolean(false); final MockAdminClient adminClient = new MockAdminClient() { @@ -897,12 +915,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldThrowIfCommittedOffsetsFail() { + setupStateManagerMock(); + final TaskId taskId = new TaskId(0, 0); - EasyMock.expect(stateManager.taskId()).andReturn(taskId); - EasyMock.expect(stateManager.changelogAsSource(tp)).andReturn(true).anyTimes(); + when(stateManager.taskId()).thenReturn(taskId); + when(stateManager.changelogAsSource(tp)).thenReturn(true); EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes(); - EasyMock.replay(stateManager, storeMetadata, store); + EasyMock.replay(storeMetadata, store); final MockAdminClient adminClient = new MockAdminClient() { @Override @@ -926,7 +946,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldThrowIfUnsubscribeFail() { - EasyMock.replay(stateManager, storeMetadata, store); + EasyMock.replay(storeMetadata, store); final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override @@ -946,7 +966,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); - EasyMock.expect(stateManager.taskType()).andReturn(type); EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes(); EasyMock.replay(mockTasks, standbyStateManager, storeMetadata, store); @@ -989,7 +1008,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { //EasyMock.expect(storeMetadata.offset()).andReturn(0L).anyTimes(); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); - EasyMock.expect(stateManager.taskType()).andReturn(type); EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes(); EasyMock.expect(standbyStateManager.changelogAsSource(tp)).andReturn(false).anyTimes(); @@ -1047,7 +1065,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); EasyMock.expect(standbyStateManager.changelogAsSource(tp)).andReturn(true).anyTimes(); - EasyMock.expect(stateManager.taskType()).andReturn(type); EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes(); EasyMock.replay(mockTasks, standbyStateManager, storeMetadata, store);