KAFKA-7438: Migrate to Mockito in TimeOrderedCachingPersistentWindowStoreTest (#12739)

Replaces EasyMock and PowerMock with Mockito in TimeOrderedCachingPersistentWindowStoreTest.

Reviewers: Divij Vaidya <diviv@amazon.com>, Guozhang Wang <wangguoz@gmail.com>, Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
Shekhar Rajak 2023-06-20 17:21:56 +05:30 committed by GitHub
parent cd3c0ab1a3
commit 0e8c436c7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 33 additions and 58 deletions

View File

@ -53,7 +53,6 @@ import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirs
import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -90,6 +89,11 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
@RunWith(Parameterized.class)
public class TimeOrderedCachingPersistentWindowStoreTest {
@ -144,45 +148,34 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
@SuppressWarnings("deprecation")
@Test
public void shouldDelegateDeprecatedInit() {
final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class);
EasyMock.expect(inner.hasIndex()).andReturn(hasIndex);
EasyMock.replay(inner);
final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
final RocksDBTimeOrderedWindowStore inner = mock(RocksDBTimeOrderedWindowStore.class);
when(inner.hasIndex()).thenReturn(hasIndex);
EasyMock.reset(inner);
EasyMock.expect(inner.name()).andStubReturn("store");
inner.init((org.apache.kafka.streams.processor.ProcessorContext) context, outer);
EasyMock.expectLastCall();
EasyMock.replay(inner);
final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
outer.init((org.apache.kafka.streams.processor.ProcessorContext) context, outer);
EasyMock.verify(inner);
verify(inner).init((org.apache.kafka.streams.processor.ProcessorContext) context, outer);
}
@Test
public void shouldDelegateInit() {
final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class);
EasyMock.expect(inner.hasIndex()).andReturn(hasIndex);
EasyMock.replay(inner);
final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
final RocksDBTimeOrderedWindowStore inner = mock(RocksDBTimeOrderedWindowStore.class);
when(inner.hasIndex()).thenReturn(hasIndex);
EasyMock.reset(inner);
EasyMock.expect(inner.name()).andStubReturn("store");
inner.init((StateStoreContext) context, outer);
EasyMock.expectLastCall();
EasyMock.replay(inner);
final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
outer.init((StateStoreContext) context, outer);
EasyMock.verify(inner);
verify(inner, times(1)).init((StateStoreContext) context, outer);
}
@Test
public void shouldThrowIfWrongStore() {
final RocksDBTimestampedWindowStore innerWrong = EasyMock.mock(RocksDBTimestampedWindowStore.class);
final RocksDBTimestampedWindowStore innerWrong = mock(RocksDBTimestampedWindowStore.class);
final Exception e = assertThrows(IllegalArgumentException.class,
() -> new TimeOrderedCachingWindowStore(innerWrong, WINDOW_SIZE, SEGMENT_INTERVAL));
assertThat(e.getMessage(),
containsString("TimeOrderedCachingWindowStore only supports RocksDBTimeOrderedWindowStore backed store"));
final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class);
final RocksDBTimeOrderedWindowStore inner = mock(RocksDBTimeOrderedWindowStore.class);
// Nothing happens
new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
}
@ -1159,58 +1152,34 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
@Test
public void shouldCloseCacheAndWrappedStoreAfterErrorDuringCacheFlush() {
setUpCloseTests();
EasyMock.reset(cache);
cache.flush(CACHE_NAMESPACE);
EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on flush"));
cache.close(CACHE_NAMESPACE);
EasyMock.replay(cache);
EasyMock.reset(underlyingStore);
underlyingStore.close();
EasyMock.replay(underlyingStore);
doThrow(new RuntimeException("Simulating an error on flush2")).doNothing()
.when(cache).flush(CACHE_NAMESPACE);
assertThrows(RuntimeException.class, cachingStore::close);
EasyMock.verify(cache, underlyingStore);
verifyAndTearDownCloseTests();
}
@Test
public void shouldCloseWrappedStoreAfterErrorDuringCacheClose() {
setUpCloseTests();
EasyMock.reset(cache);
cache.flush(CACHE_NAMESPACE);
cache.close(CACHE_NAMESPACE);
EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
EasyMock.replay(cache);
EasyMock.reset(underlyingStore);
underlyingStore.close();
EasyMock.replay(underlyingStore);
doThrow(new RuntimeException("Simulating an error on close")).doNothing().when(cache).close(CACHE_NAMESPACE);
assertThrows(RuntimeException.class, cachingStore::close);
EasyMock.verify(cache, underlyingStore);
verifyAndTearDownCloseTests();
}
@Test
public void shouldCloseCacheAfterErrorDuringStateStoreClose() {
setUpCloseTests();
EasyMock.reset(cache);
cache.flush(CACHE_NAMESPACE);
cache.close(CACHE_NAMESPACE);
EasyMock.replay(cache);
EasyMock.reset(underlyingStore);
underlyingStore.close();
EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
EasyMock.replay(underlyingStore);
doThrow(new RuntimeException("Simulating an error on close")).doNothing().when(underlyingStore).close();
assertThrows(RuntimeException.class, cachingStore::close);
EasyMock.verify(cache, underlyingStore);
verifyAndTearDownCloseTests();
}
private void setUpCloseTests() {
underlyingStore = EasyMock.createNiceMock(RocksDBTimeOrderedWindowStore.class);
EasyMock.expect(underlyingStore.name()).andStubReturn("store-name");
EasyMock.expect(underlyingStore.isOpen()).andStubReturn(true);
EasyMock.replay(underlyingStore);
underlyingStore = mock(RocksDBTimeOrderedWindowStore.class);
when(underlyingStore.name()).thenReturn("store-name");
when(underlyingStore.isOpen()).thenReturn(true);
cachingStore = new TimeOrderedCachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL);
cache = EasyMock.createNiceMock(ThreadCache.class);
cache = mock(ThreadCache.class);
context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders()));
cachingStore.init((StateStoreContext) context, cachingStore);
@ -1235,4 +1204,10 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
return i;
}
private void verifyAndTearDownCloseTests() {
verify(underlyingStore).close();
verify(cache).flush(CACHE_NAMESPACE);
verify(cache).close(CACHE_NAMESPACE);
}
}