diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java index 90b17ded1d1..70cd401fb5e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java @@ -53,7 +53,6 @@ import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirs import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.TestUtils; -import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -90,6 +89,11 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; @RunWith(Parameterized.class) public class TimeOrderedCachingPersistentWindowStoreTest { @@ -144,45 +148,34 @@ public class TimeOrderedCachingPersistentWindowStoreTest { @SuppressWarnings("deprecation") @Test public void shouldDelegateDeprecatedInit() { - final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class); - EasyMock.expect(inner.hasIndex()).andReturn(hasIndex); - EasyMock.replay(inner); - final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL); + final RocksDBTimeOrderedWindowStore inner = mock(RocksDBTimeOrderedWindowStore.class); + when(inner.hasIndex()).thenReturn(hasIndex); - EasyMock.reset(inner); - EasyMock.expect(inner.name()).andStubReturn("store"); - inner.init((org.apache.kafka.streams.processor.ProcessorContext) context, outer); - EasyMock.expectLastCall(); - EasyMock.replay(inner); + final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL); outer.init((org.apache.kafka.streams.processor.ProcessorContext) context, outer); - EasyMock.verify(inner); + + verify(inner).init((org.apache.kafka.streams.processor.ProcessorContext) context, outer); } @Test public void shouldDelegateInit() { - final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class); - EasyMock.expect(inner.hasIndex()).andReturn(hasIndex); - EasyMock.replay(inner); - final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL); + final RocksDBTimeOrderedWindowStore inner = mock(RocksDBTimeOrderedWindowStore.class); + when(inner.hasIndex()).thenReturn(hasIndex); - EasyMock.reset(inner); - EasyMock.expect(inner.name()).andStubReturn("store"); - inner.init((StateStoreContext) context, outer); - EasyMock.expectLastCall(); - EasyMock.replay(inner); + final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL); outer.init((StateStoreContext) context, outer); - EasyMock.verify(inner); + verify(inner, times(1)).init((StateStoreContext) context, outer); } @Test public void shouldThrowIfWrongStore() { - final RocksDBTimestampedWindowStore innerWrong = EasyMock.mock(RocksDBTimestampedWindowStore.class); + final RocksDBTimestampedWindowStore innerWrong = mock(RocksDBTimestampedWindowStore.class); final Exception e = assertThrows(IllegalArgumentException.class, () -> new TimeOrderedCachingWindowStore(innerWrong, WINDOW_SIZE, SEGMENT_INTERVAL)); assertThat(e.getMessage(), containsString("TimeOrderedCachingWindowStore only supports RocksDBTimeOrderedWindowStore backed store")); - final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class); + final RocksDBTimeOrderedWindowStore inner = mock(RocksDBTimeOrderedWindowStore.class); // Nothing happens new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL); } @@ -1159,58 +1152,34 @@ public class TimeOrderedCachingPersistentWindowStoreTest { @Test public void shouldCloseCacheAndWrappedStoreAfterErrorDuringCacheFlush() { setUpCloseTests(); - EasyMock.reset(cache); - cache.flush(CACHE_NAMESPACE); - EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on flush")); - cache.close(CACHE_NAMESPACE); - EasyMock.replay(cache); - EasyMock.reset(underlyingStore); - underlyingStore.close(); - EasyMock.replay(underlyingStore); - + doThrow(new RuntimeException("Simulating an error on flush2")).doNothing() + .when(cache).flush(CACHE_NAMESPACE); assertThrows(RuntimeException.class, cachingStore::close); - EasyMock.verify(cache, underlyingStore); + verifyAndTearDownCloseTests(); } @Test public void shouldCloseWrappedStoreAfterErrorDuringCacheClose() { setUpCloseTests(); - EasyMock.reset(cache); - cache.flush(CACHE_NAMESPACE); - cache.close(CACHE_NAMESPACE); - EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close")); - EasyMock.replay(cache); - EasyMock.reset(underlyingStore); - underlyingStore.close(); - EasyMock.replay(underlyingStore); - + doThrow(new RuntimeException("Simulating an error on close")).doNothing().when(cache).close(CACHE_NAMESPACE); assertThrows(RuntimeException.class, cachingStore::close); - EasyMock.verify(cache, underlyingStore); + verifyAndTearDownCloseTests(); } @Test public void shouldCloseCacheAfterErrorDuringStateStoreClose() { setUpCloseTests(); - EasyMock.reset(cache); - cache.flush(CACHE_NAMESPACE); - cache.close(CACHE_NAMESPACE); - EasyMock.replay(cache); - EasyMock.reset(underlyingStore); - underlyingStore.close(); - EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close")); - EasyMock.replay(underlyingStore); - + doThrow(new RuntimeException("Simulating an error on close")).doNothing().when(underlyingStore).close(); assertThrows(RuntimeException.class, cachingStore::close); - EasyMock.verify(cache, underlyingStore); + verifyAndTearDownCloseTests(); } private void setUpCloseTests() { - underlyingStore = EasyMock.createNiceMock(RocksDBTimeOrderedWindowStore.class); - EasyMock.expect(underlyingStore.name()).andStubReturn("store-name"); - EasyMock.expect(underlyingStore.isOpen()).andStubReturn(true); - EasyMock.replay(underlyingStore); + underlyingStore = mock(RocksDBTimeOrderedWindowStore.class); + when(underlyingStore.name()).thenReturn("store-name"); + when(underlyingStore.isOpen()).thenReturn(true); cachingStore = new TimeOrderedCachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL); - cache = EasyMock.createNiceMock(ThreadCache.class); + cache = mock(ThreadCache.class); context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders())); cachingStore.init((StateStoreContext) context, cachingStore); @@ -1235,4 +1204,10 @@ public class TimeOrderedCachingPersistentWindowStoreTest { return i; } + private void verifyAndTearDownCloseTests() { + verify(underlyingStore).close(); + verify(cache).flush(CACHE_NAMESPACE); + verify(cache).close(CACHE_NAMESPACE); + } + }