From 1317f3f77a9e1e432e7a81de2dcb88365feeac43 Mon Sep 17 00:00:00 2001
From: "A. Sophie Blee-Goldman"
Date: Wed, 30 Mar 2022 16:24:01 -0700
Subject: [PATCH] MINOR: log warning when topology override for cache size is non-zero (#11959)

Since the topology-level cache size config only controls whether we disable the
caching layer entirely for that topology, setting it to anything other than 0 has
no effect. The actual cache memory is still just split evenly between the threads,
and shared by all topologies. It's possible we'll want to change this in the
future, but for now we should make sure to log a warning so that users who do try
to set this override to some nonzero value are made aware that it doesn't work
like this.

Also includes some minor refactoring plus a fix for an off-by-one error in #11796

Reviewers: Luke Chen, Walker Carlson, Sagar Rao
---
 checkstyle/suppressions.xml                    |  2 +-
 .../apache/kafka/streams/KafkaStreams.java     |  7 +-
 .../apache/kafka/streams/StreamsConfig.java    | 23 -------
 .../apache/kafka/streams/TopologyConfig.java   | 62 +++++++++++-------
 .../streams/internals/StreamsConfigUtils.java  | 31 +++++++++
 .../kafka/streams/KafkaStreamsTest.java        | 25 +++++++
 .../kafka/streams/StreamsConfigTest.java       |  9 +--
 .../AdjustStreamThreadCountTest.java           | 65 ++-----------------
 .../kafka/streams/TopologyTestDriver.java      |  3 +-
 9 files changed, 114 insertions(+), 113 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index cd82efe1421..0824e0b60b7 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -188,7 +188,7 @@
               files="StreamsMetricsImpl.java"/>
+              files="(GlobalStateManagerImpl|KafkaStreams|KStreamImplJoin|StreamsPartitionAssignor|StreamThread|TaskManager|TopologyConfig).java"/>

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 7798e58204c..9d9d57f8ab9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -109,6 +109,7 @@ import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CON
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;

 /**
@@ -939,7 +940,7 @@ public class KafkaStreams implements AutoCloseable {
         streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
         delegatingStateRestoreListener = new DelegatingStateRestoreListener();

-        totalCacheSize = applicationConfigs.getTotalCacheSize();
+        totalCacheSize = getTotalCacheSize(applicationConfigs);
         inputBufferMaxBytes = applicationConfigs.getLong(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG);

         final int numStreamThreads = topologyMetadata.getNumStreamThreads(applicationConfigs);
@@ -1047,9 +1048,9 @@ public class KafkaStreams implements AutoCloseable {
             // and then resize them later
             streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
             final int numLiveThreads = getNumLiveStreamThreads();
-            resizeThreadCacheAndBufferMemory(numLiveThreads + 1);
+            resizeThreadCacheAndBufferMemory(numLiveThreads);
             log.info("Adding StreamThread-{}, there are now {} threads with cache size/max buffer size values as {} per thread.",
-                    threadIdx, numLiveThreads + 1, getThreadCacheAndBufferMemoryString());
+                    threadIdx, numLiveThreads, getThreadCacheAndBufferMemoryString());
         }

         synchronized (stateLock) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index e9e0cca05a8..740f7af5da2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1536,29 +1536,6 @@ public class StreamsConfig extends AbstractConfig {
         return props;
     }

-    public long getTotalCacheSize() {
-        // both deprecated and new config set. Warn and use the new one.
-        if (originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) {
-            if (!getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG).equals(getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG))) {
-                log.warn("Both deprecated config {} and the new config {} are set, hence {} is ignored and {} is used instead.",
-                        CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                        STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                        CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                        STATESTORE_CACHE_MAX_BYTES_CONFIG);
-            }
-            return getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
-        } else if (originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG)) {
-            // only deprecated config set.
-            log.warn("Deprecated config {} is set, and will be used; we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
-                    CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                    STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                    CACHE_MAX_BYTES_BUFFERING_CONFIG);
-            return getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-        }
-        // only new or no config set. Use default or user specified value.
-        return getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
-    }
-
     /**
      * Get the configs for the {@link Admin admin client}.
      * @param clientId clientId
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index de8aec94ce4..da9a8c89bfa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -49,6 +49,7 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_DOC;
 import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB;
 import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
+import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;

 /**
  * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
@@ -136,37 +137,54 @@ public class TopologyConfig extends AbstractConfig {
             maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
             log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
         } else {
+            // If the user hasn't explicitly set the buffered.records.per.partition config, then leave it unbounded
+            // and rely on the input.buffer.max.bytes instead to keep the memory usage under control
             maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) ?
                 globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
         }

-        if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
-            cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
-            log.info("Topology {} is using both deprecated config {} and new config {}, hence {} is ignored and the new config {} (value {}) is used",
-                    topologyName,
-                    CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                    STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                    CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                    STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                    cacheSize);
-        } else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
-            cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-            log.info("Topology {} is using only deprecated config {}, and will be used to set cache size to {}; " +
-                    "we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
-                    topologyName,
-                    CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                    cacheSize,
-                    STATESTORE_CACHE_MAX_BYTES_CONFIG,
-                    CACHE_MAX_BYTES_BUFFERING_CONFIG);
-        } else if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides)) {
-            cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+        final boolean stateStoreCacheMaxBytesOverridden = isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides);
+        final boolean cacheMaxBytesBufferingOverridden = isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides);
+
+        if (!stateStoreCacheMaxBytesOverridden && !cacheMaxBytesBufferingOverridden) {
+            cacheSize = getTotalCacheSize(globalAppConfigs);
         } else {
-            cacheSize = globalAppConfigs.getTotalCacheSize();
+            if (stateStoreCacheMaxBytesOverridden && cacheMaxBytesBufferingOverridden) {
+                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+                log.info("Topology {} is using both deprecated config {} and new config {}, hence {} is ignored and the new config {} (value {}) is used",
+                        topologyName,
+                        CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                        STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                        CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                        STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                        cacheSize);
+            } else if (cacheMaxBytesBufferingOverridden) {
+                cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+                log.info("Topology {} is using only deprecated config {}, and will be used to set cache size to {}; " +
+                        "we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
+                        topologyName,
+                        CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                        cacheSize,
+                        STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                        CACHE_MAX_BYTES_BUFFERING_CONFIG);
+            } else {
+                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+            }
+
+            if (cacheSize != 0) {
+                log.warn("Topology {} is overriding cache size to {} but this will not have any effect as the " +
+                        "topology-level cache size config only controls whether record buffering is enabled " +
+                        "or disabled, thus the only valid override value is 0",
+                        topologyName, cacheSize);
+            } else {
+                log.info("Topology {} is overriding cache size to {}, record buffering will be disabled",
+                        topologyName, cacheSize);
+            }
         }

         if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) {
             maxTaskIdleMs = getLong(MAX_TASK_IDLE_MS_CONFIG);
-            log.info("Topology {} is overridding {} to {}", topologyName, MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);
+            log.info("Topology {} is overriding {} to {}", topologyName, MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);
         } else {
             maxTaskIdleMs = globalAppConfigs.getLong(MAX_TASK_IDLE_MS_CONFIG);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
index e271a42ab89..6f169826f06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
@@ -17,9 +17,16 @@ package org.apache.kafka.streams.internals;

 import org.apache.kafka.streams.StreamsConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;

 public class StreamsConfigUtils {

+    private static final Logger LOG = LoggerFactory.getLogger(StreamsConfigUtils.class);
+
     public enum ProcessingMode {

         AT_LEAST_ONCE("AT_LEAST_ONCE"),
@@ -66,4 +73,28 @@ public class StreamsConfigUtils {
         return processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA ||
             processingMode == ProcessingMode.EXACTLY_ONCE_V2;
     }
+
+    @SuppressWarnings("deprecation")
+    public static long getTotalCacheSize(final StreamsConfig config) {
+        // both deprecated and new config set. Warn and use the new one.
+        if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && config.originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) {
+            if (!config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG).equals(config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG))) {
+                LOG.warn("Both deprecated config {} and the new config {} are set, hence {} is ignored and {} is used instead.",
+                        CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                        STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                        CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                        STATESTORE_CACHE_MAX_BYTES_CONFIG);
+            }
+            return config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+        } else if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG)) {
+            // only deprecated config set.
+            LOG.warn("Deprecated config {} is set, and will be used; we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
+                    CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                    STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                    CACHE_MAX_BYTES_BUFFERING_CONFIG);
+            return config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+        }
+        // only new or no config set. Use default or user specified value.
+        return config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index b5620d5c13e..eabb8edc114 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -143,6 +143,8 @@ public class KafkaStreamsTest {
     private GlobalStreamThread globalStreamThread;
     @Mock
     private Metrics metrics;
+    @Mock
+    private State state;

     private StateListenerStub streamsStateListener;
     private Capture> metricsReportersCapture;
@@ -216,6 +218,7 @@ public class KafkaStreamsTest {
         ClientMetrics.addStateMetric(anyObject(StreamsMetricsImpl.class), anyObject());
         ClientMetrics.addNumAliveStreamThreadMetric(anyObject(StreamsMetricsImpl.class), anyObject());

+        // setup stream threads
         PowerMock.mockStatic(StreamThread.class);
         EasyMock.expect(StreamThread.create(
@@ -240,6 +243,7 @@ public class KafkaStreamsTest {
         PowerMock.mockStatic(StreamsConfigUtils.class);
         EasyMock.expect(StreamsConfigUtils.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE).anyTimes();
         EasyMock.expect(StreamsConfigUtils.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes();
+        EasyMock.expect(StreamsConfigUtils.getTotalCacheSize(anyObject(StreamsConfig.class))).andReturn(10 * 1024 * 1024L).anyTimes();
         EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes();
         EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes();
         EasyMock.expect(streamThreadOne.getCacheSize()).andReturn(10485760L).anyTimes();
@@ -598,6 +602,27 @@ public class KafkaStreamsTest {
         streams.start();
         final int oldSize = streams.threads.size();
         waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+        EasyMock.reset(streamThreadOne, streamThreadTwo);
+        EasyMock.expect(streamThreadOne.isRunning()).andStubReturn(true);
+        EasyMock.expect(streamThreadTwo.isRunning()).andStubReturn(true);
+        EasyMock.expect(streamThreadOne.state()).andStubReturn(StreamThread.State.RUNNING);
+        EasyMock.expect(streamThreadTwo.state()).andStubReturn(StreamThread.State.RUNNING);
+        EasyMock.expect(streamThreadOne.getName()).andStubReturn("processId-StreamThread-1");
+        EasyMock.expect(streamThreadTwo.getName()).andStubReturn("processId-StreamThread-2");
+        EasyMock.expect(streamThreadTwo.getId()).andStubReturn(2L);
+        EasyMock.expect(streamThreadOne.getCacheSize()).andReturn(10485760L).anyTimes();
+        EasyMock.expect(streamThreadOne.getMaxBufferSize()).andReturn(536870912L).anyTimes();
+        EasyMock.expect(streamThreadTwo.getCacheSize()).andReturn(10485760L).anyTimes();
+        EasyMock.expect(streamThreadTwo.getMaxBufferSize()).andReturn(536870912L).anyTimes();
+        streamThreadTwo.setStateListener(EasyMock.anyObject());
+        streamThreadTwo.start();
+
+        streamThreadOne.resizeCacheAndBufferMemory(5 * 1024 * 1024L, 256 * 1024 * 1024L);
+        streamThreadTwo.resizeCacheAndBufferMemory(5 * 1024 * 1024L, 256 * 1024 * 1024L);
+        streamThreadOne.shutdown();
+        streamThreadTwo.shutdown();
+        EasyMock.expect(state.isRunningOrRebalancing()).andStubReturn(true);
+        EasyMock.replay(streamThreadOne, streamThreadTwo, state);
         assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
         assertThat(streams.threads.size(), equalTo(oldSize + 1));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 243a4741677..4deed5ec903 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -63,6 +63,7 @@ import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFI
 import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
+import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
 import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.hasItem;
@@ -1258,7 +1259,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100);
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
         final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(config.getTotalCacheSize(), 100);
+        assertEquals(getTotalCacheSize(config), 100);
     }

     @Test
@@ -1266,20 +1267,20 @@
     public void shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() {
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
         final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(config.getTotalCacheSize(), 10);
+        assertEquals(getTotalCacheSize(config), 10);
     }

     @Test
     public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() {
         props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10);
         final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(config.getTotalCacheSize(), 10);
+        assertEquals(getTotalCacheSize(config), 10);
     }

     @Test
     public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
         final StreamsConfig config = new StreamsConfig(props);
-        assertEquals(config.getTotalCacheSize(), 10 * 1024 * 1024);
+        assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
     }

     static class MisconfiguredSerde implements Serde {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index 1119818b927..e5d0b9348a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -421,12 +421,14 @@ public class AdjustStreamThreadCountTest {
     }

     @Test
-    public void shouldResizeCacheAfterThreadReplacement() throws InterruptedException {
+    public void shouldResizeCacheAndInputBufferAfterThreadReplacement() throws InterruptedException {
         final long totalCacheBytes = 10L;
+        final long maxBufferBytes = 100L;
         final Properties props = new Properties();
         props.putAll(properties);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
         props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, totalCacheBytes);
+        props.put(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG, maxBufferBytes);

         final AtomicBoolean injectError = new AtomicBoolean(false);

@@ -466,64 +468,9 @@ public class AdjustStreamThreadCountTest {
                 waitForTransitionFromRebalancingToRunning();

                 for (final String log : appender.getMessages()) {
-                    // after we replace the thread there should be two remaining threads with 5 bytes each
-                    if (log.endsWith("Adding StreamThread-3, there are now 3 threads with cache size/max buffer size values as 3/178956970 per thread.")) {
-                        return;
-                    }
-                }
-            }
-        }
-        fail();
-    }
-
-    @Test
-    public void shouldResizeMaxBufferAfterThreadReplacement() throws InterruptedException {
-        final long totalCacheBytes = 10L;
-        final Properties props = new Properties();
-        props.putAll(properties);
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
-        props.put(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG, totalCacheBytes);
-
-        final AtomicBoolean injectError = new AtomicBoolean(false);
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<String, String> stream = builder.stream(inputTopic);
-        stream.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
-            @Override
-            public void init(final ProcessorContext context) {
-                context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
-                    if (Thread.currentThread().getName().endsWith("StreamThread-1") && injectError.get()) {
-                        injectError.set(false);
-                        throw new RuntimeException("BOOM");
-                    }
-                });
-            }
-
-            @Override
-            public KeyValue<String, String> transform(final String key, final String value) {
-                return new KeyValue<>(key, value);
-            }
-
-            @Override
-            public void close() {
-            }
-        });
-
-        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) {
-            addStreamStateChangeListener(kafkaStreams);
-            kafkaStreams.setUncaughtExceptionHandler(e -> StreamThreadExceptionResponse.REPLACE_THREAD);
-            startStreamsAndWaitForRunning(kafkaStreams);
-
-            stateTransitionHistory.clear();
-            try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
-                injectError.set(true);
-                waitForCondition(() -> !injectError.get(), "StreamThread did not hit and reset the injected error");
-
-                waitForTransitionFromRebalancingToRunning();
-
-                for (final String log : appender.getMessages()) {
-                    // after we replace the thread there should be two remaining threads with 5 bytes each
-                    if (log.endsWith("Adding StreamThread-3, there are now 3 threads with cache size/max buffer size values as 3495253/3 per thread.")) {
+                    // after we replace the thread there should be two remaining threads with 5 bytes each for
+                    // the cache and 50 for the input buffer
+                    if (log.endsWith("Adding StreamThread-3, there are now 2 threads with cache size/max buffer size values as 5/50 per thread.")) {
                         return;
                     }
                 }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index d438a44f37b..49ebf732e96 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -117,6 +117,7 @@ import java.util.regex.Pattern;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
+import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

 /**
@@ -330,7 +331,7 @@ public class TopologyTestDriver implements Closeable {
         final ThreadCache cache = new ThreadCache(
             logContext,
-            Math.max(0, streamsConfig.getTotalCacheSize()),
+            Math.max(0, getTotalCacheSize(streamsConfig)),
             streamsMetrics
         );
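
Usage sketch (illustrative, not part of the diff): the cache-sizing behavior described in the
commit message can be shown against the public StreamsConfig API. This is a minimal, hypothetical
example; the application id, bootstrap servers, and the standalone overrides Properties object are
placeholders, and the topology-level override at the end only matters when running named topologies
whose TopologyConfig is built from such overrides.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class CacheSizingExample {
        public static void main(final String[] args) {
            // Global (application-level) cache: the only size that actually takes effect.
            // It is split evenly across stream threads (e.g. 10 MB total / 2 threads = 5 MB
            // per thread) and shared by all topologies running in the application.
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
            props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);

            // Topology-level override (hypothetical overrides object for a named topology):
            // per this patch, 0 disables caching for that topology, while any non-zero value
            // has no effect and now triggers the warning added in TopologyConfig.
            final Properties topologyOverrides = new Properties();
            topologyOverrides.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0L);
        }
    }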