KAFKA-13036: Replace EasyMock and PowerMock with Mockito for RocksDBMetricsRecorderTest (#12459)

Changes:
- Migrate to Mockito (the general before/after pattern is sketched below)
- Add more assertive checks using verify
- Minor indentation fixes
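
For reference when reading the diff below, here is a minimal, hypothetical before/after sketch of the pattern applied throughout the test. The Meter type, readCount() method, and subject.read(...) call are illustrative only and not part of this commit; only the mocking style mirrors it. EasyMock's record/replay/verify cycle is replaced by Mockito stubbing with when(...).thenReturn(...) plus explicit interaction assertions with verify(...).

    // Before (EasyMock/PowerMock), hypothetical example:
    Meter meter = createMock(Meter.class);
    expect(meter.readCount()).andReturn(42L);
    replay(meter);
    assertThat(subject.read(meter), is(42L));
    verify(meter);                          // only checks that the recorded expectations were met

    // After (Mockito), same hypothetical example:
    Meter meter = mock(Meter.class);
    when(meter.readCount()).thenReturn(42L);
    assertThat(subject.read(meter), is(42L));
    verify(meter).readCount();              // asserts the specific interaction
    verifyNoMoreInteractions(meter);        // optional stricter check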

Reviewers: Dalibor Plavcic <dalibor.os@proton.me>, Bruno Cadonna <cadonna@apache.org>
Author: Divij Vaidya
Committed: 2022-08-30 19:25:26 +02:00 (via GitHub)
Commit: 140faf9f2b (parent: fe99262fe2)
1 changed file with 233 additions and 366 deletions

@@ -23,11 +23,12 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.RocksDBMetricContext;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.MockedStatic;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.rocksdb.Cache;
 import org.rocksdb.HistogramData;
 import org.rocksdb.HistogramType;
@@ -36,22 +37,22 @@ import org.rocksdb.Statistics;
 import org.rocksdb.StatsLevel;
 import org.rocksdb.TickerType;

-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.niceMock;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.reset;
-import static org.powermock.api.easymock.PowerMock.createMock;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replay;
-import static org.powermock.api.easymock.PowerMock.verify;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;

-@RunWith(PowerMockRunner.class)
-@PrepareForTest({RocksDBMetrics.class, Sensor.class})
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class RocksDBMetricsRecorderTest {
     private final static String METRICS_SCOPE = "metrics-scope";
     private final static TaskId TASK_ID1 = new TaskId(0, 0);
@@ -70,53 +71,92 @@ public class RocksDBMetricsRecorderTest {
     private final Statistics statisticsToAdd2 = mock(Statistics.class);
     private final Statistics statisticsToAdd3 = mock(Statistics.class);

-    private final Sensor bytesWrittenToDatabaseSensor = createMock(Sensor.class);
-    private final Sensor bytesReadFromDatabaseSensor = createMock(Sensor.class);
-    private final Sensor memtableBytesFlushedSensor = createMock(Sensor.class);
-    private final Sensor memtableHitRatioSensor = createMock(Sensor.class);
-    private final Sensor memtableAvgFlushTimeSensor = createMock(Sensor.class);
-    private final Sensor memtableMinFlushTimeSensor = createMock(Sensor.class);
-    private final Sensor memtableMaxFlushTimeSensor = createMock(Sensor.class);
-    private final Sensor writeStallDurationSensor = createMock(Sensor.class);
-    private final Sensor blockCacheDataHitRatioSensor = createMock(Sensor.class);
-    private final Sensor blockCacheIndexHitRatioSensor = createMock(Sensor.class);
-    private final Sensor blockCacheFilterHitRatioSensor = createMock(Sensor.class);
-    private final Sensor bytesReadDuringCompactionSensor = createMock(Sensor.class);
-    private final Sensor bytesWrittenDuringCompactionSensor = createMock(Sensor.class);
-    private final Sensor compactionTimeAvgSensor = createMock(Sensor.class);
-    private final Sensor compactionTimeMinSensor = createMock(Sensor.class);
-    private final Sensor compactionTimeMaxSensor = createMock(Sensor.class);
-    private final Sensor numberOfOpenFilesSensor = createMock(Sensor.class);
-    private final Sensor numberOfFileErrorsSensor = createMock(Sensor.class);
+    private final Sensor bytesWrittenToDatabaseSensor = mock(Sensor.class);
+    private final Sensor bytesReadFromDatabaseSensor = mock(Sensor.class);
+    private final Sensor memtableBytesFlushedSensor = mock(Sensor.class);
+    private final Sensor memtableHitRatioSensor = mock(Sensor.class);
+    private final Sensor memtableAvgFlushTimeSensor = mock(Sensor.class);
+    private final Sensor memtableMinFlushTimeSensor = mock(Sensor.class);
+    private final Sensor memtableMaxFlushTimeSensor = mock(Sensor.class);
+    private final Sensor writeStallDurationSensor = mock(Sensor.class);
+    private final Sensor blockCacheDataHitRatioSensor = mock(Sensor.class);
+    private final Sensor blockCacheIndexHitRatioSensor = mock(Sensor.class);
+    private final Sensor blockCacheFilterHitRatioSensor = mock(Sensor.class);
+    private final Sensor bytesReadDuringCompactionSensor = mock(Sensor.class);
+    private final Sensor bytesWrittenDuringCompactionSensor = mock(Sensor.class);
+    private final Sensor compactionTimeAvgSensor = mock(Sensor.class);
+    private final Sensor compactionTimeMinSensor = mock(Sensor.class);
+    private final Sensor compactionTimeMaxSensor = mock(Sensor.class);
+    private final Sensor numberOfOpenFilesSensor = mock(Sensor.class);
+    private final Sensor numberOfFileErrorsSensor = mock(Sensor.class);

-    private final StreamsMetricsImpl streamsMetrics = niceMock(StreamsMetricsImpl.class);
+    private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class);
     private final RocksDBMetricsRecordingTrigger recordingTrigger = mock(RocksDBMetricsRecordingTrigger.class);

     private final RocksDBMetricsRecorder recorder = new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME);
+    private final RocksDBMetricContext metricsContext = new RocksDBMetricContext(TASK_ID1.toString(), METRICS_SCOPE, STORE_NAME);
+
+    private MockedStatic<RocksDBMetrics> dbMetrics;

     @Before
     public void setUp() {
-        setUpMetricsStubMock();
-        expect(streamsMetrics.rocksDBMetricsRecordingTrigger()).andStubReturn(recordingTrigger);
-        replay(streamsMetrics);
+        setUpMetricsMock();
+        when(streamsMetrics.rocksDBMetricsRecordingTrigger()).thenReturn(recordingTrigger);
         recorder.init(streamsMetrics, TASK_ID1);
     }

+    @After
+    public void cleanUpMocks() {
+        dbMetrics.close();
+    }
+
     @Test
     public void shouldInitMetricsRecorder() {
-        setUpMetricsMock();
-
-        recorder.init(streamsMetrics, TASK_ID1);
-
-        verify(RocksDBMetrics.class);
+        dbMetrics.verify(() -> RocksDBMetrics.bytesWrittenToDatabaseSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.bytesReadFromDatabaseSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.memtableBytesFlushedSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.memtableHitRatioSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.memtableAvgFlushTimeSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.memtableMinFlushTimeSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.memtableMaxFlushTimeSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.writeStallDurationSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.blockCacheDataHitRatioSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.blockCacheIndexHitRatioSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.blockCacheFilterHitRatioSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.bytesWrittenDuringCompactionSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.bytesReadDuringCompactionSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.compactionTimeAvgSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.compactionTimeMinSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.compactionTimeMaxSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.numberOfOpenFilesSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.numberOfFileErrorsSensor(any(), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addNumImmutableMemTableMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addCurSizeActiveMemTable(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addCurSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addNumEntriesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addNumEntriesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addNumDeletesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addNumDeletesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addMemTableFlushPending(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addNumRunningFlushesMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addCompactionPendingMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addNumRunningCompactionsMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addEstimatePendingCompactionBytesMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addTotalSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addLiveSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addNumLiveVersionMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addBlockCacheCapacityMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addBlockCacheUsageMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addBlockCachePinnedUsageMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addEstimateNumKeysMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addEstimateTableReadersMemMetric(eq(streamsMetrics), eq(metricsContext), any()));
+        dbMetrics.verify(() -> RocksDBMetrics.addBackgroundErrorsMetric(eq(streamsMetrics), eq(metricsContext), any()));
         assertThat(recorder.taskId(), is(TASK_ID1));
     }

     @Test
     public void shouldThrowIfMetricRecorderIsReInitialisedWithDifferentTask() {
-        setUpMetricsStubMock();
-        recorder.init(streamsMetrics, TASK_ID1);
-
         assertThrows(
             IllegalStateException.class,
             () -> recorder.init(streamsMetrics, TASK_ID2)
@@ -125,9 +165,6 @@ public class RocksDBMetricsRecorderTest {
     @Test
     public void shouldThrowIfMetricRecorderIsReInitialisedWithDifferentStreamsMetrics() {
-        setUpMetricsStubMock();
-        recorder.init(streamsMetrics, TASK_ID1);
-
         assertThrows(
             IllegalStateException.class,
             () -> recorder.init(
@@ -139,21 +176,14 @@ public class RocksDBMetricsRecorderTest {
     @Test
     public void shouldSetStatsLevelToExceptDetailedTimersWhenValueProvidersWithStatisticsAreAdded() {
-        statisticsToAdd1.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
-        replay(statisticsToAdd1);
-
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
-
-        verify(statisticsToAdd1);
+        verify(statisticsToAdd1).setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
     }

     @Test
     public void shouldNotSetStatsLevelToExceptDetailedTimersWhenValueProvidersWithoutStatisticsAreAdded() {
-        replay(statisticsToAdd1);
-
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
-
-        verify(statisticsToAdd1);
+        verifyNoInteractions(statisticsToAdd1);
     }

     @Test
@@ -292,79 +322,63 @@ public class RocksDBMetricsRecorderTest {
     @Test
     public void shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedToNewlyCreatedRecorder() {
-        recordingTrigger.addMetricsRecorder(recorder);
-        replay(recordingTrigger);
-
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);

-        verify(recordingTrigger);
+        verify(recordingTrigger).addMetricsRecorder(recorder);
     }

     @Test
     public void shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedAfterLastValueProvidersWereRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-        reset(recordingTrigger);
-        recordingTrigger.addMetricsRecorder(recorder);
-        replay(recordingTrigger);

         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);

-        verify(recordingTrigger);
+        verify(recordingTrigger, times(2)).addMetricsRecorder(recorder);
     }

     @Test
-    public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty2() {
+    public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
-        reset(recordingTrigger);
-        replay(recordingTrigger);
+        verify(recordingTrigger).addMetricsRecorder(recorder);

         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);

-        verify(recordingTrigger);
+        verifyNoMoreInteractions(recordingTrigger);
     }

     @Test
     public void shouldCloseStatisticsWhenValueProvidersAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
-        reset(statisticsToAdd1);
-        statisticsToAdd1.close();
-        replay(statisticsToAdd1);
-
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-        verify(statisticsToAdd1);
+        verify(statisticsToAdd1).close();
     }

     @Test
     public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
-        reset(statisticsToAdd1);
-        replay(statisticsToAdd1);
-
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-        verify(statisticsToAdd1);
+        verify(statisticsToAdd1, never()).close();
     }

     @Test
-    public void shouldRemoveItselfFromRecordingTriggerWhenLastValueProvidersAreRemoved() {
+    public void shouldNotRemoveItselfFromRecordingTriggerWhenAtLeastOneValueProviderIsPresent() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
-        reset(recordingTrigger);
-        replay(recordingTrigger);

         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);

-        verify(recordingTrigger);
-
-        reset(recordingTrigger);
-        recordingTrigger.removeMetricsRecorder(recorder);
-        replay(recordingTrigger);
+        verify(recordingTrigger, never()).removeMetricsRecorder(recorder);
+    }
+
+    @Test
+    public void shouldRemoveItselfFromRecordingTriggerWhenAllValueProvidersAreRemoved() {
+        recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
+        recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
+        recorder.removeValueProviders(SEGMENT_STORE_NAME_1);

         recorder.removeValueProviders(SEGMENT_STORE_NAME_2);

-        verify(recordingTrigger);
+        verify(recordingTrigger).removeMetricsRecorder(recorder);
     }

     @Test
@@ -381,157 +395,115 @@ public class RocksDBMetricsRecorderTest {
     public void shouldRecordStatisticsBasedMetrics() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
-        reset(statisticsToAdd1);
-        reset(statisticsToAdd2);
+        final long now = 0L;

-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(1L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(2L);
-        bytesWrittenToDatabaseSensor.record(1 + 2, 0L);
-        replay(bytesWrittenToDatabaseSensor);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(1L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(2L);
+        final double expectedBytesWrittenToDatabaseSensor = 1 + 2;

-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(2L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(3L);
-        bytesReadFromDatabaseSensor.record(2 + 3, 0L);
-        replay(bytesReadFromDatabaseSensor);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(2L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(3L);
+        final double expectedBytesReadFromDatabaseSensor = 2 + 3;

-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(3L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(4L);
-        memtableBytesFlushedSensor.record(3 + 4, 0L);
-        replay(memtableBytesFlushedSensor);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(3L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(4L);
+        final double expectedMemtableBytesFlushedSensor = 3 + 4;

-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(1L);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(2L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(3L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(4L);
-        memtableHitRatioSensor.record((double) 4 / (4 + 6), 0L);
-        replay(memtableHitRatioSensor);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(1L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(2L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(3L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(4L);
+        final double expectedMemtableHitRatioSensorRecord = (double) 4 / (4 + 6);

         final HistogramData memtableFlushTimeData1 = new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 16.0, 2L, 10L, 3.0);
         final HistogramData memtableFlushTimeData2 = new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 20.0, 4L, 8L, 10.0);
-        expect(statisticsToAdd1.getHistogramData(HistogramType.FLUSH_TIME)).andReturn(memtableFlushTimeData1);
-        expect(statisticsToAdd2.getHistogramData(HistogramType.FLUSH_TIME)).andReturn(memtableFlushTimeData2);
-        memtableAvgFlushTimeSensor.record((double) (10 + 8) / (2 + 4), 0L);
-        replay(memtableAvgFlushTimeSensor);
-        memtableMinFlushTimeSensor.record(3.0, 0L);
-        replay(memtableMinFlushTimeSensor);
-        memtableMaxFlushTimeSensor.record(20.0, 0L);
-        replay(memtableMaxFlushTimeSensor);
+        when(statisticsToAdd1.getHistogramData(HistogramType.FLUSH_TIME)).thenReturn(memtableFlushTimeData1);
+        when(statisticsToAdd2.getHistogramData(HistogramType.FLUSH_TIME)).thenReturn(memtableFlushTimeData2);
+        final double expectedMemtableAvgFlushTimeSensor = (double) (10 + 8) / (2 + 4);
+        final double expectedMemtableMinFlushTimeSensor = 3.0d;
+        final double expectedMemtableMaxFlushTimeSensor = 20.0d;

-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(4L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(5L);
-        writeStallDurationSensor.record(4 + 5, 0L);
-        replay(writeStallDurationSensor);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(4L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(5L);
+        final double expectedWriteStallDurationSensor = 4 + 5;

-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(5L);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(4L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(3L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(2L);
-        blockCacheDataHitRatioSensor.record((double) 8 / (8 + 6), 0L);
-        replay(blockCacheDataHitRatioSensor);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(5L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(4L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(3L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(2L);
+        final double expectedBlockCacheDataHitRatioSensor = (double) 8 / (8 + 6);

-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(4L);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(2L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(2L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(4L);
-        blockCacheIndexHitRatioSensor.record((double) 6 / (6 + 6), 0L);
-        replay(blockCacheIndexHitRatioSensor);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(4L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(2L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(2L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(4L);
+        final double expectedBlockCacheIndexHitRatioSensor = (double) 6 / (6 + 6);

-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(2L);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(4L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(3L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(5L);
-        blockCacheFilterHitRatioSensor.record((double) 5 / (5 + 9), 0L);
-        replay(blockCacheFilterHitRatioSensor);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(2L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(4L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(3L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(5L);
+        final double expectedBlockCacheFilterHitRatioSensor = (double) 5 / (5 + 9);

-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(2L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(4L);
-        bytesWrittenDuringCompactionSensor.record(2 + 4, 0L);
-        replay(bytesWrittenDuringCompactionSensor);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(2L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(4L);
+        final double expectedBytesWrittenDuringCompactionSensor = 2 + 4;

-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(5L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(6L);
-        bytesReadDuringCompactionSensor.record(5 + 6, 0L);
-        replay(bytesReadDuringCompactionSensor);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(5L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(6L);
+        final double expectedBytesReadDuringCompactionSensor = 5 + 6;

         final HistogramData compactionTimeData1 = new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 16.0, 2L, 8L, 6.0);
         final HistogramData compactionTimeData2 = new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 24.0, 2L, 8L, 4.0);
-        expect(statisticsToAdd1.getHistogramData(HistogramType.COMPACTION_TIME)).andReturn(compactionTimeData1);
-        expect(statisticsToAdd2.getHistogramData(HistogramType.COMPACTION_TIME)).andReturn(compactionTimeData2);
-        compactionTimeAvgSensor.record((double) (8 + 8) / (2 + 2), 0L);
-        replay(compactionTimeAvgSensor);
-        compactionTimeMinSensor.record(4.0, 0L);
-        replay(compactionTimeMinSensor);
-        compactionTimeMaxSensor.record(24.0, 0L);
-        replay(compactionTimeMaxSensor);
+        when(statisticsToAdd1.getHistogramData(HistogramType.COMPACTION_TIME)).thenReturn(compactionTimeData1);
+        when(statisticsToAdd2.getHistogramData(HistogramType.COMPACTION_TIME)).thenReturn(compactionTimeData2);
+        final double expectedCompactionTimeAvgSensor = (double) (8 + 8) / (2 + 2);
+        final double expectedCompactionTimeMinSensor = 4.0;
+        final double expectedCompactionTimeMaxSensor = 24.0;

-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(5L);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(3L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(7L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(4L);
-        numberOfOpenFilesSensor.record((5 + 7) - (3 + 4), 0L);
-        replay(numberOfOpenFilesSensor);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L);
+        final double expectedNumberOfOpenFilesSensor = (5 + 7) - (3 + 4);

-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(34L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(11L);
-        numberOfFileErrorsSensor.record(11 + 34, 0L);
-        replay(numberOfFileErrorsSensor);
-        replay(statisticsToAdd1);
-        replay(statisticsToAdd2);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L);
+        final double expectedNumberOfFileErrorsSensor = 11 + 34;

-        recorder.record(0L);
+        recorder.record(now);

-        verify(statisticsToAdd1);
-        verify(statisticsToAdd2);
-        verify(
-            bytesWrittenToDatabaseSensor,
-            bytesReadFromDatabaseSensor,
-            memtableBytesFlushedSensor,
-            memtableHitRatioSensor,
-            memtableAvgFlushTimeSensor,
-            memtableMinFlushTimeSensor,
-            memtableMaxFlushTimeSensor,
-            writeStallDurationSensor,
-            blockCacheDataHitRatioSensor,
-            blockCacheIndexHitRatioSensor,
-            blockCacheFilterHitRatioSensor,
-            bytesWrittenDuringCompactionSensor,
-            bytesReadDuringCompactionSensor,
-            compactionTimeAvgSensor,
-            compactionTimeMinSensor,
-            compactionTimeMaxSensor,
-            numberOfOpenFilesSensor,
-            numberOfFileErrorsSensor
-        );
+        verify(statisticsToAdd1, times(17)).getAndResetTickerCount(isA(TickerType.class));
+        verify(statisticsToAdd2, times(17)).getAndResetTickerCount(isA(TickerType.class));
+        verify(statisticsToAdd1, times(2)).getHistogramData(isA(HistogramType.class));
+        verify(statisticsToAdd2, times(2)).getHistogramData(isA(HistogramType.class));
+        verify(bytesWrittenToDatabaseSensor).record(expectedBytesWrittenToDatabaseSensor, now);
+        verify(bytesReadFromDatabaseSensor).record(expectedBytesReadFromDatabaseSensor, now);
+        verify(memtableBytesFlushedSensor).record(expectedMemtableBytesFlushedSensor, now);
+        verify(memtableHitRatioSensor).record(expectedMemtableHitRatioSensorRecord, now);
+        verify(memtableAvgFlushTimeSensor).record(expectedMemtableAvgFlushTimeSensor, now);
+        verify(memtableMinFlushTimeSensor).record(expectedMemtableMinFlushTimeSensor, now);
+        verify(memtableMaxFlushTimeSensor).record(expectedMemtableMaxFlushTimeSensor, now);
+        verify(writeStallDurationSensor).record(expectedWriteStallDurationSensor, now);
+        verify(blockCacheDataHitRatioSensor).record(expectedBlockCacheDataHitRatioSensor, now);
+        verify(blockCacheIndexHitRatioSensor).record(expectedBlockCacheIndexHitRatioSensor, now);
+        verify(blockCacheFilterHitRatioSensor).record(expectedBlockCacheFilterHitRatioSensor, now);
+        verify(bytesWrittenDuringCompactionSensor).record(expectedBytesWrittenDuringCompactionSensor, now);
+        verify(bytesReadDuringCompactionSensor).record(expectedBytesReadDuringCompactionSensor, now);
+        verify(compactionTimeAvgSensor).record(expectedCompactionTimeAvgSensor, now);
+        verify(compactionTimeMinSensor).record(expectedCompactionTimeMinSensor, now);
+        verify(compactionTimeMaxSensor).record(expectedCompactionTimeMaxSensor, now);
+        verify(numberOfOpenFilesSensor).record(expectedNumberOfOpenFilesSensor, now);
+        verify(numberOfFileErrorsSensor).record(expectedNumberOfFileErrorsSensor, now);
     }

     @Test
     public void shouldNotRecordStatisticsBasedMetricsIfStatisticsIsNull() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
-        replay(
-            bytesWrittenToDatabaseSensor,
-            bytesReadFromDatabaseSensor,
-            memtableBytesFlushedSensor,
-            memtableHitRatioSensor,
-            memtableAvgFlushTimeSensor,
-            memtableMinFlushTimeSensor,
-            memtableMaxFlushTimeSensor,
-            writeStallDurationSensor,
-            blockCacheDataHitRatioSensor,
-            blockCacheIndexHitRatioSensor,
-            blockCacheFilterHitRatioSensor,
-            bytesWrittenDuringCompactionSensor,
-            bytesReadDuringCompactionSensor,
-            compactionTimeAvgSensor,
-            compactionTimeMinSensor,
-            compactionTimeMaxSensor,
-            numberOfOpenFilesSensor,
-            numberOfFileErrorsSensor
-        );

         recorder.record(0L);

-        verify(
+        verifyNoInteractions(
             bytesWrittenToDatabaseSensor,
             bytesReadFromDatabaseSensor,
             memtableBytesFlushedSensor,
@@ -555,173 +527,68 @@ public class RocksDBMetricsRecorderTest {
     @Test
     public void shouldCorrectlyHandleHitRatioRecordingsWithZeroHitsAndMisses() {
-        reset(statisticsToAdd1);
-
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
-        expect(statisticsToAdd1.getHistogramData(anyObject())).andStubReturn(new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0L, 0L, 0.0));
-        expect(statisticsToAdd1.getAndResetTickerCount(anyObject())).andStubReturn(0L);
-        replay(statisticsToAdd1);
-        memtableHitRatioSensor.record(0, 0L);
-        blockCacheDataHitRatioSensor.record(0, 0L);
-        blockCacheIndexHitRatioSensor.record(0, 0L);
-        blockCacheFilterHitRatioSensor.record(0, 0L);
-        replay(memtableHitRatioSensor);
-        replay(blockCacheDataHitRatioSensor);
-        replay(blockCacheIndexHitRatioSensor);
-        replay(blockCacheFilterHitRatioSensor);
+        when(statisticsToAdd1.getHistogramData(any())).thenReturn(new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0L, 0L, 0.0));
+        when(statisticsToAdd1.getAndResetTickerCount(any())).thenReturn(0L);

         recorder.record(0L);

-        verify(memtableHitRatioSensor);
-        verify(blockCacheDataHitRatioSensor);
-        verify(blockCacheIndexHitRatioSensor);
-        verify(blockCacheFilterHitRatioSensor);
+        verify(memtableHitRatioSensor).record(0d, 0L);
+        verify(blockCacheDataHitRatioSensor).record(0d, 0L);
+        verify(blockCacheIndexHitRatioSensor).record(0d, 0L);
+        verify(blockCacheFilterHitRatioSensor).record(0d, 0L);
     }

     @Test
     public void shouldCorrectlyHandleAvgRecordingsWithZeroSumAndCount() {
-        reset(statisticsToAdd1);
-
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
-        expect(statisticsToAdd1.getHistogramData(anyObject())).andStubReturn(new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0L, 0L, 0.0));
-        expect(statisticsToAdd1.getAndResetTickerCount(anyObject())).andStubReturn(0L);
-        replay(statisticsToAdd1);
-        memtableAvgFlushTimeSensor.record(0, 0L);
-        compactionTimeAvgSensor.record(0, 0L);
-        replay(memtableAvgFlushTimeSensor);
-        replay(compactionTimeAvgSensor);
+        final long now = 0L;
+        when(statisticsToAdd1.getHistogramData(any())).thenReturn(new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0L, 0L, 0.0));
+        when(statisticsToAdd1.getAndResetTickerCount(any())).thenReturn(0L);

-        recorder.record(0L);
+        recorder.record(now);

-        verify(memtableAvgFlushTimeSensor);
-        verify(compactionTimeAvgSensor);
+        verify(compactionTimeAvgSensor).record(0d, now);
+        verify(memtableAvgFlushTimeSensor).record(0d, now);
     }

     private void setUpMetricsMock() {
-        mockStatic(RocksDBMetrics.class);
-        final RocksDBMetricContext metricsContext =
-            new RocksDBMetricContext(TASK_ID1.toString(), METRICS_SCOPE, STORE_NAME);
-        expect(RocksDBMetrics.bytesWrittenToDatabaseSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(bytesWrittenToDatabaseSensor);
-        expect(RocksDBMetrics.bytesReadFromDatabaseSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(bytesReadFromDatabaseSensor);
-        expect(RocksDBMetrics.memtableBytesFlushedSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(memtableBytesFlushedSensor);
-        expect(RocksDBMetrics.memtableHitRatioSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(memtableHitRatioSensor);
-        expect(RocksDBMetrics.memtableAvgFlushTimeSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(memtableAvgFlushTimeSensor);
-        expect(RocksDBMetrics.memtableMinFlushTimeSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(memtableMinFlushTimeSensor);
-        expect(RocksDBMetrics.memtableMaxFlushTimeSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(memtableMaxFlushTimeSensor);
-        expect(RocksDBMetrics.writeStallDurationSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(writeStallDurationSensor);
-        expect(RocksDBMetrics.blockCacheDataHitRatioSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(blockCacheDataHitRatioSensor);
-        expect(RocksDBMetrics.blockCacheIndexHitRatioSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(blockCacheIndexHitRatioSensor);
-        expect(RocksDBMetrics.blockCacheFilterHitRatioSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(blockCacheFilterHitRatioSensor);
-        expect(RocksDBMetrics.bytesWrittenDuringCompactionSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(bytesWrittenDuringCompactionSensor);
-        expect(RocksDBMetrics.bytesReadDuringCompactionSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(bytesReadDuringCompactionSensor);
-        expect(RocksDBMetrics.compactionTimeAvgSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(compactionTimeAvgSensor);
-        expect(RocksDBMetrics.compactionTimeMinSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(compactionTimeMinSensor);
-        expect(RocksDBMetrics.compactionTimeMaxSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(compactionTimeMaxSensor);
-        expect(RocksDBMetrics.numberOfOpenFilesSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(numberOfOpenFilesSensor);
-        expect(RocksDBMetrics.numberOfFileErrorsSensor(eq(streamsMetrics), eq(metricsContext)))
-            .andReturn(numberOfFileErrorsSensor);
-        RocksDBMetrics.addNumImmutableMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addCurSizeActiveMemTable(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addCurSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumEntriesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumEntriesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumDeletesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumDeletesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addMemTableFlushPending(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumRunningFlushesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addCompactionPendingMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumRunningCompactionsMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addEstimatePendingCompactionBytesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addTotalSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addLiveSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumLiveVersionMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addBlockCacheCapacityMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addBlockCacheUsageMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addBlockCachePinnedUsageMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addEstimateNumKeysMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addEstimateTableReadersMemMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addBackgroundErrorsMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        replay(RocksDBMetrics.class);
-    }
-
-    private void setUpMetricsStubMock() {
-        mockStatic(RocksDBMetrics.class);
-        final RocksDBMetricContext metricsContext =
-            new RocksDBMetricContext(TASK_ID1.toString(), METRICS_SCOPE, STORE_NAME);
-        expect(RocksDBMetrics.bytesWrittenToDatabaseSensor(streamsMetrics, metricsContext))
-            .andStubReturn(bytesWrittenToDatabaseSensor);
-        expect(RocksDBMetrics.bytesReadFromDatabaseSensor(streamsMetrics, metricsContext))
-            .andStubReturn(bytesReadFromDatabaseSensor);
-        expect(RocksDBMetrics.memtableBytesFlushedSensor(streamsMetrics, metricsContext))
-            .andStubReturn(memtableBytesFlushedSensor);
-        expect(RocksDBMetrics.memtableHitRatioSensor(streamsMetrics, metricsContext))
-            .andStubReturn(memtableHitRatioSensor);
-        expect(RocksDBMetrics.memtableAvgFlushTimeSensor(streamsMetrics, metricsContext))
-            .andStubReturn(memtableAvgFlushTimeSensor);
-        expect(RocksDBMetrics.memtableMinFlushTimeSensor(streamsMetrics, metricsContext))
-            .andStubReturn(memtableMinFlushTimeSensor);
-        expect(RocksDBMetrics.memtableMaxFlushTimeSensor(streamsMetrics, metricsContext))
-            .andStubReturn(memtableMaxFlushTimeSensor);
-        expect(RocksDBMetrics.writeStallDurationSensor(streamsMetrics, metricsContext))
-            .andStubReturn(writeStallDurationSensor);
-        expect(RocksDBMetrics.blockCacheDataHitRatioSensor(streamsMetrics, metricsContext))
-            .andStubReturn(blockCacheDataHitRatioSensor);
-        expect(RocksDBMetrics.blockCacheIndexHitRatioSensor(streamsMetrics, metricsContext))
-            .andStubReturn(blockCacheIndexHitRatioSensor);
-        expect(RocksDBMetrics.blockCacheFilterHitRatioSensor(streamsMetrics, metricsContext))
-            .andStubReturn(blockCacheFilterHitRatioSensor);
-        expect(RocksDBMetrics.bytesWrittenDuringCompactionSensor(streamsMetrics, metricsContext))
-            .andStubReturn(bytesWrittenDuringCompactionSensor);
-        expect(RocksDBMetrics.bytesReadDuringCompactionSensor(streamsMetrics, metricsContext))
-            .andStubReturn(bytesReadDuringCompactionSensor);
-        expect(RocksDBMetrics.compactionTimeAvgSensor(streamsMetrics, metricsContext))
-            .andStubReturn(compactionTimeAvgSensor);
-        expect(RocksDBMetrics.compactionTimeMinSensor(streamsMetrics, metricsContext))
-            .andStubReturn(compactionTimeMinSensor);
-        expect(RocksDBMetrics.compactionTimeMaxSensor(streamsMetrics, metricsContext))
-            .andStubReturn(compactionTimeMaxSensor);
-        expect(RocksDBMetrics.numberOfOpenFilesSensor(streamsMetrics, metricsContext))
-            .andStubReturn(numberOfOpenFilesSensor);
-        expect(RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricsContext))
-            .andStubReturn(numberOfFileErrorsSensor);
-        RocksDBMetrics.addNumImmutableMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addCurSizeActiveMemTable(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addCurSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumEntriesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumEntriesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumDeletesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumDeletesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addMemTableFlushPending(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumRunningFlushesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addCompactionPendingMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumRunningCompactionsMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addEstimatePendingCompactionBytesMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addTotalSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addLiveSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumLiveVersionMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addBlockCacheCapacityMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addBlockCacheUsageMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addBlockCachePinnedUsageMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addEstimateNumKeysMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addEstimateTableReadersMemMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        RocksDBMetrics.addBackgroundErrorsMetric(eq(streamsMetrics), eq(metricsContext), anyObject());
-        replay(RocksDBMetrics.class);
+        dbMetrics = mockStatic(RocksDBMetrics.class);
+        dbMetrics.when(() -> RocksDBMetrics.bytesWrittenToDatabaseSensor(streamsMetrics, metricsContext))
+            .thenReturn(bytesWrittenToDatabaseSensor);
+        dbMetrics.when(() -> RocksDBMetrics.bytesReadFromDatabaseSensor(streamsMetrics, metricsContext))
+            .thenReturn(bytesReadFromDatabaseSensor);
+        dbMetrics.when(() -> RocksDBMetrics.memtableBytesFlushedSensor(streamsMetrics, metricsContext))
+            .thenReturn(memtableBytesFlushedSensor);
+        dbMetrics.when(() -> RocksDBMetrics.memtableHitRatioSensor(streamsMetrics, metricsContext))
+            .thenReturn(memtableHitRatioSensor);
+        dbMetrics.when(() -> RocksDBMetrics.memtableAvgFlushTimeSensor(streamsMetrics, metricsContext))
+            .thenReturn(memtableAvgFlushTimeSensor);
+        dbMetrics.when(() -> RocksDBMetrics.memtableMinFlushTimeSensor(streamsMetrics, metricsContext))
+            .thenReturn(memtableMinFlushTimeSensor);
+        dbMetrics.when(() -> RocksDBMetrics.memtableMaxFlushTimeSensor(streamsMetrics, metricsContext))
+            .thenReturn(memtableMaxFlushTimeSensor);
+        dbMetrics.when(() -> RocksDBMetrics.writeStallDurationSensor(streamsMetrics, metricsContext))
+            .thenReturn(writeStallDurationSensor);
+        dbMetrics.when(() -> RocksDBMetrics.blockCacheDataHitRatioSensor(streamsMetrics, metricsContext))
+            .thenReturn(blockCacheDataHitRatioSensor);
+        dbMetrics.when(() -> RocksDBMetrics.blockCacheIndexHitRatioSensor(streamsMetrics, metricsContext))
+            .thenReturn(blockCacheIndexHitRatioSensor);
+        dbMetrics.when(() -> RocksDBMetrics.blockCacheFilterHitRatioSensor(streamsMetrics, metricsContext))
+            .thenReturn(blockCacheFilterHitRatioSensor);
+        dbMetrics.when(() -> RocksDBMetrics.bytesWrittenDuringCompactionSensor(streamsMetrics, metricsContext))
+            .thenReturn(bytesWrittenDuringCompactionSensor);
+        dbMetrics.when(() -> RocksDBMetrics.bytesReadDuringCompactionSensor(streamsMetrics, metricsContext))
+            .thenReturn(bytesReadDuringCompactionSensor);
+        dbMetrics.when(() -> RocksDBMetrics.compactionTimeAvgSensor(streamsMetrics, metricsContext))
+            .thenReturn(compactionTimeAvgSensor);
+        dbMetrics.when(() -> RocksDBMetrics.compactionTimeMinSensor(streamsMetrics, metricsContext))
+            .thenReturn(compactionTimeMinSensor);
+        dbMetrics.when(() -> RocksDBMetrics.compactionTimeMaxSensor(streamsMetrics, metricsContext))
+            .thenReturn(compactionTimeMaxSensor);
+        dbMetrics.when(() -> RocksDBMetrics.numberOfOpenFilesSensor(streamsMetrics, metricsContext))
+            .thenReturn(numberOfOpenFilesSensor);
+        dbMetrics.when(() -> RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricsContext))
+            .thenReturn(numberOfFileErrorsSensor);
     }
 }
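
A closing note on the static mocking used above: Mockito's mockStatic(...) (available with the inline mock maker) returns a MockedStatic handle that keeps the static stubbing active on the current thread until it is closed, which is why the test stores it in dbMetrics and closes it in the @After method. Below is a self-contained sketch of the same lifecycle using try-with-resources instead of @After; the StaticUtil class is hypothetical and not part of this commit.

    import org.mockito.MockedStatic;
    import static org.mockito.Mockito.mockStatic;

    class MockedStaticLifecycleSketch {
        // Hypothetical class with a static method, for illustration only.
        static class StaticUtil {
            static String greet() { return "real"; }
        }

        void demo() {
            try (MockedStatic<StaticUtil> mocked = mockStatic(StaticUtil.class)) {
                mocked.when(StaticUtil::greet).thenReturn("stubbed");
                String result = StaticUtil.greet();   // returns "stubbed" while the mock is active
                mocked.verify(StaticUtil::greet);     // assert the static call happened
            }
            // Outside the try block the real StaticUtil.greet() is in effect again.
        }
    }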