KAFKA-14133: Migrate storeMetadata mock in StoreChangelogReaderTest to Mockito (#15116)

Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Christo Lolov 2024-01-04 14:50:14 +02:00 committed by GitHub
parent 5bc3aa4280
commit e6f2624c48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 77 additions and 62 deletions

View File

@ -116,7 +116,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
private ProcessorStateManager activeStateManager;
@org.mockito.Mock
private ProcessorStateManager standbyStateManager;
@Mock(type = MockType.NICE)
@org.mockito.Mock
private StateStoreMetadata storeMetadata;
@Mock(type = MockType.NICE)
private StateStoreMetadata storeMetadataOne;
@ -182,17 +182,19 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
when(standbyStateManager.taskType()).thenReturn(STANDBY);
}
private void setupStoreMetadata() {
when(storeMetadata.changelogPartition()).thenReturn(tp);
when(storeMetadata.store()).thenReturn(store);
}
@Before
public void setUp() {
EasyMock.expect(storeMetadata.changelogPartition()).andReturn(tp).anyTimes();
EasyMock.expect(storeMetadata.store()).andReturn(store).anyTimes();
EasyMock.expect(store.name()).andReturn(storeName).anyTimes();
}
@After
public void tearDown() {
EasyMock.reset(
storeMetadata,
storeMetadataOne,
storeMetadataTwo,
store
@ -202,7 +204,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldNotRegisterSameStoreMultipleTimes() {
setupStateManagerMock();
EasyMock.replay(storeMetadata);
changelogReader.register(tp, stateManager);
@ -215,8 +216,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldNotRegisterStoreWithoutMetadata() {
EasyMock.replay(storeMetadata);
assertThrows(IllegalStateException.class,
() -> changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager));
}
@ -226,8 +225,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
setupStateManagerMock();
final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
EasyMock.replay(mockTasks, storeMetadata, store);
EasyMock.replay(mockTasks, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
@ -257,15 +255,16 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldSupportUnregisterChangelogBeforeCompletion() {
setupStateManagerMock();
setupStoreMetadata();
final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
when(storeMetadata.offset()).thenReturn(9L);
if (type == STANDBY) {
when(storeMetadata.endOffset()).thenReturn(10L);
when(stateManager.changelogAsSource(tp)).thenReturn(true);
}
EasyMock.replay(mockTasks, storeMetadata, store);
EasyMock.replay(mockTasks, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
@ -307,15 +306,16 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldSupportUnregisterChangelogAfterCompletion() {
setupStateManagerMock();
setupStoreMetadata();
final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
when(storeMetadata.offset()).thenReturn(9L);
if (type == STANDBY) {
when(storeMetadata.endOffset()).thenReturn(10L);
when(stateManager.changelogAsSource(tp)).thenReturn(true);
}
EasyMock.replay(mockTasks, storeMetadata, store);
EasyMock.replay(mockTasks, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
@ -363,12 +363,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldInitializeChangelogAndCheckForCompletion() {
setupStateManagerMock();
setupStoreMetadata();
final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
EasyMock.replay(mockTasks, storeMetadata, store);
when(storeMetadata.offset()).thenReturn(9L);
EasyMock.replay(mockTasks, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
@ -406,11 +406,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
// restore listener is only triggered for active tasks
if (type == ACTIVE) {
setupStateManagerMock();
setupStoreMetadata();
final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
when(stateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L));
EasyMock.replay(mockTasks, storeMetadata, store);
EasyMock.replay(mockTasks, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
@ -435,12 +436,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldPollWithRightTimeoutWithStateUpdater() {
setupStateManagerMock();
setupStoreMetadata();
shouldPollWithRightTimeout(true);
}
@Test
public void shouldPollWithRightTimeoutWithoutStateUpdater() {
setupStateManagerMock();
setupStoreMetadata();
shouldPollWithRightTimeout(false);
}
@ -453,6 +456,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldPollWithRightTimeoutWithStateUpdaterDefault() {
setupStateManagerMock();
setupStoreMetadata();
final Properties properties = new Properties();
shouldPollWithRightTimeout(properties);
}
@ -460,10 +464,9 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
private void shouldPollWithRightTimeout(final Properties properties) {
final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
when(storeMetadata.offset()).thenReturn(null).thenReturn(9L);
when(stateManager.taskId()).thenReturn(taskId);
EasyMock.replay(storeMetadata, store);
EasyMock.replay(store);
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
@ -495,12 +498,15 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldRestoreFromPositionAndCheckForCompletion() {
setupStateManagerMock();
setupStoreMetadata();
final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
when(storeMetadata.offset()).thenReturn(5L);
if (type == STANDBY) {
when(storeMetadata.endOffset()).thenReturn(10L);
}
when(stateManager.taskId()).thenReturn(taskId);
EasyMock.replay(storeMetadata, store);
EasyMock.replay(store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
@ -565,17 +571,17 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldRestoreFromBeginningAndCheckCompletion() {
setupStateManagerMock();
setupStoreMetadata();
final TaskId taskId = new TaskId(0, 0);
if (type == STANDBY && logContext.logger(StoreChangelogReader.class).isDebugEnabled()) {
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(null).andReturn(9L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
when(storeMetadata.offset()).thenReturn(null).thenReturn(null).thenReturn(9L);
when(storeMetadata.endOffset()).thenReturn(10L);
} else {
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
when(storeMetadata.offset()).thenReturn(null).thenReturn(9L);
}
when(stateManager.taskId()).thenReturn(taskId);
EasyMock.replay(storeMetadata, store);
EasyMock.replay(store);
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
@ -646,11 +652,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldCheckCompletionIfPositionLargerThanEndOffset() {
setupActiveStateManager();
setupStoreMetadata();
final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
EasyMock.replay(mockTasks, storeMetadata, store);
when(storeMetadata.offset()).thenReturn(5L);
EasyMock.replay(mockTasks, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 0L));
@ -675,6 +682,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldRequestPositionAndHandleTimeoutException() {
setupActiveStateManager();
setupStoreMetadata();
final TaskId taskId = new TaskId(0, 0);
final Task mockTask = mock(Task.class);
@ -683,10 +691,10 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
EasyMock.expectLastCall();
mockTask.recordRestoration(anyObject(), anyLong(), anyBoolean());
EasyMock.expectLastCall();
EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();
when(storeMetadata.offset()).thenReturn(10L);
when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 10L));
when(activeStateManager.taskId()).thenReturn(taskId);
EasyMock.replay(mockTask, storeMetadata, store);
EasyMock.replay(mockTask, store);
final AtomicBoolean clearException = new AtomicBoolean(false);
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@ -730,10 +738,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldThrowIfPositionFail() {
setupActiveStateManager();
setupStoreMetadata();
final TaskId taskId = new TaskId(0, 0);
when(activeStateManager.taskId()).thenReturn(taskId);
EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();
EasyMock.replay(storeMetadata, store);
when(storeMetadata.offset()).thenReturn(10L);
EasyMock.replay(store);
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
@ -759,16 +768,17 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldRequestEndOffsetsAndHandleTimeoutException() {
setupActiveStateManager();
setupStoreMetadata();
final TaskId taskId = new TaskId(0, 0);
final Task mockTask = niceMock(Task.class);
mockTask.maybeInitTaskTimeoutOrThrow(anyLong(), anyObject());
EasyMock.expectLastCall();
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
when(storeMetadata.offset()).thenReturn(5L);
when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L));
when(activeStateManager.taskId()).thenReturn(taskId);
EasyMock.replay(mockTask, storeMetadata, store);
EasyMock.replay(mockTask, store);
final AtomicBoolean functionCalled = new AtomicBoolean(false);
@ -821,11 +831,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldThrowIfEndOffsetsFail() {
setupActiveStateManager();
when(storeMetadata.changelogPartition()).thenReturn(tp);
final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();
when(activeStateManager.taskId()).thenReturn(taskId);
EasyMock.replay(storeMetadata, store);
EasyMock.replay(store);
final MockAdminClient adminClient = new MockAdminClient() {
@Override
@ -851,6 +861,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldRequestCommittedOffsetsAndHandleTimeoutException() {
setupStateManagerMock();
setupStoreMetadata();
final TaskId taskId = new TaskId(0, 0);
@ -862,10 +873,9 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
EasyMock.expectLastCall();
when(stateManager.changelogAsSource(tp)).thenReturn(true);
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
when(storeMetadata.offset()).thenReturn(5L);
when(stateManager.taskId()).thenReturn(taskId);
EasyMock.replay(mockTask, storeMetadata, store);
EasyMock.replay(mockTask, store);
final AtomicBoolean functionCalled = new AtomicBoolean(false);
final MockAdminClient adminClient = new MockAdminClient() {
@ -924,13 +934,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldThrowIfCommittedOffsetsFail() {
setupStateManagerMock();
when(storeMetadata.changelogPartition()).thenReturn(tp);
final TaskId taskId = new TaskId(0, 0);
when(stateManager.taskId()).thenReturn(taskId);
when(stateManager.changelogAsSource(tp)).thenReturn(true);
EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();
EasyMock.replay(storeMetadata, store);
EasyMock.replay(store);
final MockAdminClient adminClient = new MockAdminClient() {
@Override
@ -954,7 +964,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldThrowIfUnsubscribeFail() {
EasyMock.replay(storeMetadata, store);
EasyMock.replay(store);
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
@ -972,12 +982,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldOnlyRestoreStandbyChangelogInUpdateStandbyState() {
setupStandbyStateManager();
setupStoreMetadata();
final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes();
EasyMock.replay(mockTasks, storeMetadata, store);
when(storeMetadata.offset()).thenReturn(3L);
when(storeMetadata.endOffset()).thenReturn(20L);
EasyMock.replay(mockTasks, store);
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
changelogReader.register(tp, standbyStateManager);
@ -1014,14 +1025,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldNotUpdateLimitForNonSourceStandbyChangelog() {
setupStandbyStateManager();
setupStoreMetadata();
final Map<TaskId, Task> mockTasks = mock(Map.class);
//EasyMock.expect(storeMetadata.offset()).andReturn(0L).anyTimes();
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes();
when(storeMetadata.offset()).thenReturn(3L);
when(storeMetadata.endOffset()).thenReturn(20L);
when(standbyStateManager.changelogAsSource(tp)).thenReturn(false);
EasyMock.replay(mockTasks, storeMetadata, store);
EasyMock.replay(mockTasks, store);
final MockAdminClient adminClient = new MockAdminClient() {
@Override
@ -1072,13 +1083,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldRestoreToLimitInStandbyState() {
setupStandbyStateManager();
setupStoreMetadata();
final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
when(standbyStateManager.changelogAsSource(tp)).thenReturn(true);
EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes();
EasyMock.replay(mockTasks, storeMetadata, store);
when(storeMetadata.offset()).thenReturn(3L);
when(storeMetadata.endOffset()).thenReturn(20L);
EasyMock.replay(mockTasks, store);
final long now = time.milliseconds();
final Properties properties = new Properties();
@ -1188,6 +1200,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldRestoreMultipleChangelogs() {
setupActiveStateManager();
setupStoreMetadata();
final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
@ -1195,7 +1208,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes();
EasyMock.expect(storeMetadataTwo.changelogPartition()).andReturn(tp2).anyTimes();
EasyMock.expect(storeMetadataTwo.store()).andReturn(store).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(0L).anyTimes();
when(storeMetadata.offset()).thenReturn(0L);
EasyMock.expect(storeMetadataOne.offset()).andReturn(0L).anyTimes();
EasyMock.expect(storeMetadataTwo.offset()).andReturn(0L).anyTimes();
when(activeStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne);
@ -1205,7 +1218,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
mkEntry(tp1, 5L),
mkEntry(tp2, 5L)
));
EasyMock.replay(mockTasks, storeMetadata, store, storeMetadataOne, storeMetadataTwo);
EasyMock.replay(mockTasks, store, storeMetadataOne, storeMetadataTwo);
setupConsumer(10, tp);
setupConsumer(5, tp1);
@ -1238,13 +1251,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldTransitState() {
setupActiveStateManager();
setupStoreMetadata();
when(standbyStateManager.taskType()).thenReturn(STANDBY);
final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes();
EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes();
EasyMock.expect(storeMetadataTwo.changelogPartition()).andReturn(tp2).anyTimes();
EasyMock.expect(storeMetadataTwo.store()).andReturn(store).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
when(storeMetadata.offset()).thenReturn(5L);
EasyMock.expect(storeMetadataOne.offset()).andReturn(5L).anyTimes();
EasyMock.expect(storeMetadataTwo.offset()).andReturn(5L).anyTimes();
when(standbyStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne);
@ -1252,7 +1266,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L));
when(activeStateManager.taskId()).thenReturn(taskId);
when(standbyStateManager.taskId()).thenReturn(taskId);
EasyMock.replay(storeMetadata, store, storeMetadataOne, storeMetadataTwo);
EasyMock.replay(store, storeMetadataOne, storeMetadataTwo);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
adminClient.updateEndOffsets(Collections.singletonMap(tp1, 10L));
@ -1335,11 +1349,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldThrowIfRestoreCallbackThrows() {
setupActiveStateManager();
setupStoreMetadata();
final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
when(storeMetadata.offset()).thenReturn(5L);
when(activeStateManager.taskId()).thenReturn(taskId);
EasyMock.replay(storeMetadata, store);
EasyMock.replay(store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));