KAFKA-14432: RocksDBStore relies on finalizers to not leak memory (#12935)

RocksDBStore relied on finalizers to not leak memory (and leaked memory after the upgrade to RocksDB 7).
The problem was that every call to options.statistics creates a new wrapper object that needs to be finalized.

I simplified the logic a bit and moved the ownership of the statistics from ValueProvider to RocksDBStore.

Reviewers: Bruno Cadonna <cadonna@apache.org>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Christo Lolov <lolovc@amazon.com>
This commit is contained in:
Lucas Brutschy 2022-12-08 03:25:58 +01:00 committed by Bruno Cadonna
parent fb8c153203
commit ddae7f0f9c
4 changed files with 80 additions and 48 deletions

View File

@ -108,6 +108,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
FlushOptions fOptions;
private Cache cache;
private BloomFilter filter;
private Statistics statistics;
private RocksDBConfigSetter configSetter;
private boolean userSpecifiedStatistics = false;
@ -229,40 +230,38 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
// Setup statistics before the database is opened, otherwise the statistics are not updated
// with the measurements from Rocks DB
maybeSetUpStatistics(configs);
setupStatistics(configs, dbOptions);
openRocksDB(dbOptions, columnFamilyOptions);
open = true;
addValueProvidersToMetricsRecorder();
}
private void maybeSetUpStatistics(final Map<String, Object> configs) {
if (userSpecifiedOptions.statistics() != null) {
private void setupStatistics(final Map<String, Object> configs, final DBOptions dbOptions) {
statistics = userSpecifiedOptions.statistics();
if (statistics == null) {
if (RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
statistics = new Statistics();
dbOptions.setStatistics(statistics);
}
userSpecifiedStatistics = false;
} else {
userSpecifiedStatistics = true;
}
if (!userSpecifiedStatistics &&
RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
// metrics recorder will clean up statistics object
final Statistics statistics = new Statistics();
userSpecifiedOptions.setStatistics(statistics);
}
}
private void addValueProvidersToMetricsRecorder() {
final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig();
final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics();
if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) {
final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache();
metricsRecorder.addValueProviders(name, db, cache, statistics);
metricsRecorder.addValueProviders(name, db, cache, userSpecifiedStatistics ? null : statistics);
} else if (tableFormatConfig instanceof BlockBasedTableConfig) {
throw new ProcessorStateException("The used block-based table format configuration does not expose the " +
"block cache. Use the BlockBasedTableConfig instance provided by Options#tableFormatConfig() to configure " +
"the block-based table format of RocksDB. Do not provide a new instance of BlockBasedTableConfig to " +
"the RocksDB options.");
} else {
metricsRecorder.addValueProviders(name, db, null, statistics);
metricsRecorder.addValueProviders(name, db, null, userSpecifiedStatistics ? null : statistics);
}
}
@ -526,6 +525,9 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
fOptions.close();
filter.close();
cache.close();
if (statistics != null) {
statistics.close();
}
dbAccessor = null;
userSpecifiedOptions = null;
@ -534,6 +536,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
db = null;
filter = null;
cache = null;
statistics = null;
}
private void closeOpenIterators() {

View File

@ -79,12 +79,6 @@ public class RocksDBMetricsRecorder {
}
this.statistics = statistics;
}
public void maybeCloseStatistics() {
if (statistics != null) {
statistics.close();
}
}
}
private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";
@ -411,7 +405,6 @@ public class RocksDBMetricsRecorder {
" could be found. This is a bug in Kafka Streams. " +
"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
}
removedValueProviders.maybeCloseStatistics();
if (storeToValueProviders.isEmpty()) {
logger.debug(
"Removing metrics recorder for store {} of task {} from metrics recording trigger",

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
import java.lang.reflect.Field;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -238,12 +239,18 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
public RocksDBConfigSetterWithUserProvidedStatistics(){}
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
options.setStatistics(new Statistics());
lastStatistics = new Statistics();
options.setStatistics(lastStatistics);
}
public void close(final String storeName, final Options options) {
options.statistics().close();
// We want to be in charge of closing our statistics ourselves.
assertTrue(lastStatistics.isOwningHandle());
lastStatistics.close();
lastStatistics = null;
}
protected static Statistics lastStatistics = null;
}
@Test
@ -258,6 +265,52 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
reset(metricsRecorder);
}
@Test
public void shouldCloseStatisticsWhenUserProvidesStatistics() throws Exception {
rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
context = getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
rocksDBStore.openDB(context.appConfigs(), context.stateDir());
final Statistics userStatistics = RocksDBConfigSetterWithUserProvidedStatistics.lastStatistics;
final Statistics statisticsHandle = getStatistics(rocksDBStore);
rocksDBStore.close();
// Both statistics handles must be closed now.
assertFalse(userStatistics.isOwningHandle());
assertFalse(statisticsHandle.isOwningHandle());
assertNull(getStatistics(rocksDBStore));
assertNull(RocksDBConfigSetterWithUserProvidedStatistics.lastStatistics);
}
@Test
public void shouldSetStatisticsInValueProvidersWhenUserProvidesNoStatistics() {
rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
context = getProcessorContext(RecordingLevel.DEBUG);
metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), notNull(), notNull());
replay(metricsRecorder);
rocksDBStore.openDB(context.appConfigs(), context.stateDir());
verify(metricsRecorder);
reset(metricsRecorder);
}
@Test
public void shouldCloseStatisticsWhenUserProvidesNoStatistics() throws Exception {
rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
context = getProcessorContext(RecordingLevel.DEBUG);
rocksDBStore.openDB(context.appConfigs(), context.stateDir());
final Statistics statisticsHandle = getStatistics(rocksDBStore);
rocksDBStore.close();
// Statistics handles must be closed now.
assertFalse(statisticsHandle.isOwningHandle());
assertNull(getStatistics(rocksDBStore));
}
public static class RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig implements RocksDBConfigSetter {
public RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig(){}
@ -1180,5 +1233,11 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
return result;
}
private Statistics getStatistics(final RocksDBStore rocksDBStore) throws Exception {
final Field statisticsField = rocksDBStore.getClass().getDeclaredField("statistics");
statisticsField.setAccessible(true);
final Statistics statistics = (Statistics) statisticsField.get(rocksDBStore);
statisticsField.setAccessible(false);
return statistics;
}
}

View File

@ -325,30 +325,7 @@ public class RocksDBMetricsRecorderTest {
}
@Test
public void shouldCloseStatisticsWhenValueProvidersAreRemoved() {
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
reset(statisticsToAdd1);
statisticsToAdd1.close();
replay(statisticsToAdd1);
recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
verify(statisticsToAdd1);
}
@Test
public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() {
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
reset(statisticsToAdd1);
replay(statisticsToAdd1);
recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
verify(statisticsToAdd1);
}
@Test
public void shouldRemoveItselfFromRecordingTriggerWhenLastValueProvidersAreRemoved() {
public void shouldNotRemoveItselfFromRecordingTriggerWhenAtLeastOneValueProviderIsPresent() {
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
reset(recordingTrigger);