diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index cd82efe1421..0824e0b60b7 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -188,7 +188,7 @@
files="StreamsMetricsImpl.java"/>
+ files="(GlobalStateManagerImpl|KafkaStreams|KStreamImplJoin|StreamsPartitionAssignor|StreamThread|TaskManager|TopologyConfig).java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 7798e58204c..9d9d57f8ab9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -109,6 +109,7 @@ import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CON
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
/**
@@ -939,7 +940,7 @@ public class KafkaStreams implements AutoCloseable {
streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
delegatingStateRestoreListener = new DelegatingStateRestoreListener();
- totalCacheSize = applicationConfigs.getTotalCacheSize();
+ totalCacheSize = getTotalCacheSize(applicationConfigs);
inputBufferMaxBytes = applicationConfigs.getLong(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG);
final int numStreamThreads = topologyMetadata.getNumStreamThreads(applicationConfigs);
@@ -1047,9 +1048,9 @@ public class KafkaStreams implements AutoCloseable {
// and then resize them later
streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
final int numLiveThreads = getNumLiveStreamThreads();
- resizeThreadCacheAndBufferMemory(numLiveThreads + 1);
+ resizeThreadCacheAndBufferMemory(numLiveThreads);
log.info("Adding StreamThread-{}, there are now {} threads with cache size/max buffer size values as {} per thread.",
- threadIdx, numLiveThreads + 1, getThreadCacheAndBufferMemoryString());
+ threadIdx, numLiveThreads, getThreadCacheAndBufferMemoryString());
}
synchronized (stateLock) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index e9e0cca05a8..740f7af5da2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1536,29 +1536,6 @@ public class StreamsConfig extends AbstractConfig {
return props;
}
- public long getTotalCacheSize() {
- // both deprecated and new config set. Warn and use the new one.
- if (originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) {
- if (!getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG).equals(getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG))) {
- log.warn("Both deprecated config {} and the new config {} are set, hence {} is ignored and {} is used instead.",
- CACHE_MAX_BYTES_BUFFERING_CONFIG,
- STATESTORE_CACHE_MAX_BYTES_CONFIG,
- CACHE_MAX_BYTES_BUFFERING_CONFIG,
- STATESTORE_CACHE_MAX_BYTES_CONFIG);
- }
- return getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
- } else if (originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG)) {
- // only deprecated config set.
- log.warn("Deprecated config {} is set, and will be used; we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
- CACHE_MAX_BYTES_BUFFERING_CONFIG,
- STATESTORE_CACHE_MAX_BYTES_CONFIG,
- CACHE_MAX_BYTES_BUFFERING_CONFIG);
- return getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
- }
- // only new or no config set. Use default or user specified value.
- return getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
- }
-
/**
* Get the configs for the {@link Admin admin client}.
* @param clientId clientId
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index de8aec94ce4..da9a8c89bfa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -49,6 +49,7 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_DOC;
import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB;
import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
+import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
/**
* Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
@@ -136,37 +137,54 @@ public class TopologyConfig extends AbstractConfig {
maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
} else {
+ // If the user hasn't explicitly set the buffered.records.per.partition config, then leave it unbounded
+ // and rely on the input.buffer.max.bytes instead to keep the memory usage under control
maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
}
- if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
- cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
- log.info("Topology {} is using both deprecated config {} and new config {}, hence {} is ignored and the new config {} (value {}) is used",
- topologyName,
- CACHE_MAX_BYTES_BUFFERING_CONFIG,
- STATESTORE_CACHE_MAX_BYTES_CONFIG,
- CACHE_MAX_BYTES_BUFFERING_CONFIG,
- STATESTORE_CACHE_MAX_BYTES_CONFIG,
- cacheSize);
- } else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
- cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
- log.info("Topology {} is using only deprecated config {}, and will be used to set cache size to {}; " +
- "we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
- topologyName,
- CACHE_MAX_BYTES_BUFFERING_CONFIG,
- cacheSize,
- STATESTORE_CACHE_MAX_BYTES_CONFIG,
- CACHE_MAX_BYTES_BUFFERING_CONFIG);
- } else if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides)) {
- cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+ final boolean stateStoreCacheMaxBytesOverridden = isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides);
+ final boolean cacheMaxBytesBufferingOverridden = isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides);
+
+ if (!stateStoreCacheMaxBytesOverridden && !cacheMaxBytesBufferingOverridden) {
+ cacheSize = getTotalCacheSize(globalAppConfigs);
} else {
- cacheSize = globalAppConfigs.getTotalCacheSize();
+ if (stateStoreCacheMaxBytesOverridden && cacheMaxBytesBufferingOverridden) {
+ cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+ log.info("Topology {} is using both deprecated config {} and new config {}, hence {} is ignored and the new config {} (value {}) is used",
+ topologyName,
+ CACHE_MAX_BYTES_BUFFERING_CONFIG,
+ STATESTORE_CACHE_MAX_BYTES_CONFIG,
+ CACHE_MAX_BYTES_BUFFERING_CONFIG,
+ STATESTORE_CACHE_MAX_BYTES_CONFIG,
+ cacheSize);
+ } else if (cacheMaxBytesBufferingOverridden) {
+ cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+ log.info("Topology {} is using only deprecated config {}, and will be used to set cache size to {}; " +
+ "we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
+ topologyName,
+ CACHE_MAX_BYTES_BUFFERING_CONFIG,
+ cacheSize,
+ STATESTORE_CACHE_MAX_BYTES_CONFIG,
+ CACHE_MAX_BYTES_BUFFERING_CONFIG);
+ } else {
+ cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+ }
+
+ if (cacheSize != 0) {
+ log.warn("Topology {} is overriding cache size to {} but this will not have any effect as the "
+ + "topology-level cache size config only controls whether record buffering is enabled "
+ + "or disabled, thus the only valid override value is 0",
+ topologyName, cacheSize);
+ } else {
+ log.info("Topology {} is overriding cache size to {}, record buffering will be disabled",
+ topologyName, cacheSize);
+ }
}
if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) {
maxTaskIdleMs = getLong(MAX_TASK_IDLE_MS_CONFIG);
- log.info("Topology {} is overridding {} to {}", topologyName, MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);
+ log.info("Topology {} is overriding {} to {}", topologyName, MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);
} else {
maxTaskIdleMs = globalAppConfigs.getLong(MAX_TASK_IDLE_MS_CONFIG);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
index e271a42ab89..6f169826f06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
@@ -17,9 +17,16 @@
package org.apache.kafka.streams.internals;
import org.apache.kafka.streams.StreamsConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
public class StreamsConfigUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(StreamsConfigUtils.class);
+
public enum ProcessingMode {
AT_LEAST_ONCE("AT_LEAST_ONCE"),
@@ -66,4 +73,28 @@ public class StreamsConfigUtils {
return processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA ||
processingMode == ProcessingMode.EXACTLY_ONCE_V2;
}
+
+ @SuppressWarnings("deprecation")
+ public static long getTotalCacheSize(final StreamsConfig config) {
+ // both deprecated and new config set. Warn and use the new one.
+ if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && config.originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) {
+ if (!config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG).equals(config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG))) {
+ LOG.warn("Both deprecated config {} and the new config {} are set, hence {} is ignored and {} is used instead.",
+ CACHE_MAX_BYTES_BUFFERING_CONFIG,
+ STATESTORE_CACHE_MAX_BYTES_CONFIG,
+ CACHE_MAX_BYTES_BUFFERING_CONFIG,
+ STATESTORE_CACHE_MAX_BYTES_CONFIG);
+ }
+ return config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+ } else if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG)) {
+ // only deprecated config set.
+ LOG.warn("Deprecated config {} is set, and will be used; we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
+ CACHE_MAX_BYTES_BUFFERING_CONFIG,
+ STATESTORE_CACHE_MAX_BYTES_CONFIG,
+ CACHE_MAX_BYTES_BUFFERING_CONFIG);
+ return config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+ }
+ // only new or no config set. Use default or user specified value.
+ return config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index b5620d5c13e..eabb8edc114 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -143,6 +143,8 @@ public class KafkaStreamsTest {
private GlobalStreamThread globalStreamThread;
@Mock
private Metrics metrics;
+ @Mock
+ private State state;
private StateListenerStub streamsStateListener;
private Capture> metricsReportersCapture;
@@ -216,6 +218,7 @@ public class KafkaStreamsTest {
ClientMetrics.addStateMetric(anyObject(StreamsMetricsImpl.class), anyObject());
ClientMetrics.addNumAliveStreamThreadMetric(anyObject(StreamsMetricsImpl.class), anyObject());
+
// setup stream threads
PowerMock.mockStatic(StreamThread.class);
EasyMock.expect(StreamThread.create(
@@ -240,6 +243,7 @@ public class KafkaStreamsTest {
PowerMock.mockStatic(StreamsConfigUtils.class);
EasyMock.expect(StreamsConfigUtils.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE).anyTimes();
EasyMock.expect(StreamsConfigUtils.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes();
+ EasyMock.expect(StreamsConfigUtils.getTotalCacheSize(anyObject(StreamsConfig.class))).andReturn(10 * 1024 * 1024L).anyTimes();
EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes();
EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes();
EasyMock.expect(streamThreadOne.getCacheSize()).andReturn(10485760L).anyTimes();
@@ -598,6 +602,27 @@ public class KafkaStreamsTest {
streams.start();
final int oldSize = streams.threads.size();
waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+ EasyMock.reset(streamThreadOne, streamThreadTwo);
+ EasyMock.expect(streamThreadOne.isRunning()).andStubReturn(true);
+ EasyMock.expect(streamThreadTwo.isRunning()).andStubReturn(true);
+ EasyMock.expect(streamThreadOne.state()).andStubReturn(StreamThread.State.RUNNING);
+ EasyMock.expect(streamThreadTwo.state()).andStubReturn(StreamThread.State.RUNNING);
+ EasyMock.expect(streamThreadOne.getName()).andStubReturn("processId-StreamThread-1");
+ EasyMock.expect(streamThreadTwo.getName()).andStubReturn("processId-StreamThread-2");
+ EasyMock.expect(streamThreadTwo.getId()).andStubReturn(2L);
+ EasyMock.expect(streamThreadOne.getCacheSize()).andReturn(10485760L).anyTimes();
+ EasyMock.expect(streamThreadOne.getMaxBufferSize()).andReturn(536870912L).anyTimes();
+ EasyMock.expect(streamThreadTwo.getCacheSize()).andReturn(10485760L).anyTimes();
+ EasyMock.expect(streamThreadTwo.getMaxBufferSize()).andReturn(536870912L).anyTimes();
+ streamThreadTwo.setStateListener(EasyMock.anyObject());
+ streamThreadTwo.start();
+
+ streamThreadOne.resizeCacheAndBufferMemory(5 * 1024 * 1024L, 256 * 1024 * 1024L);
+ streamThreadTwo.resizeCacheAndBufferMemory(5 * 1024 * 1024L, 256 * 1024 * 1024L);
+ streamThreadOne.shutdown();
+ streamThreadTwo.shutdown();
+ EasyMock.expect(state.isRunningOrRebalancing()).andStubReturn(true);
+ EasyMock.replay(streamThreadOne, streamThreadTwo, state);
assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
assertThat(streams.threads.size(), equalTo(oldSize + 1));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 243a4741677..4deed5ec903 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -63,6 +63,7 @@ import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFI
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
+import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItem;
@@ -1258,7 +1259,7 @@ public class StreamsConfigTest {
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
final StreamsConfig config = new StreamsConfig(props);
- assertEquals(config.getTotalCacheSize(), 100);
+ assertEquals(getTotalCacheSize(config), 100);
}
@Test
@@ -1266,20 +1267,20 @@ public class StreamsConfigTest {
public void shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() {
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
final StreamsConfig config = new StreamsConfig(props);
- assertEquals(config.getTotalCacheSize(), 10);
+ assertEquals(getTotalCacheSize(config), 10);
}
@Test
public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() {
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10);
final StreamsConfig config = new StreamsConfig(props);
- assertEquals(config.getTotalCacheSize(), 10);
+ assertEquals(getTotalCacheSize(config), 10);
}
@Test
public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
final StreamsConfig config = new StreamsConfig(props);
- assertEquals(config.getTotalCacheSize(), 10 * 1024 * 1024);
+ assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
}
static class MisconfiguredSerde implements Serde