KAFKA-14133: Migrate stateManager mock in StoreChangelogReaderTest to Mockito (#14929)

Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Christo Lolov 2024-01-02 14:36:52 +02:00 committed by GitHub
parent 3c6b9e440b
commit 65a28246ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 54 additions and 37 deletions

View File

@ -55,6 +55,10 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.time.Duration; import java.time.Duration;
import java.util.Collections; import java.util.Collections;
@ -93,7 +97,9 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
@SuppressWarnings("this-escape") @SuppressWarnings("this-escape")
public class StoreChangelogReaderTest extends EasyMockSupport { public class StoreChangelogReaderTest extends EasyMockSupport {
@ -101,7 +107,10 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Rule @Rule
public EasyMockRule rule = new EasyMockRule(this); public EasyMockRule rule = new EasyMockRule(this);
@Mock(type = MockType.NICE) @Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
@org.mockito.Mock
private ProcessorStateManager stateManager; private ProcessorStateManager stateManager;
@Mock(type = MockType.NICE) @Mock(type = MockType.NICE)
private ProcessorStateManager activeStateManager; private ProcessorStateManager activeStateManager;
@ -158,10 +167,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
private final StoreChangelogReader changelogReader = private final StoreChangelogReader changelogReader =
new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener); new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener);
private void setupStateManagerMock() {
when(stateManager.storeMetadata(tp)).thenReturn(storeMetadata);
when(stateManager.taskType()).thenReturn(type);
}
@Before @Before
public void setUp() { public void setUp() {
EasyMock.expect(stateManager.storeMetadata(tp)).andReturn(storeMetadata).anyTimes();
EasyMock.expect(stateManager.taskType()).andReturn(type).anyTimes();
EasyMock.expect(activeStateManager.storeMetadata(tp)).andReturn(storeMetadata).anyTimes(); EasyMock.expect(activeStateManager.storeMetadata(tp)).andReturn(storeMetadata).anyTimes();
EasyMock.expect(activeStateManager.taskType()).andReturn(ACTIVE).anyTimes(); EasyMock.expect(activeStateManager.taskType()).andReturn(ACTIVE).anyTimes();
EasyMock.expect(standbyStateManager.storeMetadata(tp)).andReturn(storeMetadata).anyTimes(); EasyMock.expect(standbyStateManager.storeMetadata(tp)).andReturn(storeMetadata).anyTimes();
@ -175,7 +187,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@After @After
public void tearDown() { public void tearDown() {
EasyMock.reset( EasyMock.reset(
stateManager,
activeStateManager, activeStateManager,
standbyStateManager, standbyStateManager,
storeMetadata, storeMetadata,
@ -187,7 +198,8 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test @Test
public void shouldNotRegisterSameStoreMultipleTimes() { public void shouldNotRegisterSameStoreMultipleTimes() {
EasyMock.replay(stateManager, storeMetadata); setupStateManagerMock();
EasyMock.replay(storeMetadata);
changelogReader.register(tp, stateManager); changelogReader.register(tp, stateManager);
@ -200,7 +212,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test @Test
public void shouldNotRegisterStoreWithoutMetadata() { public void shouldNotRegisterStoreWithoutMetadata() {
EasyMock.replay(stateManager, storeMetadata); EasyMock.replay(storeMetadata);
assertThrows(IllegalStateException.class, assertThrows(IllegalStateException.class,
() -> changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager)); () -> changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager));
@ -208,11 +220,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test @Test
public void shouldSupportUnregisterChangelogBeforeInitialization() { public void shouldSupportUnregisterChangelogBeforeInitialization() {
setupStateManagerMock();
final Map<TaskId, Task> mockTasks = mock(Map.class); final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L)); EasyMock.replay(mockTasks, storeMetadata, store);
EasyMock.replay(mockTasks, stateManager, storeMetadata, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L)); adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
@ -241,16 +253,17 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test @Test
public void shouldSupportUnregisterChangelogBeforeCompletion() { public void shouldSupportUnregisterChangelogBeforeCompletion() {
setupStateManagerMock();
final Map<TaskId, Task> mockTasks = mock(Map.class); final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L));
if (type == STANDBY) { if (type == STANDBY) {
EasyMock.expect(stateManager.changelogAsSource(tp)).andReturn(true).anyTimes(); when(stateManager.changelogAsSource(tp)).thenReturn(true);
} }
EasyMock.replay(mockTasks, stateManager, storeMetadata, store); EasyMock.replay(mockTasks, storeMetadata, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L)); adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
final StoreChangelogReader changelogReader = final StoreChangelogReader changelogReader =
@ -290,16 +303,16 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test @Test
public void shouldSupportUnregisterChangelogAfterCompletion() { public void shouldSupportUnregisterChangelogAfterCompletion() {
setupStateManagerMock();
final Map<TaskId, Task> mockTasks = mock(Map.class); final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L));
if (type == STANDBY) { if (type == STANDBY) {
EasyMock.expect(stateManager.changelogAsSource(tp)).andReturn(true).anyTimes(); when(stateManager.changelogAsSource(tp)).thenReturn(true);
} }
EasyMock.replay(mockTasks, stateManager, storeMetadata, store); EasyMock.replay(mockTasks, storeMetadata, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
@ -346,12 +359,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test @Test
public void shouldInitializeChangelogAndCheckForCompletion() { public void shouldInitializeChangelogAndCheckForCompletion() {
setupStateManagerMock();
final Map<TaskId, Task> mockTasks = mock(Map.class); final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
EasyMock.replay(mockTasks, stateManager, storeMetadata, store); EasyMock.replay(mockTasks, storeMetadata, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
@ -388,11 +402,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
public void shouldTriggerRestoreListenerWithOffsetZeroIfPositionThrowsTimeoutException() { public void shouldTriggerRestoreListenerWithOffsetZeroIfPositionThrowsTimeoutException() {
// restore listener is only triggered for active tasks // restore listener is only triggered for active tasks
if (type == ACTIVE) { if (type == ACTIVE) {
setupStateManagerMock();
final Map<TaskId, Task> mockTasks = mock(Map.class); final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L)); when(stateManager.changelogOffsets()).thenReturn(singletonMap(tp, 5L));
EasyMock.replay(mockTasks, stateManager, storeMetadata, store); EasyMock.replay(mockTasks, storeMetadata, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
@ -416,11 +431,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test @Test
public void shouldPollWithRightTimeoutWithStateUpdater() { public void shouldPollWithRightTimeoutWithStateUpdater() {
setupStateManagerMock();
shouldPollWithRightTimeout(true); shouldPollWithRightTimeout(true);
} }
@Test @Test
public void shouldPollWithRightTimeoutWithoutStateUpdater() { public void shouldPollWithRightTimeoutWithoutStateUpdater() {
setupStateManagerMock();
shouldPollWithRightTimeout(false); shouldPollWithRightTimeout(false);
} }
@ -432,6 +449,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test @Test
public void shouldPollWithRightTimeoutWithStateUpdaterDefault() { public void shouldPollWithRightTimeoutWithStateUpdaterDefault() {
setupStateManagerMock();
final Properties properties = new Properties(); final Properties properties = new Properties();
shouldPollWithRightTimeout(properties); shouldPollWithRightTimeout(properties);
} }
@ -441,9 +459,8 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L)); when(stateManager.taskId()).thenReturn(taskId);
EasyMock.expect(stateManager.taskId()).andReturn(taskId).anyTimes(); EasyMock.replay(storeMetadata, store);
EasyMock.replay(stateManager, storeMetadata, store);
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L)); adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
@ -474,13 +491,13 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test @Test
public void shouldRestoreFromPositionAndCheckForCompletion() { public void shouldRestoreFromPositionAndCheckForCompletion() {
setupStateManagerMock();
final TaskId taskId = new TaskId(0, 0); final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L)); when(stateManager.taskId()).thenReturn(taskId);
EasyMock.expect(stateManager.taskId()).andReturn(taskId).anyTimes(); EasyMock.replay(storeMetadata, store);
EasyMock.replay(stateManager, storeMetadata, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L)); adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
@ -544,6 +561,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test @Test
public void shouldRestoreFromBeginningAndCheckCompletion() { public void shouldRestoreFromBeginningAndCheckCompletion() {
setupStateManagerMock();
final TaskId taskId = new TaskId(0, 0); final TaskId taskId = new TaskId(0, 0);
if (type == STANDBY && logContext.logger(StoreChangelogReader.class).isDebugEnabled()) { if (type == STANDBY && logContext.logger(StoreChangelogReader.class).isDebugEnabled()) {
@ -553,9 +571,8 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
} }
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L)); when(stateManager.taskId()).thenReturn(taskId);
EasyMock.expect(stateManager.taskId()).andReturn(taskId).anyTimes(); EasyMock.replay(storeMetadata, store);
EasyMock.replay(stateManager, storeMetadata, store);
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L)); consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L)); adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
@ -825,6 +842,8 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test @Test
public void shouldRequestCommittedOffsetsAndHandleTimeoutException() { public void shouldRequestCommittedOffsetsAndHandleTimeoutException() {
setupStateManagerMock();
final TaskId taskId = new TaskId(0, 0); final TaskId taskId = new TaskId(0, 0);
final Task mockTask = mock(Task.class); final Task mockTask = mock(Task.class);
@ -834,12 +853,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
mockTask.maybeInitTaskTimeoutOrThrow(anyLong(), anyObject()); mockTask.maybeInitTaskTimeoutOrThrow(anyLong(), anyObject());
EasyMock.expectLastCall(); EasyMock.expectLastCall();
EasyMock.expect(stateManager.changelogAsSource(tp)).andReturn(true).anyTimes(); when(stateManager.changelogAsSource(tp)).thenReturn(true);
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(10L).anyTimes();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L)); when(stateManager.taskId()).thenReturn(taskId);
EasyMock.expect(stateManager.taskId()).andReturn(taskId).anyTimes(); EasyMock.replay(mockTask, storeMetadata, store);
EasyMock.replay(mockTask, stateManager, storeMetadata, store);
final AtomicBoolean functionCalled = new AtomicBoolean(false); final AtomicBoolean functionCalled = new AtomicBoolean(false);
final MockAdminClient adminClient = new MockAdminClient() { final MockAdminClient adminClient = new MockAdminClient() {
@ -897,12 +915,14 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test @Test
public void shouldThrowIfCommittedOffsetsFail() { public void shouldThrowIfCommittedOffsetsFail() {
setupStateManagerMock();
final TaskId taskId = new TaskId(0, 0); final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(stateManager.taskId()).andReturn(taskId); when(stateManager.taskId()).thenReturn(taskId);
EasyMock.expect(stateManager.changelogAsSource(tp)).andReturn(true).anyTimes(); when(stateManager.changelogAsSource(tp)).thenReturn(true);
EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();
EasyMock.replay(stateManager, storeMetadata, store); EasyMock.replay(storeMetadata, store);
final MockAdminClient adminClient = new MockAdminClient() { final MockAdminClient adminClient = new MockAdminClient() {
@Override @Override
@ -926,7 +946,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test @Test
public void shouldThrowIfUnsubscribeFail() { public void shouldThrowIfUnsubscribeFail() {
EasyMock.replay(stateManager, storeMetadata, store); EasyMock.replay(storeMetadata, store);
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) { final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override @Override
@ -946,7 +966,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
final Map<TaskId, Task> mockTasks = mock(Map.class); final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(stateManager.taskType()).andReturn(type);
EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes();
EasyMock.replay(mockTasks, standbyStateManager, storeMetadata, store); EasyMock.replay(mockTasks, standbyStateManager, storeMetadata, store);
@ -989,7 +1008,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
//EasyMock.expect(storeMetadata.offset()).andReturn(0L).anyTimes(); //EasyMock.expect(storeMetadata.offset()).andReturn(0L).anyTimes();
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(stateManager.taskType()).andReturn(type);
EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes();
EasyMock.expect(standbyStateManager.changelogAsSource(tp)).andReturn(false).anyTimes(); EasyMock.expect(standbyStateManager.changelogAsSource(tp)).andReturn(false).anyTimes();
@ -1047,7 +1065,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes(); EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes(); EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
EasyMock.expect(standbyStateManager.changelogAsSource(tp)).andReturn(true).anyTimes(); EasyMock.expect(standbyStateManager.changelogAsSource(tp)).andReturn(true).anyTimes();
EasyMock.expect(stateManager.taskType()).andReturn(type);
EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes(); EasyMock.expect(storeMetadata.offset()).andReturn(3L).anyTimes();
EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes(); EasyMock.expect(storeMetadata.endOffset()).andReturn(20L).anyTimes();
EasyMock.replay(mockTasks, standbyStateManager, storeMetadata, store); EasyMock.replay(mockTasks, standbyStateManager, storeMetadata, store);