diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 7b8356bf1b2..149c0243566 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -116,7 +116,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { private ProcessorStateManager activeStateManager; @org.mockito.Mock private ProcessorStateManager standbyStateManager; - @Mock(type = MockType.NICE) + @org.mockito.Mock private StateStoreMetadata storeMetadata; @Mock(type = MockType.NICE) private StateStoreMetadata storeMetadataOne; @@ -182,17 +182,19 @@ public class StoreChangelogReaderTest extends EasyMockSupport { when(standbyStateManager.taskType()).thenReturn(STANDBY); } + private void setupStoreMetadata() { + when(storeMetadata.changelogPartition()).thenReturn(tp); + when(storeMetadata.store()).thenReturn(store); + } + @Before public void setUp() { - EasyMock.expect(storeMetadata.changelogPartition()).andReturn(tp).anyTimes(); - EasyMock.expect(storeMetadata.store()).andReturn(store).anyTimes(); EasyMock.expect(store.name()).andReturn(storeName).anyTimes(); } @After public void tearDown() { EasyMock.reset( - storeMetadata, storeMetadataOne, storeMetadataTwo, store @@ -202,7 +204,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldNotRegisterSameStoreMultipleTimes() { setupStateManagerMock(); - EasyMock.replay(storeMetadata); changelogReader.register(tp, stateManager); @@ -215,8 +216,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldNotRegisterStoreWithoutMetadata() { - EasyMock.replay(storeMetadata); - assertThrows(IllegalStateException.class, () -> changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager)); } @@ -226,8 +225,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { setupStateManagerMock(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); - EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); - EasyMock.replay(mockTasks, storeMetadata, store); + EasyMock.replay(mockTasks, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L)); @@ -257,15 +255,16 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldSupportUnregisterChangelogBeforeCompletion() { setupStateManagerMock(); + setupStoreMetadata(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); - EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); - EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); + when(storeMetadata.offset()).thenReturn(9L); if (type == STANDBY) { + when(storeMetadata.endOffset()).thenReturn(10L); when(stateManager.changelogAsSource(tp)).thenReturn(true); } - EasyMock.replay(mockTasks, storeMetadata, store); + EasyMock.replay(mockTasks, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L)); @@ -307,15 +306,16 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldSupportUnregisterChangelogAfterCompletion() { setupStateManagerMock(); + setupStoreMetadata(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); - EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); - EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); + when(storeMetadata.offset()).thenReturn(9L); if (type == STANDBY) { + when(storeMetadata.endOffset()).thenReturn(10L); when(stateManager.changelogAsSource(tp)).thenReturn(true); } - EasyMock.replay(mockTasks, storeMetadata, store); + EasyMock.replay(mockTasks, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); @@ -363,12 +363,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldInitializeChangelogAndCheckForCompletion() { setupStateManagerMock(); + setupStoreMetadata(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); - EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); - EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); - EasyMock.replay(mockTasks, storeMetadata, store); + when(storeMetadata.offset()).thenReturn(9L); + EasyMock.replay(mockTasks, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); @@ -406,11 +406,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport { // restore listener is only triggered for active tasks if (type == ACTIVE) { setupStateManagerMock(); + setupStoreMetadata(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); when(stateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L)); - EasyMock.replay(mockTasks, storeMetadata, store); + EasyMock.replay(mockTasks, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); @@ -435,12 +436,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldPollWithRightTimeoutWithStateUpdater() { setupStateManagerMock(); + setupStoreMetadata(); shouldPollWithRightTimeout(true); } @Test public void shouldPollWithRightTimeoutWithoutStateUpdater() { setupStateManagerMock(); + setupStoreMetadata(); shouldPollWithRightTimeout(false); } @@ -453,6 +456,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldPollWithRightTimeoutWithStateUpdaterDefault() { setupStateManagerMock(); + setupStoreMetadata(); final Properties properties = new Properties(); shouldPollWithRightTimeout(properties); } @@ -460,10 +464,9 @@ public class StoreChangelogReaderTest extends EasyMockSupport { private void shouldPollWithRightTimeout(final Properties properties) { final TaskId taskId = new TaskId(0, 0); - EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes(); - EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); + when(storeMetadata.offset()).thenReturn(null).thenReturn(9L); when(stateManager.taskId()).thenReturn(taskId); - EasyMock.replay(storeMetadata, store); + EasyMock.replay(store); consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L)); @@ -495,12 +498,15 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldRestoreFromPositionAndCheckForCompletion() { setupStateManagerMock(); + setupStoreMetadata(); final TaskId taskId = new TaskId(0, 0); - EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); - EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); + when(storeMetadata.offset()).thenReturn(5L); + if (type == STANDBY) { + when(storeMetadata.endOffset()).thenReturn(10L); + } when(stateManager.taskId()).thenReturn(taskId); - EasyMock.replay(storeMetadata, store); + EasyMock.replay(store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); @@ -565,17 +571,17 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldRestoreFromBeginningAndCheckCompletion() { setupStateManagerMock(); + setupStoreMetadata(); final TaskId taskId = new TaskId(0, 0); if (type == STANDBY && logContext.logger(StoreChangelogReader.class).isDebugEnabled()) { - EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(null).andReturn(9L).anyTimes(); - EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); + when(storeMetadata.offset()).thenReturn(null).thenReturn(null).thenReturn(9L); + when(storeMetadata.endOffset()).thenReturn(10L); } else { - EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes(); - EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); + when(storeMetadata.offset()).thenReturn(null).thenReturn(9L); } when(stateManager.taskId()).thenReturn(taskId); - EasyMock.replay(storeMetadata, store); + EasyMock.replay(store); consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L)); @@ -646,11 +652,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldCheckCompletionIfPositionLargerThanEndOffset() { setupActiveStateManager(); + setupStoreMetadata(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); - EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); - EasyMock.replay(mockTasks, storeMetadata, store); + when(storeMetadata.offset()).thenReturn(5L); + EasyMock.replay(mockTasks, store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 0L)); @@ -675,6 +682,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldRequestPositionAndHandleTimeoutException() { setupActiveStateManager(); + setupStoreMetadata(); final TaskId taskId = new TaskId(0, 0); final Task mockTask = mock(Task.class); @@ -683,10 +691,10 @@ public class StoreChangelogReaderTest extends EasyMockSupport { EasyMock.expectLastCall(); mockTask.recordRestoration(anyObject(), anyLong(), anyBoolean()); EasyMock.expectLastCall(); - EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes(); + when(storeMetadata.offset()).thenReturn(10L); when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 10L)); when(activeStateManager.taskId()).thenReturn(taskId); - EasyMock.replay(mockTask, storeMetadata, store); + EasyMock.replay(mockTask, store); final AtomicBoolean clearException = new AtomicBoolean(false); final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @@ -730,10 +738,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldThrowIfPositionFail() { setupActiveStateManager(); + setupStoreMetadata(); final TaskId taskId = new TaskId(0, 0); when(activeStateManager.taskId()).thenReturn(taskId); - EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes(); - EasyMock.replay(storeMetadata, store); + when(storeMetadata.offset()).thenReturn(10L); + EasyMock.replay(store); final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override @@ -759,16 +768,17 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldRequestEndOffsetsAndHandleTimeoutException() { setupActiveStateManager(); + setupStoreMetadata(); final TaskId taskId = new TaskId(0, 0); final Task mockTask = niceMock(Task.class); mockTask.maybeInitTaskTimeoutOrThrow(anyLong(), anyObject()); EasyMock.expectLastCall(); - EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); + when(storeMetadata.offset()).thenReturn(5L); when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L)); when(activeStateManager.taskId()).thenReturn(taskId); - EasyMock.replay(mockTask, storeMetadata, store); + EasyMock.replay(mockTask, store); final AtomicBoolean functionCalled = new AtomicBoolean(false); @@ -821,11 +831,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldThrowIfEndOffsetsFail() { setupActiveStateManager(); + when(storeMetadata.changelogPartition()).thenReturn(tp); final TaskId taskId = new TaskId(0, 0); - EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes(); when(activeStateManager.taskId()).thenReturn(taskId); - EasyMock.replay(storeMetadata, store); + EasyMock.replay(store); final MockAdminClient adminClient = new MockAdminClient() { @Override @@ -851,6 +861,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldRequestCommittedOffsetsAndHandleTimeoutException() { setupStateManagerMock(); + setupStoreMetadata(); final TaskId taskId = new TaskId(0, 0); @@ -862,10 +873,9 @@ public class StoreChangelogReaderTest extends EasyMockSupport { EasyMock.expectLastCall(); when(stateManager.changelogAsSource(tp)).thenReturn(true); - EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); - EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); + when(storeMetadata.offset()).thenReturn(5L); when(stateManager.taskId()).thenReturn(taskId); - EasyMock.replay(mockTask, storeMetadata, store); + EasyMock.replay(mockTask, store); final AtomicBoolean functionCalled = new AtomicBoolean(false); final MockAdminClient adminClient = new MockAdminClient() { @@ -924,13 +934,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldThrowIfCommittedOffsetsFail() { setupStateManagerMock(); + when(storeMetadata.changelogPartition()).thenReturn(tp); final TaskId taskId = new TaskId(0, 0); when(stateManager.taskId()).thenReturn(taskId); when(stateManager.changelogAsSource(tp)).thenReturn(true); - EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes(); - EasyMock.replay(storeMetadata, store); + EasyMock.replay(store); final MockAdminClient adminClient = new MockAdminClient() { @Override @@ -954,7 +964,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldThrowIfUnsubscribeFail() { - EasyMock.replay(storeMetadata, store); + EasyMock.replay(store); final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override @@ -972,12 +982,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldOnlyRestoreStandbyChangelogInUpdateStandbyState() { setupStandbyStateManager(); + setupStoreMetadata(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); - EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes(); - EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes(); - EasyMock.replay(mockTasks, storeMetadata, store); + when(storeMetadata.offset()).thenReturn(3L); + when(storeMetadata.endOffset()).thenReturn(20L); + EasyMock.replay(mockTasks, store); consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L)); changelogReader.register(tp, standbyStateManager); @@ -1014,14 +1025,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldNotUpdateLimitForNonSourceStandbyChangelog() { setupStandbyStateManager(); + setupStoreMetadata(); final Map mockTasks = mock(Map.class); - //EasyMock.expect(storeMetadata.offset()).andReturn(0L).anyTimes(); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); - EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes(); - EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes(); + when(storeMetadata.offset()).thenReturn(3L); + when(storeMetadata.endOffset()).thenReturn(20L); when(standbyStateManager.changelogAsSource(tp)).thenReturn(false); - EasyMock.replay(mockTasks, storeMetadata, store); + EasyMock.replay(mockTasks, store); final MockAdminClient adminClient = new MockAdminClient() { @Override @@ -1072,13 +1083,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldRestoreToLimitInStandbyState() { setupStandbyStateManager(); + setupStoreMetadata(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); when(standbyStateManager.changelogAsSource(tp)).thenReturn(true); - EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes(); - EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes(); - EasyMock.replay(mockTasks, storeMetadata, store); + when(storeMetadata.offset()).thenReturn(3L); + when(storeMetadata.endOffset()).thenReturn(20L); + EasyMock.replay(mockTasks, store); final long now = time.milliseconds(); final Properties properties = new Properties(); @@ -1188,6 +1200,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldRestoreMultipleChangelogs() { setupActiveStateManager(); + setupStoreMetadata(); final Map mockTasks = mock(Map.class); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); @@ -1195,7 +1208,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes(); EasyMock.expect(storeMetadataTwo.changelogPartition()).andReturn(tp2).anyTimes(); EasyMock.expect(storeMetadataTwo.store()).andReturn(store).anyTimes(); - EasyMock.expect(storeMetadata.offset()).andReturn(0L).anyTimes(); + when(storeMetadata.offset()).thenReturn(0L); EasyMock.expect(storeMetadataOne.offset()).andReturn(0L).anyTimes(); EasyMock.expect(storeMetadataTwo.offset()).andReturn(0L).anyTimes(); when(activeStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne); @@ -1205,7 +1218,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { mkEntry(tp1, 5L), mkEntry(tp2, 5L) )); - EasyMock.replay(mockTasks, storeMetadata, store, storeMetadataOne, storeMetadataTwo); + EasyMock.replay(mockTasks, store, storeMetadataOne, storeMetadataTwo); setupConsumer(10, tp); setupConsumer(5, tp1); @@ -1238,13 +1251,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldTransitState() { setupActiveStateManager(); + setupStoreMetadata(); when(standbyStateManager.taskType()).thenReturn(STANDBY); final TaskId taskId = new TaskId(0, 0); EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes(); EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes(); EasyMock.expect(storeMetadataTwo.changelogPartition()).andReturn(tp2).anyTimes(); EasyMock.expect(storeMetadataTwo.store()).andReturn(store).anyTimes(); - EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); + when(storeMetadata.offset()).thenReturn(5L); EasyMock.expect(storeMetadataOne.offset()).andReturn(5L).anyTimes(); EasyMock.expect(storeMetadataTwo.offset()).andReturn(5L).anyTimes(); when(standbyStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne); @@ -1252,7 +1266,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport { when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L)); when(activeStateManager.taskId()).thenReturn(taskId); when(standbyStateManager.taskId()).thenReturn(taskId); - EasyMock.replay(storeMetadata, store, storeMetadataOne, storeMetadataTwo); + EasyMock.replay(store, storeMetadataOne, storeMetadataTwo); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); adminClient.updateEndOffsets(Collections.singletonMap(tp1, 10L)); @@ -1335,11 +1349,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport { @Test public void shouldThrowIfRestoreCallbackThrows() { setupActiveStateManager(); + setupStoreMetadata(); final TaskId taskId = new TaskId(0, 0); - EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); + when(storeMetadata.offset()).thenReturn(5L); when(activeStateManager.taskId()).thenReturn(taskId); - EasyMock.replay(storeMetadata, store); + EasyMock.replay(store); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));