From 42f87309376a08c2522c4b7c98c7780093131bec Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Thu, 8 Dec 2022 03:25:58 +0100 Subject: [PATCH] KAFKA-14432: RocksDBStore relies on finalizers to not leak memory (#12935) RocksDBStore relied on finalizers to not leak memory (and leaked memory after the upgrade to RocksDB 7). The problem was that every call to options.statistics creates a new wrapper object that needs to be finalized. I simplified the logic a bit and moved the ownership of the statistics from ValueProvider to RocksDBStore. Reviewers: Bruno Cadonna , Anna Sophie Blee-Goldman , Christo Lolov --- .../streams/state/internals/RocksDBStore.java | 31 +++++---- .../metrics/RocksDBMetricsRecorder.java | 7 -- .../state/internals/RocksDBStoreTest.java | 65 ++++++++++++++++++- .../metrics/RocksDBMetricsRecorderTest.java | 25 +------ 4 files changed, 81 insertions(+), 47 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index aa1b1ba09f0..10980270d90 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -98,6 +98,7 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS FlushOptions fOptions; private Cache cache; private BloomFilter filter; + private Statistics statistics; private RocksDBConfigSetter configSetter; private boolean userSpecifiedStatistics = false; @@ -178,40 +179,38 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS // Setup statistics before the database is opened, otherwise the statistics are not updated // with the measurements from Rocks DB - maybeSetUpStatistics(configs); - + setupStatistics(configs, dbOptions); openRocksDB(dbOptions, columnFamilyOptions); open = true; addValueProvidersToMetricsRecorder(); } - private void maybeSetUpStatistics(final Map configs) { - if (userSpecifiedOptions.statistics() != null) { + private void setupStatistics(final Map configs, final DBOptions dbOptions) { + statistics = userSpecifiedOptions.statistics(); + if (statistics == null) { + if (RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) { + statistics = new Statistics(); + dbOptions.setStatistics(statistics); + } + userSpecifiedStatistics = false; + } else { userSpecifiedStatistics = true; } - if (!userSpecifiedStatistics && - RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) { - - // metrics recorder will clean up statistics object - final Statistics statistics = new Statistics(); - userSpecifiedOptions.setStatistics(statistics); - } } private void addValueProvidersToMetricsRecorder() { final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig(); - final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics(); if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) { final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache(); - metricsRecorder.addValueProviders(name, db, cache, statistics); + metricsRecorder.addValueProviders(name, db, cache, userSpecifiedStatistics ? null : statistics); } else if (tableFormatConfig instanceof BlockBasedTableConfig) { throw new ProcessorStateException("The used block-based table format configuration does not expose the " + "block cache. Use the BlockBasedTableConfig instance provided by Options#tableFormatConfig() to configure " + "the block-based table format of RocksDB. Do not provide a new instance of BlockBasedTableConfig to " + "the RocksDB options."); } else { - metricsRecorder.addValueProviders(name, db, null, statistics); + metricsRecorder.addValueProviders(name, db, null, userSpecifiedStatistics ? null : statistics); } } @@ -481,6 +480,9 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS fOptions.close(); filter.close(); cache.close(); + if (statistics != null) { + statistics.close(); + } dbAccessor = null; userSpecifiedOptions = null; @@ -489,6 +491,7 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS db = null; filter = null; cache = null; + statistics = null; } private void closeOpenIterators() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java index 85412d1c5e8..b54fa037e2f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java @@ -77,12 +77,6 @@ public class RocksDBMetricsRecorder { } this.statistics = statistics; } - - public void maybeCloseStatistics() { - if (statistics != null) { - statistics.close(); - } - } } private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb."; @@ -397,7 +391,6 @@ public class RocksDBMetricsRecorder { " could be found. This is a bug in Kafka Streams. " + "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); } - removedValueProviders.maybeCloseStatistics(); if (storeToValueProviders.isEmpty()) { logger.debug( "Removing metrics recorder for store {} of task {} from metrics recording trigger", diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 066f080c764..2bd703fdce7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import java.lang.reflect.Field; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.MetricConfig; @@ -90,6 +91,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.powermock.api.easymock.PowerMock.replay; @@ -221,12 +223,18 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest { public RocksDBConfigSetterWithUserProvidedStatistics(){} public void setConfig(final String storeName, final Options options, final Map configs) { - options.setStatistics(new Statistics()); + lastStatistics = new Statistics(); + options.setStatistics(lastStatistics); } public void close(final String storeName, final Options options) { - options.statistics().close(); + // We want to be in charge of closing our statistics ourselves. + assertTrue(lastStatistics.isOwningHandle()); + lastStatistics.close(); + lastStatistics = null; } + + protected static Statistics lastStatistics = null; } @Test @@ -241,6 +249,52 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest { reset(metricsRecorder); } + + @Test + public void shouldCloseStatisticsWhenUserProvidesStatistics() throws Exception { + rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder(); + context = getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class); + + rocksDBStore.openDB(context.appConfigs(), context.stateDir()); + final Statistics userStatistics = RocksDBConfigSetterWithUserProvidedStatistics.lastStatistics; + final Statistics statisticsHandle = getStatistics(rocksDBStore); + rocksDBStore.close(); + + // Both statistics handles must be closed now. + assertFalse(userStatistics.isOwningHandle()); + assertFalse(statisticsHandle.isOwningHandle()); + assertNull(getStatistics(rocksDBStore)); + assertNull(RocksDBConfigSetterWithUserProvidedStatistics.lastStatistics); + } + + @Test + public void shouldSetStatisticsInValueProvidersWhenUserProvidesNoStatistics() { + rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder(); + context = getProcessorContext(RecordingLevel.DEBUG); + metricsRecorder.addValueProviders(eq(DB_NAME), notNull(), notNull(), notNull()); + replay(metricsRecorder); + + rocksDBStore.openDB(context.appConfigs(), context.stateDir()); + + verify(metricsRecorder); + reset(metricsRecorder); + } + + @Test + public void shouldCloseStatisticsWhenUserProvidesNoStatistics() throws Exception { + rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder(); + context = getProcessorContext(RecordingLevel.DEBUG); + + rocksDBStore.openDB(context.appConfigs(), context.stateDir()); + final Statistics statisticsHandle = getStatistics(rocksDBStore); + rocksDBStore.close(); + + // Statistics handles must be closed now. + assertFalse(statisticsHandle.isOwningHandle()); + assertNull(getStatistics(rocksDBStore)); + } + + public static class RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig implements RocksDBConfigSetter { public RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig(){} @@ -981,4 +1035,11 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest { return result; } + private Statistics getStatistics(final RocksDBStore rocksDBStore) throws Exception { + final Field statisticsField = rocksDBStore.getClass().getDeclaredField("statistics"); + statisticsField.setAccessible(true); + final Statistics statistics = (Statistics) statisticsField.get(rocksDBStore); + statisticsField.setAccessible(false); + return statistics; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java index dc08f845fe1..bdd0cda875d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java @@ -319,30 +319,7 @@ public class RocksDBMetricsRecorderTest { } @Test - public void shouldCloseStatisticsWhenValueProvidersAreRemoved() { - recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); - reset(statisticsToAdd1); - statisticsToAdd1.close(); - replay(statisticsToAdd1); - - recorder.removeValueProviders(SEGMENT_STORE_NAME_1); - - verify(statisticsToAdd1); - } - - @Test - public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() { - recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null); - reset(statisticsToAdd1); - replay(statisticsToAdd1); - - recorder.removeValueProviders(SEGMENT_STORE_NAME_1); - - verify(statisticsToAdd1); - } - - @Test - public void shouldRemoveItselfFromRecordingTriggerWhenLastValueProvidersAreRemoved() { + public void shouldNotRemoveItselfFromRecordingTriggerWhenAtLeastOneValueProviderIsPresent() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2); reset(recordingTrigger);