KAFKA-14432: RocksDBStore relies on finalizers to not leak memory (#12935)

RocksDBStore relied on finalizers to not leak memory (and leaked memory after the upgrade to RocksDB 7).
The problem was that every call to options.statistics created a new wrapper object that needed to be finalized.

I simplified the logic a bit and moved the ownership of the statistics from ValueProvider to RocksDBStore.

Reviewers: Bruno Cadonna <cadonna@apache.org>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Christo Lolov <lolovc@amazon.com>
This commit is contained in:
Lucas Brutschy 2022-12-08 03:25:58 +01:00 committed by Bruno Cadonna
parent ca082c4bf2
commit 42f8730937
4 changed files with 81 additions and 47 deletions

View File

@@ -98,6 +98,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
FlushOptions fOptions; FlushOptions fOptions;
private Cache cache; private Cache cache;
private BloomFilter filter; private BloomFilter filter;
private Statistics statistics;
private RocksDBConfigSetter configSetter; private RocksDBConfigSetter configSetter;
private boolean userSpecifiedStatistics = false; private boolean userSpecifiedStatistics = false;
@@ -178,40 +179,38 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
// Setup statistics before the database is opened, otherwise the statistics are not updated // Setup statistics before the database is opened, otherwise the statistics are not updated
// with the measurements from Rocks DB // with the measurements from Rocks DB
maybeSetUpStatistics(configs); setupStatistics(configs, dbOptions);
openRocksDB(dbOptions, columnFamilyOptions); openRocksDB(dbOptions, columnFamilyOptions);
open = true; open = true;
addValueProvidersToMetricsRecorder(); addValueProvidersToMetricsRecorder();
} }
private void maybeSetUpStatistics(final Map<String, Object> configs) { private void setupStatistics(final Map<String, Object> configs, final DBOptions dbOptions) {
if (userSpecifiedOptions.statistics() != null) { statistics = userSpecifiedOptions.statistics();
if (statistics == null) {
if (RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
statistics = new Statistics();
dbOptions.setStatistics(statistics);
}
userSpecifiedStatistics = false;
} else {
userSpecifiedStatistics = true; userSpecifiedStatistics = true;
} }
if (!userSpecifiedStatistics &&
RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
// metrics recorder will clean up statistics object
final Statistics statistics = new Statistics();
userSpecifiedOptions.setStatistics(statistics);
}
} }
private void addValueProvidersToMetricsRecorder() { private void addValueProvidersToMetricsRecorder() {
final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig(); final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig();
final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics();
if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) { if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) {
final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache(); final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache();
metricsRecorder.addValueProviders(name, db, cache, statistics); metricsRecorder.addValueProviders(name, db, cache, userSpecifiedStatistics ? null : statistics);
} else if (tableFormatConfig instanceof BlockBasedTableConfig) { } else if (tableFormatConfig instanceof BlockBasedTableConfig) {
throw new ProcessorStateException("The used block-based table format configuration does not expose the " + throw new ProcessorStateException("The used block-based table format configuration does not expose the " +
"block cache. Use the BlockBasedTableConfig instance provided by Options#tableFormatConfig() to configure " + "block cache. Use the BlockBasedTableConfig instance provided by Options#tableFormatConfig() to configure " +
"the block-based table format of RocksDB. Do not provide a new instance of BlockBasedTableConfig to " + "the block-based table format of RocksDB. Do not provide a new instance of BlockBasedTableConfig to " +
"the RocksDB options."); "the RocksDB options.");
} else { } else {
metricsRecorder.addValueProviders(name, db, null, statistics); metricsRecorder.addValueProviders(name, db, null, userSpecifiedStatistics ? null : statistics);
} }
} }
@@ -481,6 +480,9 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
fOptions.close(); fOptions.close();
filter.close(); filter.close();
cache.close(); cache.close();
if (statistics != null) {
statistics.close();
}
dbAccessor = null; dbAccessor = null;
userSpecifiedOptions = null; userSpecifiedOptions = null;
@@ -489,6 +491,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
db = null; db = null;
filter = null; filter = null;
cache = null; cache = null;
statistics = null;
} }
private void closeOpenIterators() { private void closeOpenIterators() {

View File

@@ -77,12 +77,6 @@ public class RocksDBMetricsRecorder {
} }
this.statistics = statistics; this.statistics = statistics;
} }
public void maybeCloseStatistics() {
if (statistics != null) {
statistics.close();
}
}
} }
private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb."; private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";
@@ -397,7 +391,6 @@ public class RocksDBMetricsRecorder {
" could be found. This is a bug in Kafka Streams. " + " could be found. This is a bug in Kafka Streams. " +
"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
} }
removedValueProviders.maybeCloseStatistics();
if (storeToValueProviders.isEmpty()) { if (storeToValueProviders.isEmpty()) {
logger.debug( logger.debug(
"Removing metrics recorder for store {} of task {} from metrics recording trigger", "Removing metrics recorder for store {} of task {} from metrics recording trigger",

View File

@@ -16,6 +16,7 @@
*/ */
package org.apache.kafka.streams.state.internals; package org.apache.kafka.streams.state.internals;
import java.lang.reflect.Field;
import org.apache.kafka.common.Metric; import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.MetricConfig;
@@ -90,6 +91,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.powermock.api.easymock.PowerMock.replay; import static org.powermock.api.easymock.PowerMock.replay;
@@ -221,12 +223,18 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
public RocksDBConfigSetterWithUserProvidedStatistics(){} public RocksDBConfigSetterWithUserProvidedStatistics(){}
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) { public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
options.setStatistics(new Statistics()); lastStatistics = new Statistics();
options.setStatistics(lastStatistics);
} }
public void close(final String storeName, final Options options) { public void close(final String storeName, final Options options) {
options.statistics().close(); // We want to be in charge of closing our statistics ourselves.
assertTrue(lastStatistics.isOwningHandle());
lastStatistics.close();
lastStatistics = null;
} }
protected static Statistics lastStatistics = null;
} }
@Test @Test
@@ -241,6 +249,52 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
reset(metricsRecorder); reset(metricsRecorder);
} }
@Test
public void shouldCloseStatisticsWhenUserProvidesStatistics() throws Exception {
rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
context = getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
rocksDBStore.openDB(context.appConfigs(), context.stateDir());
final Statistics userStatistics = RocksDBConfigSetterWithUserProvidedStatistics.lastStatistics;
final Statistics statisticsHandle = getStatistics(rocksDBStore);
rocksDBStore.close();
// Both statistics handles must be closed now.
assertFalse(userStatistics.isOwningHandle());
assertFalse(statisticsHandle.isOwningHandle());
assertNull(getStatistics(rocksDBStore));
assertNull(RocksDBConfigSetterWithUserProvidedStatistics.lastStatistics);
}
@Test
public void shouldSetStatisticsInValueProvidersWhenUserProvidesNoStatistics() {
rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
context = getProcessorContext(RecordingLevel.DEBUG);
metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), notNull(), notNull());
replay(metricsRecorder);
rocksDBStore.openDB(context.appConfigs(), context.stateDir());
verify(metricsRecorder);
reset(metricsRecorder);
}
@Test
public void shouldCloseStatisticsWhenUserProvidesNoStatistics() throws Exception {
rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
context = getProcessorContext(RecordingLevel.DEBUG);
rocksDBStore.openDB(context.appConfigs(), context.stateDir());
final Statistics statisticsHandle = getStatistics(rocksDBStore);
rocksDBStore.close();
// Statistics handles must be closed now.
assertFalse(statisticsHandle.isOwningHandle());
assertNull(getStatistics(rocksDBStore));
}
public static class RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig implements RocksDBConfigSetter { public static class RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig implements RocksDBConfigSetter {
public RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig(){} public RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig(){}
@@ -981,4 +1035,11 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
return result; return result;
} }
private Statistics getStatistics(final RocksDBStore rocksDBStore) throws Exception {
final Field statisticsField = rocksDBStore.getClass().getDeclaredField("statistics");
statisticsField.setAccessible(true);
final Statistics statistics = (Statistics) statisticsField.get(rocksDBStore);
statisticsField.setAccessible(false);
return statistics;
}
} }

View File

@@ -319,30 +319,7 @@ public class RocksDBMetricsRecorderTest {
} }
@Test @Test
public void shouldCloseStatisticsWhenValueProvidersAreRemoved() { public void shouldNotRemoveItselfFromRecordingTriggerWhenAtLeastOneValueProviderIsPresent() {
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
reset(statisticsToAdd1);
statisticsToAdd1.close();
replay(statisticsToAdd1);
recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
verify(statisticsToAdd1);
}
@Test
public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() {
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
reset(statisticsToAdd1);
replay(statisticsToAdd1);
recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
verify(statisticsToAdd1);
}
@Test
public void shouldRemoveItselfFromRecordingTriggerWhenLastValueProvidersAreRemoved() {
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2); recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
reset(recordingTrigger); reset(recordingTrigger);