mirror of https://github.com/apache/kafka.git
KAFKA-14133: Migrate remaining mocks in StoreChangelogReaderTest to Mockito (#15125)
Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
parent
e6f2624c48
commit
c703ce2563
|
@ -44,17 +44,11 @@ import org.apache.kafka.common.utils.LogCaptureAppender;
|
|||
import org.apache.kafka.test.MockStandbyUpdateListener;
|
||||
import org.apache.kafka.test.MockStateRestoreListener;
|
||||
import org.apache.kafka.test.StreamsTestUtils;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.EasyMockRule;
|
||||
import org.easymock.EasyMockSupport;
|
||||
import org.easymock.Mock;
|
||||
import org.easymock.MockType;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnit;
|
||||
import org.mockito.junit.MockitoRule;
|
||||
import org.mockito.junit.jupiter.MockitoSettings;
|
||||
|
@ -82,13 +76,6 @@ import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
|
|||
import static org.apache.kafka.test.MockStandbyUpdateListener.UPDATE_SUSPENDED;
|
||||
import static org.apache.kafka.test.MockStandbyUpdateListener.UPDATE_START;
|
||||
import static org.apache.kafka.test.MockStandbyUpdateListener.UPDATE_BATCH;
|
||||
import static org.easymock.EasyMock.anyBoolean;
|
||||
import static org.easymock.EasyMock.anyLong;
|
||||
import static org.easymock.EasyMock.anyObject;
|
||||
import static org.easymock.EasyMock.expectLastCall;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
import static org.easymock.EasyMock.resetToDefault;
|
||||
import static org.easymock.EasyMock.verify;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
|
@ -97,15 +84,17 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
|
||||
@RunWith(Parameterized.class)
|
||||
@SuppressWarnings("this-escape")
|
||||
public class StoreChangelogReaderTest extends EasyMockSupport {
|
||||
|
||||
@Rule
|
||||
public EasyMockRule rule = new EasyMockRule(this);
|
||||
public class StoreChangelogReaderTest {
|
||||
|
||||
@Rule
|
||||
public final MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
|
||||
|
@ -118,11 +107,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
private ProcessorStateManager standbyStateManager;
|
||||
@org.mockito.Mock
|
||||
private StateStoreMetadata storeMetadata;
|
||||
@Mock(type = MockType.NICE)
|
||||
@org.mockito.Mock
|
||||
private StateStoreMetadata storeMetadataOne;
|
||||
@Mock(type = MockType.NICE)
|
||||
@org.mockito.Mock
|
||||
private StateStoreMetadata storeMetadataTwo;
|
||||
@Mock(type = MockType.NICE)
|
||||
@org.mockito.Mock
|
||||
private StateStore store;
|
||||
|
||||
@Parameterized.Parameters
|
||||
|
@ -187,18 +176,8 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
when(storeMetadata.store()).thenReturn(store);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
EasyMock.expect(store.name()).andReturn(storeName).anyTimes();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
EasyMock.reset(
|
||||
storeMetadataOne,
|
||||
storeMetadataTwo,
|
||||
store
|
||||
);
|
||||
private void setupStore() {
|
||||
when(store.name()).thenReturn(storeName);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -223,9 +202,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
@Test
|
||||
public void shouldSupportUnregisterChangelogBeforeInitialization() {
|
||||
setupStateManagerMock();
|
||||
final Map<TaskId, Task> mockTasks = mock(Map.class);
|
||||
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
|
||||
EasyMock.replay(mockTasks, store);
|
||||
|
||||
adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
|
||||
|
||||
|
@ -256,15 +232,16 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldSupportUnregisterChangelogBeforeCompletion() {
|
||||
setupStateManagerMock();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<TaskId, Task> mockTasks = mock(Map.class);
|
||||
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
|
||||
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
|
||||
when(mockTasks.get(null)).thenReturn(mock(Task.class));
|
||||
when(mockTasks.containsKey(null)).thenReturn(true);
|
||||
when(storeMetadata.offset()).thenReturn(9L);
|
||||
if (type == STANDBY) {
|
||||
when(storeMetadata.endOffset()).thenReturn(10L);
|
||||
when(stateManager.changelogAsSource(tp)).thenReturn(true);
|
||||
}
|
||||
EasyMock.replay(mockTasks, store);
|
||||
|
||||
adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
|
||||
|
||||
|
@ -307,15 +284,16 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldSupportUnregisterChangelogAfterCompletion() {
|
||||
setupStateManagerMock();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<TaskId, Task> mockTasks = mock(Map.class);
|
||||
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
|
||||
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
|
||||
when(mockTasks.get(null)).thenReturn(mock(Task.class));
|
||||
when(mockTasks.containsKey(null)).thenReturn(true);
|
||||
when(storeMetadata.offset()).thenReturn(9L);
|
||||
if (type == STANDBY) {
|
||||
when(storeMetadata.endOffset()).thenReturn(10L);
|
||||
when(stateManager.changelogAsSource(tp)).thenReturn(true);
|
||||
}
|
||||
EasyMock.replay(mockTasks, store);
|
||||
|
||||
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
|
||||
|
||||
|
@ -364,11 +342,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldInitializeChangelogAndCheckForCompletion() {
|
||||
setupStateManagerMock();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<TaskId, Task> mockTasks = mock(Map.class);
|
||||
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
|
||||
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
|
||||
when(mockTasks.get(null)).thenReturn(mock(Task.class));
|
||||
when(mockTasks.containsKey(null)).thenReturn(true);
|
||||
when(storeMetadata.offset()).thenReturn(9L);
|
||||
EasyMock.replay(mockTasks, store);
|
||||
|
||||
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
|
||||
|
||||
|
@ -407,11 +386,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
if (type == ACTIVE) {
|
||||
setupStateManagerMock();
|
||||
setupStoreMetadata();
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<TaskId, Task> mockTasks = mock(Map.class);
|
||||
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
|
||||
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
|
||||
when(mockTasks.get(null)).thenReturn(mock(Task.class));
|
||||
when(mockTasks.containsKey(null)).thenReturn(true);
|
||||
when(stateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L));
|
||||
EasyMock.replay(mockTasks, store);
|
||||
|
||||
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
|
||||
|
||||
|
@ -437,6 +416,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldPollWithRightTimeoutWithStateUpdater() {
|
||||
setupStateManagerMock();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
shouldPollWithRightTimeout(true);
|
||||
}
|
||||
|
||||
|
@ -444,6 +424,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldPollWithRightTimeoutWithoutStateUpdater() {
|
||||
setupStateManagerMock();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
shouldPollWithRightTimeout(false);
|
||||
}
|
||||
|
||||
|
@ -457,6 +438,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldPollWithRightTimeoutWithStateUpdaterDefault() {
|
||||
setupStateManagerMock();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
final Properties properties = new Properties();
|
||||
shouldPollWithRightTimeout(properties);
|
||||
}
|
||||
|
@ -466,7 +448,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
|
||||
when(storeMetadata.offset()).thenReturn(null).thenReturn(9L);
|
||||
when(stateManager.taskId()).thenReturn(taskId);
|
||||
EasyMock.replay(store);
|
||||
|
||||
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
|
||||
adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
|
||||
|
@ -499,6 +480,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldRestoreFromPositionAndCheckForCompletion() {
|
||||
setupStateManagerMock();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
final TaskId taskId = new TaskId(0, 0);
|
||||
|
||||
when(storeMetadata.offset()).thenReturn(5L);
|
||||
|
@ -506,7 +488,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
when(storeMetadata.endOffset()).thenReturn(10L);
|
||||
}
|
||||
when(stateManager.taskId()).thenReturn(taskId);
|
||||
EasyMock.replay(store);
|
||||
|
||||
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
|
||||
|
||||
|
@ -572,6 +553,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldRestoreFromBeginningAndCheckCompletion() {
|
||||
setupStateManagerMock();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
final TaskId taskId = new TaskId(0, 0);
|
||||
|
||||
if (type == STANDBY && logContext.logger(StoreChangelogReader.class).isDebugEnabled()) {
|
||||
|
@ -581,7 +563,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
when(storeMetadata.offset()).thenReturn(null).thenReturn(9L);
|
||||
}
|
||||
when(stateManager.taskId()).thenReturn(taskId);
|
||||
EasyMock.replay(store);
|
||||
|
||||
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
|
||||
adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
|
||||
|
@ -653,11 +634,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldCheckCompletionIfPositionLargerThanEndOffset() {
|
||||
setupActiveStateManager();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<TaskId, Task> mockTasks = mock(Map.class);
|
||||
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
|
||||
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
|
||||
when(mockTasks.get(null)).thenReturn(mock(Task.class));
|
||||
when(mockTasks.containsKey(null)).thenReturn(true);
|
||||
when(storeMetadata.offset()).thenReturn(5L);
|
||||
EasyMock.replay(mockTasks, store);
|
||||
|
||||
adminClient.updateEndOffsets(Collections.singletonMap(tp, 0L));
|
||||
|
||||
|
@ -683,18 +665,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldRequestPositionAndHandleTimeoutException() {
|
||||
setupActiveStateManager();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
final TaskId taskId = new TaskId(0, 0);
|
||||
|
||||
final Task mockTask = mock(Task.class);
|
||||
mockTask.clearTaskTimeout();
|
||||
mockTask.maybeInitTaskTimeoutOrThrow(anyLong(), anyObject());
|
||||
EasyMock.expectLastCall();
|
||||
mockTask.recordRestoration(anyObject(), anyLong(), anyBoolean());
|
||||
EasyMock.expectLastCall();
|
||||
when(storeMetadata.offset()).thenReturn(10L);
|
||||
when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 10L));
|
||||
when(activeStateManager.taskId()).thenReturn(taskId);
|
||||
EasyMock.replay(mockTask, store);
|
||||
|
||||
final AtomicBoolean clearException = new AtomicBoolean(false);
|
||||
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
|
||||
|
@ -719,30 +696,29 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
|
||||
assertTrue(changelogReader.completedChangelogs().isEmpty());
|
||||
assertEquals(10L, (long) changelogReader.changelogMetadata(tp).endOffset());
|
||||
verify(mockTask);
|
||||
Mockito.verify(mockTask).clearTaskTimeout();
|
||||
Mockito.verify(mockTask).maybeInitTaskTimeoutOrThrow(anyLong(), any());
|
||||
Mockito.verify(mockTask).recordRestoration(any(), anyLong(), anyBoolean());
|
||||
|
||||
clearException.set(true);
|
||||
resetToDefault(mockTask);
|
||||
mockTask.clearTaskTimeout();
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.replay(mockTask);
|
||||
Mockito.reset(mockTask);
|
||||
changelogReader.restore(Collections.singletonMap(taskId, mockTask));
|
||||
|
||||
assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, changelogReader.changelogMetadata(tp).state());
|
||||
assertEquals(10L, (long) changelogReader.changelogMetadata(tp).endOffset());
|
||||
assertEquals(Collections.singleton(tp), changelogReader.completedChangelogs());
|
||||
assertEquals(10L, consumer.position(tp));
|
||||
verify(mockTask);
|
||||
Mockito.verify(mockTask).clearTaskTimeout();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowIfPositionFail() {
|
||||
setupActiveStateManager();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
final TaskId taskId = new TaskId(0, 0);
|
||||
when(activeStateManager.taskId()).thenReturn(taskId);
|
||||
when(storeMetadata.offset()).thenReturn(10L);
|
||||
EasyMock.replay(store);
|
||||
|
||||
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
|
||||
@Override
|
||||
|
@ -769,16 +745,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldRequestEndOffsetsAndHandleTimeoutException() {
|
||||
setupActiveStateManager();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
final TaskId taskId = new TaskId(0, 0);
|
||||
|
||||
final Task mockTask = niceMock(Task.class);
|
||||
mockTask.maybeInitTaskTimeoutOrThrow(anyLong(), anyObject());
|
||||
EasyMock.expectLastCall();
|
||||
final Task mockTask = mock(Task.class);
|
||||
|
||||
when(storeMetadata.offset()).thenReturn(5L);
|
||||
when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L));
|
||||
when(activeStateManager.taskId()).thenReturn(taskId);
|
||||
EasyMock.replay(mockTask, store);
|
||||
|
||||
final AtomicBoolean functionCalled = new AtomicBoolean(false);
|
||||
|
||||
|
@ -812,20 +786,17 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
assertEquals(StoreChangelogReader.ChangelogState.REGISTERED, changelogReader.changelogMetadata(tp).state());
|
||||
assertNull(changelogReader.changelogMetadata(tp).endOffset());
|
||||
assertTrue(functionCalled.get());
|
||||
verify(mockTask);
|
||||
Mockito.verify(mockTask).maybeInitTaskTimeoutOrThrow(anyLong(), any());
|
||||
|
||||
EasyMock.resetToDefault(mockTask);
|
||||
mockTask.clearTaskTimeout();
|
||||
mockTask.recordRestoration(anyObject(), anyLong(), anyBoolean());
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.replay(mockTask);
|
||||
Mockito.reset(mockTask);
|
||||
|
||||
changelogReader.restore(Collections.singletonMap(taskId, mockTask));
|
||||
|
||||
assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
|
||||
assertEquals(10L, (long) changelogReader.changelogMetadata(tp).endOffset());
|
||||
assertEquals(6L, consumer.position(tp));
|
||||
verify(mockTask);
|
||||
Mockito.verify(mockTask).clearTaskTimeout();
|
||||
Mockito.verify(mockTask).recordRestoration(any(), anyLong(), anyBoolean());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -835,7 +806,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
final TaskId taskId = new TaskId(0, 0);
|
||||
|
||||
when(activeStateManager.taskId()).thenReturn(taskId);
|
||||
EasyMock.replay(store);
|
||||
|
||||
final MockAdminClient adminClient = new MockAdminClient() {
|
||||
@Override
|
||||
|
@ -862,6 +832,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldRequestCommittedOffsetsAndHandleTimeoutException() {
|
||||
setupStateManagerMock();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
|
||||
final TaskId taskId = new TaskId(0, 0);
|
||||
|
||||
|
@ -869,13 +840,10 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
if (type == ACTIVE) {
|
||||
mockTask.clearTaskTimeout();
|
||||
}
|
||||
mockTask.maybeInitTaskTimeoutOrThrow(anyLong(), anyObject());
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
when(stateManager.changelogAsSource(tp)).thenReturn(true);
|
||||
when(storeMetadata.offset()).thenReturn(5L);
|
||||
when(stateManager.taskId()).thenReturn(taskId);
|
||||
EasyMock.replay(mockTask, store);
|
||||
|
||||
final AtomicBoolean functionCalled = new AtomicBoolean(false);
|
||||
final MockAdminClient adminClient = new MockAdminClient() {
|
||||
|
@ -911,24 +879,19 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
assertEquals(0L, (long) changelogReader.changelogMetadata(tp).endOffset());
|
||||
}
|
||||
assertTrue(functionCalled.get());
|
||||
verify(mockTask);
|
||||
Mockito.verify(mockTask).maybeInitTaskTimeoutOrThrow(anyLong(), any());
|
||||
|
||||
resetToDefault(mockTask);
|
||||
if (type == ACTIVE) {
|
||||
mockTask.clearTaskTimeout();
|
||||
mockTask.clearTaskTimeout();
|
||||
expectLastCall();
|
||||
mockTask.recordRestoration(anyObject(), anyLong(), anyBoolean());
|
||||
expectLastCall();
|
||||
}
|
||||
replay(mockTask);
|
||||
Mockito.reset(mockTask);
|
||||
|
||||
changelogReader.restore(Collections.singletonMap(taskId, mockTask));
|
||||
|
||||
assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
|
||||
assertEquals(type == ACTIVE ? 10L : 0L, (long) changelogReader.changelogMetadata(tp).endOffset());
|
||||
assertEquals(6L, consumer.position(tp));
|
||||
verify(mockTask);
|
||||
if (type == ACTIVE) {
|
||||
Mockito.verify(mockTask, times(2)).clearTaskTimeout();
|
||||
Mockito.verify(mockTask).recordRestoration(any(), anyLong(), anyBoolean());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -940,7 +903,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
|
||||
when(stateManager.taskId()).thenReturn(taskId);
|
||||
when(stateManager.changelogAsSource(tp)).thenReturn(true);
|
||||
EasyMock.replay(store);
|
||||
|
||||
final MockAdminClient adminClient = new MockAdminClient() {
|
||||
@Override
|
||||
|
@ -964,8 +926,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
|
||||
@Test
|
||||
public void shouldThrowIfUnsubscribeFail() {
|
||||
EasyMock.replay(store);
|
||||
|
||||
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
|
||||
@Override
|
||||
public void unsubscribe() {
|
||||
|
@ -983,12 +943,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldOnlyRestoreStandbyChangelogInUpdateStandbyState() {
|
||||
setupStandbyStateManager();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<TaskId, Task> mockTasks = mock(Map.class);
|
||||
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
|
||||
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
|
||||
when(mockTasks.get(null)).thenReturn(mock(Task.class));
|
||||
when(mockTasks.containsKey(null)).thenReturn(true);
|
||||
when(storeMetadata.offset()).thenReturn(3L);
|
||||
when(storeMetadata.endOffset()).thenReturn(20L);
|
||||
EasyMock.replay(mockTasks, store);
|
||||
|
||||
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
|
||||
changelogReader.register(tp, standbyStateManager);
|
||||
|
@ -1026,13 +987,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldNotUpdateLimitForNonSourceStandbyChangelog() {
|
||||
setupStandbyStateManager();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<TaskId, Task> mockTasks = mock(Map.class);
|
||||
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
|
||||
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
|
||||
when(mockTasks.get(null)).thenReturn(mock(Task.class));
|
||||
when(mockTasks.containsKey(null)).thenReturn(true);
|
||||
when(storeMetadata.offset()).thenReturn(3L);
|
||||
when(storeMetadata.endOffset()).thenReturn(20L);
|
||||
when(standbyStateManager.changelogAsSource(tp)).thenReturn(false);
|
||||
EasyMock.replay(mockTasks, store);
|
||||
|
||||
final MockAdminClient adminClient = new MockAdminClient() {
|
||||
@Override
|
||||
|
@ -1084,13 +1046,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldRestoreToLimitInStandbyState() {
|
||||
setupStandbyStateManager();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<TaskId, Task> mockTasks = mock(Map.class);
|
||||
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
|
||||
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
|
||||
when(mockTasks.get(null)).thenReturn(mock(Task.class));
|
||||
when(mockTasks.containsKey(null)).thenReturn(true);
|
||||
when(standbyStateManager.changelogAsSource(tp)).thenReturn(true);
|
||||
when(storeMetadata.offset()).thenReturn(3L);
|
||||
when(storeMetadata.endOffset()).thenReturn(20L);
|
||||
EasyMock.replay(mockTasks, store);
|
||||
|
||||
final long now = time.milliseconds();
|
||||
final Properties properties = new Properties();
|
||||
|
@ -1201,16 +1164,18 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldRestoreMultipleChangelogs() {
|
||||
setupActiveStateManager();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<TaskId, Task> mockTasks = mock(Map.class);
|
||||
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
|
||||
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
|
||||
EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes();
|
||||
EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes();
|
||||
EasyMock.expect(storeMetadataTwo.changelogPartition()).andReturn(tp2).anyTimes();
|
||||
EasyMock.expect(storeMetadataTwo.store()).andReturn(store).anyTimes();
|
||||
when(mockTasks.get(null)).thenReturn(mock(Task.class));
|
||||
when(mockTasks.containsKey(null)).thenReturn(true);
|
||||
when(storeMetadataOne.changelogPartition()).thenReturn(tp1);
|
||||
when(storeMetadataOne.store()).thenReturn(store);
|
||||
when(storeMetadataTwo.changelogPartition()).thenReturn(tp2);
|
||||
when(storeMetadataTwo.store()).thenReturn(store);
|
||||
when(storeMetadata.offset()).thenReturn(0L);
|
||||
EasyMock.expect(storeMetadataOne.offset()).andReturn(0L).anyTimes();
|
||||
EasyMock.expect(storeMetadataTwo.offset()).andReturn(0L).anyTimes();
|
||||
when(storeMetadataOne.offset()).thenReturn(0L);
|
||||
when(storeMetadataTwo.offset()).thenReturn(0L);
|
||||
when(activeStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne);
|
||||
when(activeStateManager.storeMetadata(tp2)).thenReturn(storeMetadataTwo);
|
||||
when(activeStateManager.changelogOffsets()).thenReturn(mkMap(
|
||||
|
@ -1218,7 +1183,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
mkEntry(tp1, 5L),
|
||||
mkEntry(tp2, 5L)
|
||||
));
|
||||
EasyMock.replay(mockTasks, store, storeMetadataOne, storeMetadataTwo);
|
||||
|
||||
setupConsumer(10, tp);
|
||||
setupConsumer(5, tp1);
|
||||
|
@ -1252,21 +1216,21 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldTransitState() {
|
||||
setupActiveStateManager();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
when(standbyStateManager.taskType()).thenReturn(STANDBY);
|
||||
final TaskId taskId = new TaskId(0, 0);
|
||||
EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes();
|
||||
EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes();
|
||||
EasyMock.expect(storeMetadataTwo.changelogPartition()).andReturn(tp2).anyTimes();
|
||||
EasyMock.expect(storeMetadataTwo.store()).andReturn(store).anyTimes();
|
||||
when(storeMetadataOne.changelogPartition()).thenReturn(tp1);
|
||||
when(storeMetadataOne.store()).thenReturn(store);
|
||||
when(storeMetadataTwo.changelogPartition()).thenReturn(tp2);
|
||||
when(storeMetadataTwo.store()).thenReturn(store);
|
||||
when(storeMetadata.offset()).thenReturn(5L);
|
||||
EasyMock.expect(storeMetadataOne.offset()).andReturn(5L).anyTimes();
|
||||
EasyMock.expect(storeMetadataTwo.offset()).andReturn(5L).anyTimes();
|
||||
when(storeMetadataOne.offset()).thenReturn(5L);
|
||||
when(storeMetadataTwo.offset()).thenReturn(5L);
|
||||
when(standbyStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne);
|
||||
when(standbyStateManager.storeMetadata(tp2)).thenReturn(storeMetadataTwo);
|
||||
when(activeStateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L));
|
||||
when(activeStateManager.taskId()).thenReturn(taskId);
|
||||
when(standbyStateManager.taskId()).thenReturn(taskId);
|
||||
EasyMock.replay(store, storeMetadataOne, storeMetadataTwo);
|
||||
|
||||
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
|
||||
adminClient.updateEndOffsets(Collections.singletonMap(tp1, 10L));
|
||||
|
@ -1335,9 +1299,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
when(standbyStateManager.taskType()).thenReturn(STANDBY);
|
||||
final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener);
|
||||
when(standbyStateManager.storeMetadata(tp1)).thenReturn(storeMetadataOne);
|
||||
EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes();
|
||||
EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes();
|
||||
EasyMock.replay(store, storeMetadataOne);
|
||||
changelogReader.register(tp1, standbyStateManager);
|
||||
changelogReader.transitToUpdateStandby();
|
||||
|
||||
|
@ -1350,11 +1311,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
|
|||
public void shouldThrowIfRestoreCallbackThrows() {
|
||||
setupActiveStateManager();
|
||||
setupStoreMetadata();
|
||||
setupStore();
|
||||
final TaskId taskId = new TaskId(0, 0);
|
||||
|
||||
when(storeMetadata.offset()).thenReturn(5L);
|
||||
when(activeStateManager.taskId()).thenReturn(taskId);
|
||||
EasyMock.replay(store);
|
||||
|
||||
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
|
||||
|
||||
|
|
Loading…
Reference in New Issue