KAFKA-9881: Convert integration test to verify measurements from RocksDB to unit test (#8501)

The integration test RocksDBMetricsIntegrationTest takes pretty long to complete.
Most of the runtime is spent in the two tests that verify whether the RocksDB
metrics get actual measurements from RocksDB. Those tests need to wait for the thread
that collects the measurements of the RocksDB metrics to trigger the first recordings
of the metrics.

This PR adds a unit test that verifies whether the Kafka Streams metrics get the
measurements from RocksDB and removes the two integration tests that verified it
before. The verification of the creation and scheduling of the RocksDB metrics
recording trigger thread is already contained in KafkaStreamsTest and consequently
it is not part of this PR.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Bruno Cadonna 2020-04-17 19:49:40 +02:00 committed by GitHub
parent 00a59b392d
commit a0173ec45d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 43 deletions

View File

@ -190,13 +190,13 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
// Setup metrics before the database is opened, otherwise the metrics are not updated // Setup metrics before the database is opened, otherwise the metrics are not updated
// with the measurements from Rocks DB // with the measurements from Rocks DB
maybeSetUpMetricsRecorder(context, configs); maybeSetUpMetricsRecorder(configs);
openRocksDB(dbOptions, columnFamilyOptions); openRocksDB(dbOptions, columnFamilyOptions);
open = true; open = true;
} }
private void maybeSetUpMetricsRecorder(final ProcessorContext context, final Map<String, Object> configs) { private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
if (userSpecifiedOptions.statistics() == null && if (userSpecifiedOptions.statistics() == null &&
RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) { RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {

View File

@ -181,40 +181,6 @@ public class RocksDBMetricsIntegrationTest {
); );
} }
@Test
public void shouldVerifyThatMetricsGetMeasurementsFromRocksDBForNonSegmentedStateStore() throws Exception {
final Properties streamsConfiguration = streamsConfig();
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
final StreamsBuilder builder = builderForNonSegmentedStateStore();
final String metricsScope = "rocksdb-state-id";
cleanUpStateRunVerifyAndClose(
builder,
streamsConfiguration,
IntegerDeserializer.class,
StringDeserializer.class,
this::verifyThatBytesWrittenTotalIncreases,
metricsScope
);
}
@Test
public void shouldVerifyThatMetricsGetMeasurementsFromRocksDBForSegmentedStateStore() throws Exception {
final Properties streamsConfiguration = streamsConfig();
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
final StreamsBuilder builder = builderForSegmentedStateStore();
final String metricsScope = "rocksdb-window-state-id";
cleanUpStateRunVerifyAndClose(
builder,
streamsConfiguration,
LongDeserializer.class,
LongDeserializer.class,
this::verifyThatBytesWrittenTotalIncreases,
metricsScope
);
}
private Properties streamsConfig() { private Properties streamsConfig() {
final Properties streamsConfiguration = new Properties(); final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application"); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");

View File

@ -16,7 +16,10 @@
*/ */
package org.apache.kafka.streams.state.internals; package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
@ -28,21 +31,21 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter; import org.rocksdb.BloomFilter;
import org.rocksdb.Filter; import org.rocksdb.Filter;
@ -68,6 +71,7 @@ import static org.easymock.EasyMock.reset;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -75,8 +79,6 @@ import static org.junit.Assert.fail;
import static org.powermock.api.easymock.PowerMock.replay; import static org.powermock.api.easymock.PowerMock.replay;
import static org.powermock.api.easymock.PowerMock.verify; import static org.powermock.api.easymock.PowerMock.verify;
@RunWith(PowerMockRunner.class)
@PrepareForTest({RocksDBMetrics.class, Sensor.class})
public class RocksDBStoreTest { public class RocksDBStoreTest {
private static boolean enableBloomFilters = false; private static boolean enableBloomFilters = false;
final static String DB_NAME = "db-name"; final static String DB_NAME = "db-name";
@ -612,6 +614,38 @@ public class RocksDBStoreTest {
assertTrue(TestingBloomFilterRocksDBConfigSetter.bloomFiltersSet); assertTrue(TestingBloomFilterRocksDBConfigSetter.bloomFiltersSet);
} }
@Test
public void shouldVerifyThatMetricsGetMeasurementsFromRocksDB() {
final RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger();
final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG));
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST);
streamsMetrics.setRocksDBMetricsRecordingTrigger(rocksDBMetricsRecordingTrigger);
final ProcessorContext<Object, Object> context = EasyMock.niceMock(ProcessorContext.class);
EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(context.taskId()).andStubReturn(taskId);
EasyMock.expect(context.appConfigs())
.andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
EasyMock.expect(context.taskId()).andStubReturn(taskId);
EasyMock.replay(context);
final RocksDBStore rocksDBStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
rocksDBStore.init(context, rocksDBStore);
final byte[] key = "hello".getBytes();
final byte[] value = "world".getBytes();
rocksDBStore.put(Bytes.wrap(key), value);
rocksDBMetricsRecordingTrigger.run();
final Metric bytesWrittenTotal = metrics.metric(new MetricName(
"bytes-written-total",
StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
"description is not verified",
streamsMetrics.storeLevelTagMap(Thread.currentThread().getName(), taskId.toString(), METRICS_SCOPE, DB_NAME)
));
assertThat((double) bytesWrittenTotal.metricValue(), greaterThan(0d));
}
public static class MockRocksDbConfigSetter implements RocksDBConfigSetter { public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
static boolean called; static boolean called;