diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 7c32223961b..1d60fde34d2 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -196,7 +196,7 @@ files="StreamsMetricsImpl.java"/> + files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|GlobalStateManagerImpl|KStreamImplJoin|TopologyConfig).java"/> diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java index a5086de8c66..be54bafca2b 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -179,7 +179,7 @@ public class PageViewTypedDemo { props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class); - props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java index cdb36394a98..8fc874488ab 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -60,7 +60,7 @@ public class PageViewUntypedDemo { props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); - props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java index 3dc8eda25f1..6e40fa03066 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java @@ -77,7 +77,7 @@ public class TemperatureDemo { props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); final Duration duration24Hours = Duration.ofHours(24); diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java index 4ca5d73f1d8..d290c660bbf 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java @@ -61,7 +61,7 @@ public final class WordCountDemo { } props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java index 014923fb64c..6204c422bc0 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -108,7 +108,7 @@ public final class WordCountProcessorDemo { props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor"); props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java index 617e491e111..90f4764be7f 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java @@ -131,7 +131,7 @@ public final class WordCountTransformerDemo { } props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-transformer"); props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index ac24e709fbb..1dc942b8f26 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -106,6 +106,7 @@ import java.util.stream.Collectors; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; +import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize; import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets; import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY; @@ -925,7 +926,7 @@ public class KafkaStreams implements AutoCloseable { streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler; delegatingStateRestoreListener = new DelegatingStateRestoreListener(); - totalCacheSize = applicationConfigs.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG); + totalCacheSize = getTotalCacheSize(applicationConfigs); final int numStreamThreads = topologyMetadata.getNumStreamThreads(applicationConfigs); final long cacheSizePerThread = getCacheSizePerThread(numStreamThreads); @@ -1009,7 +1010,7 @@ public class KafkaStreams implements AutoCloseable { * Since the number of stream threads increases, the sizes of the caches in the new stream thread * and the existing stream threads are adapted so that the sum of the cache sizes over all stream * threads does not exceed the total cache size specified in configuration - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}. *

* Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. * @@ -1058,7 +1059,7 @@ public class KafkaStreams implements AutoCloseable { *

* Since the number of stream threads decreases, the sizes of the caches in the remaining stream * threads are adapted so that the sum of the cache sizes over all stream threads equals the total - * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * cache size specified in configuration {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}. * * @return name of the removed stream thread or empty if a stream thread could not be removed because * no stream threads are alive @@ -1075,7 +1076,7 @@ public class KafkaStreams implements AutoCloseable { *

* Since the number of stream threads decreases, the sizes of the caches in the remaining stream * threads are adapted so that the sum of the cache sizes over all stream threads equals the total - * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * cache size specified in configuration {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}. * * @param timeout The length of time to wait for the thread to shutdown * @throws org.apache.kafka.common.errors.TimeoutException if the thread does not stop in time diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 7d68a188f3f..89d90c15af3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -468,9 +468,15 @@ public class StreamsConfig extends AbstractConfig { /** {@code cache.max.bytes.buffering} */ @SuppressWarnings("WeakerAccess") + @Deprecated public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering"; public static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads"; + /** {@code statestore.cache.max.bytes} */ + @SuppressWarnings("WeakerAccess") + public static final String STATESTORE_CACHE_MAX_BYTES_CONFIG = "statestore.cache.max.bytes"; + public static final String STATESTORE_CACHE_MAX_BYTES_DOC = "Maximum number of memory bytes to be used for statestore cache across all threads"; + /** {@code client.id} */ @SuppressWarnings("WeakerAccess") public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; @@ -784,6 +790,12 @@ public class StreamsConfig extends AbstractConfig { atLeast(0), Importance.MEDIUM, CACHE_MAX_BYTES_BUFFERING_DOC) + .define(STATESTORE_CACHE_MAX_BYTES_CONFIG, + Type.LONG, + 10 * 1024 * 1024L, + atLeast(0), + Importance.MEDIUM, + STATESTORE_CACHE_MAX_BYTES_DOC) .define(CLIENT_ID_CONFIG, Type.STRING, "", diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java index c4bc85656eb..8979d704cfb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java @@ -36,6 +36,8 @@ import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTIT import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC; import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG; import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_DOC; +import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_DOC; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG; @@ -48,6 +50,7 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_DOC; import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB; import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY; +import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize; /** * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the @@ -55,6 +58,7 @@ import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY; * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder()} method. */ +@SuppressWarnings("deprecation") public class TopologyConfig extends AbstractConfig { private static final ConfigDef CONFIG; static { @@ -65,10 +69,15 @@ public class TopologyConfig extends AbstractConfig { Importance.LOW, BUFFERED_RECORDS_PER_PARTITION_DOC) .define(CACHE_MAX_BYTES_BUFFERING_CONFIG, + Type.LONG, + null, + Importance.MEDIUM, + CACHE_MAX_BYTES_BUFFERING_DOC) + .define(STATESTORE_CACHE_MAX_BYTES_CONFIG, Type.LONG, null, Importance.MEDIUM, - CACHE_MAX_BYTES_BUFFERING_DOC) + STATESTORE_CACHE_MAX_BYTES_DOC) .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, Type.CLASS, null, @@ -132,11 +141,43 @@ public class TopologyConfig extends AbstractConfig { maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG); } - if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) { - cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); - log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize); + final boolean stateStoreCacheMaxBytesOverridden = isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides); + final boolean cacheMaxBytesBufferingOverridden = isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides); + + if (!stateStoreCacheMaxBytesOverridden && !cacheMaxBytesBufferingOverridden) { + cacheSize = getTotalCacheSize(globalAppConfigs); } else { - cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); + if (stateStoreCacheMaxBytesOverridden && cacheMaxBytesBufferingOverridden) { + cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG); + log.info("Topology {} is using both deprecated config {} and new config {}, hence {} is ignored and the new config {} (value {}) is used", + topologyName, + CACHE_MAX_BYTES_BUFFERING_CONFIG, + STATESTORE_CACHE_MAX_BYTES_CONFIG, + CACHE_MAX_BYTES_BUFFERING_CONFIG, + STATESTORE_CACHE_MAX_BYTES_CONFIG, + cacheSize); + } else if (cacheMaxBytesBufferingOverridden) { + cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); + log.info("Topology {} is using only deprecated config {}, and will be used to set cache size to {}; " + + "we suggest setting the new config {} instead as deprecated {} would be removed in the future.", + topologyName, + CACHE_MAX_BYTES_BUFFERING_CONFIG, + cacheSize, + STATESTORE_CACHE_MAX_BYTES_CONFIG, + CACHE_MAX_BYTES_BUFFERING_CONFIG); + } else { + cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG); + } + + if (cacheSize != 0) { + log.warn("Topology {} is overriding cache size to {} but this will not have any effect as the " + + "topology-level cache size config only controls whether record buffering is enabled " + + "or disabled, thus the only valid override value is 0", + topologyName, cacheSize); + } else { + log.info("Topology {} is overriding cache size to {}, record buffering will be disabled", + topologyName, cacheSize); + } } if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) { diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java index e271a42ab89..708e51d9d28 100644 --- a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java @@ -17,9 +17,16 @@ package org.apache.kafka.streams.internals; import org.apache.kafka.streams.StreamsConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG; public class StreamsConfigUtils { + private static final Logger LOG = LoggerFactory.getLogger(StreamsConfigUtils.class); + public enum ProcessingMode { AT_LEAST_ONCE("AT_LEAST_ONCE"), @@ -66,4 +73,28 @@ public class StreamsConfigUtils { return processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || processingMode == ProcessingMode.EXACTLY_ONCE_V2; } + + @SuppressWarnings("deprecation") + public static long getTotalCacheSize(final StreamsConfig config) { + // both deprecated and new config set. Warn and use the new one. + if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && config.originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) { + if (!config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG).equals(config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG))) { + LOG.warn("Both deprecated config {} and the new config {} are set, hence {} is ignored and {} is used instead.", + CACHE_MAX_BYTES_BUFFERING_CONFIG, + STATESTORE_CACHE_MAX_BYTES_CONFIG, + CACHE_MAX_BYTES_BUFFERING_CONFIG, + STATESTORE_CACHE_MAX_BYTES_CONFIG); + } + return config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG); + } else if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG)) { + // only deprecated config set. + LOG.warn("Deprecated config {} is set, and will be used; we suggest setting the new config {} instead as deprecated {} would be removed in the future.", + CACHE_MAX_BYTES_BUFFERING_CONFIG, + STATESTORE_CACHE_MAX_BYTES_CONFIG, + CACHE_MAX_BYTES_BUFFERING_CONFIG); + return config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); + } + // only new or no config set. Use default or user specified value. + return config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java index 051396fbc94..b0f1deca1cb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java @@ -78,7 +78,7 @@ public interface CogroupedKStream { * same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via @@ -128,7 +128,7 @@ public interface CogroupedKStream { * same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via @@ -179,7 +179,7 @@ public interface CogroupedKStream { * same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via @@ -232,7 +232,7 @@ public interface CogroupedKStream { * same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link org.apache.kafka.streams.state.ReadOnlyKeyValueStore} it must be obtained via diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index 072558cf6ed..513d94dae65 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -53,7 +53,7 @@ public interface KGroupedStream { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by @@ -81,7 +81,7 @@ public interface KGroupedStream { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by @@ -112,7 +112,7 @@ public interface KGroupedStream { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via @@ -158,7 +158,7 @@ public interface KGroupedStream { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via @@ -211,7 +211,7 @@ public interface KGroupedStream { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. * *

@@ -262,7 +262,7 @@ public interface KGroupedStream { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via @@ -326,7 +326,7 @@ public interface KGroupedStream { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via @@ -385,7 +385,7 @@ public interface KGroupedStream { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. * *

@@ -431,7 +431,7 @@ public interface KGroupedStream { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via @@ -490,7 +490,7 @@ public interface KGroupedStream { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java index 06d12e1d4ce..5733aef319c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java @@ -52,7 +52,7 @@ public interface KGroupedTable { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via @@ -95,7 +95,7 @@ public interface KGroupedTable { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via @@ -138,7 +138,7 @@ public interface KGroupedTable { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. @@ -167,7 +167,7 @@ public interface KGroupedTable { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. @@ -223,7 +223,7 @@ public interface KGroupedTable { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via @@ -296,7 +296,7 @@ public interface KGroupedTable { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via @@ -368,7 +368,7 @@ public interface KGroupedTable { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. @@ -434,7 +434,7 @@ public interface KGroupedTable { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via @@ -518,7 +518,7 @@ public interface KGroupedTable { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via @@ -604,7 +604,7 @@ public interface KGroupedTable { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is @@ -674,7 +674,7 @@ public interface KGroupedTable { * the same key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java index b7e3b07e371..eeeb3e1a036 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedCogroupedKStream.java @@ -77,7 +77,7 @@ public interface SessionWindowedCogroupedKStream { * the same window and key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. @@ -122,7 +122,7 @@ public interface SessionWindowedCogroupedKStream { * the same window and key. * The rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. @@ -166,7 +166,7 @@ public interface SessionWindowedCogroupedKStream { * the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link SessionStore} it must be obtained via @@ -226,7 +226,7 @@ public interface SessionWindowedCogroupedKStream { * to the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link SessionStore} it must be obtained via diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java index a9eaddbee48..fe897515a9d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java @@ -65,7 +65,7 @@ public interface SessionWindowedKStream { * the same session and key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. @@ -95,7 +95,7 @@ public interface SessionWindowedKStream { * the same session and key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. @@ -126,7 +126,7 @@ public interface SessionWindowedKStream { * to the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link SessionStore} it must be obtained via @@ -172,7 +172,7 @@ public interface SessionWindowedKStream { * to the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link SessionStore} it must be obtained via @@ -233,7 +233,7 @@ public interface SessionWindowedKStream { * the same window and key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. @@ -282,7 +282,7 @@ public interface SessionWindowedKStream { * the same window and key. * The rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. @@ -330,7 +330,7 @@ public interface SessionWindowedKStream { * the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link SessionStore} it must be obtained via @@ -391,7 +391,7 @@ public interface SessionWindowedKStream { * to the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link SessionStore} it must be obtained via @@ -459,7 +459,7 @@ public interface SessionWindowedKStream { * the same window and key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. @@ -504,7 +504,7 @@ public interface SessionWindowedKStream { * the same window and key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. @@ -549,7 +549,7 @@ public interface SessionWindowedKStream { * to the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link SessionStore} it must be obtained via @@ -609,7 +609,7 @@ public interface SessionWindowedKStream { * to the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link SessionStore} it must be obtained via diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java index e4178bc9e3d..a46da057a08 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedCogroupedKStream.java @@ -75,7 +75,7 @@ public interface TimeWindowedCogroupedKStream { * the same window and key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by @@ -115,7 +115,7 @@ public interface TimeWindowedCogroupedKStream { * the same window and key. * The rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by @@ -156,7 +156,7 @@ public interface TimeWindowedCogroupedKStream { * the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyWindowStore} it must be obtained via @@ -213,7 +213,7 @@ public interface TimeWindowedCogroupedKStream { * to the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyWindowStore} it must be obtained via diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java index 122f73f7a70..3f36838f202 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java @@ -65,7 +65,7 @@ public interface TimeWindowedKStream { * the same window and key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by @@ -95,7 +95,7 @@ public interface TimeWindowedKStream { * the same window and key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by @@ -126,7 +126,7 @@ public interface TimeWindowedKStream { * to the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyWindowStore} it must be obtained via @@ -175,7 +175,7 @@ public interface TimeWindowedKStream { * to the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval} *

* To query the local {@link ReadOnlyWindowStore} it must be obtained via @@ -236,7 +236,7 @@ public interface TimeWindowedKStream { * the same window and key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by @@ -281,7 +281,7 @@ public interface TimeWindowedKStream { * the same window and key. * The rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by @@ -326,7 +326,7 @@ public interface TimeWindowedKStream { * the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyWindowStore} it must be obtained via @@ -387,7 +387,7 @@ public interface TimeWindowedKStream { * to the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval} *

* To query the local {@link ReadOnlyWindowStore} it must be obtained via @@ -457,7 +457,7 @@ public interface TimeWindowedKStream { * the same window and key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by @@ -502,7 +502,7 @@ public interface TimeWindowedKStream { * the same window and key. * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by @@ -547,7 +547,7 @@ public interface TimeWindowedKStream { * to the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyWindowStore} it must be obtained via @@ -610,7 +610,7 @@ public interface TimeWindowedKStream { * to the same window and key if caching is enabled on the {@link Materialized} instance. * When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct * keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} - * parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. *

* To query the local {@link ReadOnlyWindowStore} it must be obtained via diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 766cd04498e..e27154f0e35 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -246,6 +246,7 @@ public class KafkaStreamsTest { PowerMock.mockStatic(StreamsConfigUtils.class); EasyMock.expect(StreamsConfigUtils.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE).anyTimes(); EasyMock.expect(StreamsConfigUtils.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes(); + EasyMock.expect(StreamsConfigUtils.getTotalCacheSize(anyObject(StreamsConfig.class))).andReturn(10 * 1024 * 1024L).anyTimes(); EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes(); EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes(); prepareStreamThread(streamThreadOne, 1, true); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 496327635ba..f0c0ce74ccd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -66,6 +66,7 @@ import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFI import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix; import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; +import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize; import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.hasItem; @@ -1256,6 +1257,36 @@ public class StreamsConfigTest { ); } + @Test + @SuppressWarnings("deprecation") + public void shouldUseStateStoreCacheMaxBytesWhenBothOldAndNewConfigsAreSet() { + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100); + props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10); + final StreamsConfig config = new StreamsConfig(props); + assertEquals(getTotalCacheSize(config), 100); + } + + @Test + @SuppressWarnings("deprecation") + public void shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() { + props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10); + final StreamsConfig config = new StreamsConfig(props); + assertEquals(getTotalCacheSize(config), 10); + } + + @Test + public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() { + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10); + final StreamsConfig config = new StreamsConfig(props); + assertEquals(getTotalCacheSize(config), 10); + } + + @Test + public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() { + final StreamsConfig config = new StreamsConfig(props); + assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024); + } + @Test public void testInvalidSecurityProtocol() { props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc"); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java index fdc8525bd42..44ca7a186ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java @@ -134,7 +134,7 @@ public abstract class AbstractJoinIntegrationTest { void prepareEnvironment() throws InterruptedException { if (!cacheEnabled) { - STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + STREAMS_CONFIG.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); } STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index fa988187dda..9a923e318ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -151,7 +151,7 @@ public abstract class AbstractResetIntegrationTest { streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath()); streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java index b203b46b376..3e109ad1926 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java @@ -371,7 +371,7 @@ public class AdjustStreamThreadCountTest { final Properties props = new Properties(); props.putAll(properties); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); - props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, totalCacheBytes); + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, totalCacheBytes); try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) { addStreamStateChangeListener(kafkaStreams); @@ -398,7 +398,7 @@ public class AdjustStreamThreadCountTest { final Properties props = new Properties(); props.putAll(properties); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); - props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, totalCacheBytes); + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, totalCacheBytes); final AtomicBoolean injectError = new AtomicBoolean(false); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java index c7aae1e0075..6945d708b87 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java @@ -90,7 +90,7 @@ public class EmitOnChangeIntegrationTest { mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()), mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1), - mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0), + mkEntry(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0), mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L), mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class), mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class), diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 12cb0bf9563..7fd07b01ff6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -249,7 +249,7 @@ public class EosIntegrationTest { final Properties properties = new Properties(); properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); - properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); @@ -326,7 +326,7 @@ public class EosIntegrationTest { final Properties properties = new Properties(); properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); - properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -940,7 +940,7 @@ public class EosIntegrationTest { properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), maxPollIntervalMs); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), maxPollIntervalMs - 1); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), maxPollIntervalMs); - properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir); properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142"); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java index 7f652f0c7f5..aacb9e0483b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java @@ -949,7 +949,7 @@ public class EosV2UpgradeIntegrationTest { properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS); properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) commitInterval); properties.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG), KeyPartitioner.class); - properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir); properties.put(InternalConfig.ASSIGNMENT_LISTENER, assignmentListener); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java index e5ceb2fad4c..d05a57890b6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java @@ -144,7 +144,7 @@ public class FineGrainedAutoResetIntegrationTest { public void setUp() throws IOException { final Properties props = new Properties(); - props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -283,7 +283,7 @@ public class FineGrainedAutoResetIntegrationTest { @Test public void shouldThrowStreamsExceptionNoResetSpecified() throws InterruptedException { final Properties props = new Properties(); - props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 3ac94ad9683..b2272d0ec2b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -136,7 +136,7 @@ public class GlobalKTableEOSIntegrationTest { streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0L); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); streamsConfiguration.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 4668eab7ca3..b4b20d2aa07 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -107,7 +107,7 @@ public class GlobalKTableIntegrationTest { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.>as(globalStore) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java index 4012224d301..06314adbd40 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java @@ -115,7 +115,7 @@ public class GlobalThreadShutDownOrderTest { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); final Consumed stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 2e19c0caabc..37c26a6a4b9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -105,7 +105,7 @@ public class InternalTopicIntegrationTest { streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); - streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsProp.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 459d5f871ca..980917111ba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -104,7 +104,7 @@ public class KStreamAggregationDedupIntegrationTest { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L); final KeyValueMapper mapper = MockMapper.selectValueMapper(); stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String())); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 6954f86e8b5..9a66a7d0570 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -139,7 +139,7 @@ public class KStreamAggregationIntegrationTest { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java index 5043ee2f8b9..1cff0fc2016 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java @@ -142,7 +142,7 @@ public class KStreamRepartitionIntegrationTest { streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java index e628ed6ced6..b5eb98a31a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java @@ -187,7 +187,7 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest { streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Partitioner"); streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); return streamsConfig; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java index 3604f1127e3..82f6ffae188 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java @@ -212,7 +212,7 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest { streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Multi"); streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); return streamsConfig; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java index be583f17a7e..cefc48d0161 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java @@ -69,7 +69,7 @@ public class KTableSourceTopicRestartIntegrationTest { STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + STREAMS_CONFIG.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5L); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java index 8d3cb8e8795..4f7de4397db 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java @@ -112,7 +112,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest { streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath()); streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java index c937999ca5e..5fb5188e5f2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java @@ -198,6 +198,7 @@ public class MetricsIntegrationTest { private static final String THREAD_START_TIME = "thread-start-time"; private static final String ACTIVE_PROCESS_RATIO = "active-process-ratio"; private static final String ACTIVE_BUFFER_COUNT = "active-buffer-count"; + private static final String SKIPPED_RECORDS_RATE = "skipped-records-rate"; private static final String SKIPPED_RECORDS_TOTAL = "skipped-records-total"; private static final String RECORD_LATENESS_AVG = "record-lateness-avg"; @@ -258,7 +259,6 @@ public class MetricsIntegrationTest { streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java index 5cc693265c6..d1659cbc9d4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java @@ -824,7 +824,7 @@ public class NamedTopologyIntegrationTest { try { final AtomicInteger noOutputExpected = new AtomicInteger(0); final AtomicInteger outputExpected = new AtomicInteger(0); - props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L); props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java index 8f8faea448f..2e709af0b11 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java @@ -213,7 +213,7 @@ public class OptimizedKTableIntegrationTest { config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); - config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + config.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java index c897e0602f9..e09ba997b2e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java @@ -132,7 +132,7 @@ public class PauseResumeIntegrationTest { properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); - properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index a5dbdcc18c7..82409af50f8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -967,7 +967,7 @@ public class QueryableStateIntegrationTest { } private void verifyCanQueryState(final int cacheSizeBytes) throws Exception { - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSizeBytes); final StreamsBuilder builder = new StreamsBuilder(); final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"}; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 894108c246d..1a22c90fcf1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -133,7 +133,7 @@ public class RegexSourceIntegrationTest { public void setUp() throws InterruptedException { outputTopic = createTopic(topicSuffixGenerator.incrementAndGet()); final Properties properties = new Properties(); - properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index efe1d39924e..75b57e6751a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -129,7 +129,7 @@ public class RestoreIntegrationTest { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java index 17610c8450d..2d667839fae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java @@ -200,7 +200,7 @@ public class RocksDBMetricsIntegrationTest { streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name); streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); return streamsConfiguration; } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java index aaa791aa1dd..ff8cd3b8339 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java @@ -92,7 +92,7 @@ public class SelfJoinUpgradeIntegrationTest { final String safeTestName = safeUniqueTestName(getClass(), testName); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java index b15ecee4942..395ef6efd66 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java @@ -141,7 +141,7 @@ public class SlidingWindowedKStreamIntegrationTest { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index ed68a1f33ae..2ac2c2bddee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -403,7 +403,7 @@ public class StandbyTaskEOSIntegrationTest { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath); streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java index 6c6fc5d24b8..17df2b59a21 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java @@ -94,7 +94,7 @@ public class StoreUpgradeIntegrationTest { final String safeTestName = safeUniqueTestName(getClass(), testName); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java index 84876e77fd7..38dc3a6ffc1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java @@ -124,7 +124,7 @@ public class StreamTableJoinTopologyOptimizationIntegrationTest { streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java index e0ec6c9bbef..9abc2c9500d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java @@ -148,7 +148,7 @@ public class TimeWindowedKStreamIntegrationTest { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index 0974ed6464b..95185c49ffc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -59,7 +59,7 @@ public class KTableFilterTest { @Before public void setUp() { // disable caching at the config level - props.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0"); + props.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0"); } private final Predicate predicate = (key, value) -> (value % 2) == 0; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java index 3356b37c408..41876581b38 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java @@ -104,7 +104,7 @@ public class SessionWindowedKStreamImplTest { @Test public void shouldCountSessionWindowedWithCachingDisabled() { - props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); shouldCountSessionWindowed(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index f620cd22c18..29046429687 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -965,10 +965,22 @@ public class InternalTopologyBuilderTest { assertThat(topologyBuilder.topologyConfigs().getTaskConfig().maxTaskIdleMs, equalTo(100L)); } + @Test + @SuppressWarnings("deprecation") + public void shouldUseNonDeprecatedConfigToSetCacheBytesWhenBothDeprecatedAndNonDeprecatedConfigsUsed() { + final Properties globalProps = StreamsTestUtils.getStreamsConfig(); + globalProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 200L); + globalProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100L); + final StreamsConfig globalStreamsConfig = new StreamsConfig(globalProps); + final InternalTopologyBuilder topologyBuilder = builder.rewriteTopology(globalStreamsConfig); + assertThat(topologyBuilder.topologyConfigs(), equalTo(new TopologyConfig(null, globalStreamsConfig, new Properties()))); + assertThat(topologyBuilder.topologyConfigs().cacheSize, equalTo(200L)); + } + @Test public void shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() { final Properties topologyOverrides = new Properties(); - topologyOverrides.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 12345L); + topologyOverrides.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 12345L); topologyOverrides.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L); topologyOverrides.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L); topologyOverrides.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15); @@ -996,7 +1008,7 @@ public class InternalTopologyBuilderTest { @Test public void shouldNotOverrideGlobalStreamsConfigWhenGivenUnnamedTopologyProps() { final Properties streamsProps = StreamsTestUtils.getStreamsConfig(); - streamsProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 12345L); + streamsProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 12345L); streamsProps.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L); streamsProps.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L); streamsProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java index 251a2637c43..7d0e7165ba7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java @@ -110,7 +110,7 @@ public class RepartitionOptimizingTest { @Before public void setUp() { streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); - streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10)); + streamsConfiguration.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Integer.toString(1024 * 10)); streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(5000)); processorValueCollector.clear(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java index b388a6a67ee..bc04505ebfe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java @@ -86,7 +86,7 @@ public class RepartitionWithMergeOptimizingTest { @Before public void setUp() { streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); - streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10)); + streamsConfiguration.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Integer.toString(1024 * 10)); streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(5000)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java index a38fb322bc2..485a12bbea6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java @@ -41,7 +41,6 @@ public class TaskMetricsTest { private final Sensor expectedSensor = mock(Sensor.class); private final Map tagMap = Collections.singletonMap("hello", "world"); - @Test public void shouldGetActiveProcessRatioSensor() { final String operation = "active-process-ratio"; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java index 8a402ab9abf..1e06b4ab314 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java @@ -72,7 +72,7 @@ public class BrokerCompatibilityTest { streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); - streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingMode); final int timeout = 6000; streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java index 3b1aa4478f3..ae9d7527d7a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java @@ -106,7 +106,7 @@ public class EosTestClient extends SmokeTestUtil { props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, Duration.ofMinutes(1).toMillis()); props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, Integer.MAX_VALUE); props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); - props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000L); // increase commit interval to make sure a client is killed having an open transaction props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java index b98f86141a0..af3614c7326 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java @@ -87,7 +87,7 @@ public class StreamsNamedRepartitionTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsNamedRepartitionTest"); - config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0"); + config.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0"); config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java index 714aa110ef3..95945b1b446 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java @@ -110,7 +110,7 @@ public class StreamsOptimizedTest { config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsOptimizedTest"); - config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0"); + config.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0"); config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); config.setProperty(StreamsConfig.adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), "100"); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java index 3c693cc7135..2568b498c97 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java @@ -67,7 +67,7 @@ public class StreamsStandByReplicaTest { streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks"); streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); - streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true); diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 9a7878aba56..96e8030d8fd 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -155,7 +155,7 @@ import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; * *

Note that the {@code TopologyTestDriver} processes input records synchronously. * This implies that {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit.interval.ms} and - * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache.max.bytes.buffering} configuration have no effect. + * {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache.max.bytes.buffering} configuration have no effect. * The driver behaves as if both configs would be set to zero, i.e., as if a "commit" (and thus "flush") would happen * after each input record. * @@ -329,7 +329,7 @@ public class TopologyTestDriver implements Closeable { final ThreadCache cache = new ThreadCache( logContext, - Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)), + Math.max(0, streamsConfig.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG)), streamsMetrics );