mirror of https://github.com/apache/kafka.git
KAFKA-13152: KIP-770, cache size config deprecation (#12758)
PR implementing KIP-770 (#11424) was reverted as it brought in a regression wrt pausing/resuming the consumer. That KIP also introduced a change to deprecate config CACHE_MAX_BYTES_BUFFERING_CONFIG and replace it with STATESTORE_CACHE_MAX_BYTES_CONFIG. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
parent
b525ddc9f1
commit
9a793897ec
|
@ -196,7 +196,7 @@
|
|||
files="StreamsMetricsImpl.java"/>
|
||||
|
||||
<suppress checks="NPathComplexity"
|
||||
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|GlobalStateManagerImpl|KStreamImplJoin).java"/>
|
||||
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|GlobalStateManagerImpl|KStreamImplJoin|TopologyConfig).java"/>
|
||||
|
||||
<suppress checks="(FinalLocalVariable|UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
|
||||
files="Murmur3.java"/>
|
||||
|
|
|
@ -179,7 +179,7 @@ public class PageViewTypedDemo {
|
|||
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
|
||||
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class);
|
||||
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class);
|
||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
|
||||
|
||||
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
|
||||
|
|
|
@ -60,7 +60,7 @@ public class PageViewUntypedDemo {
|
|||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
|
||||
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
|
||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
|
||||
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
|
|
|
@ -77,7 +77,7 @@ public class TemperatureDemo {
|
|||
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
|
||||
final Duration duration24Hours = Duration.ofHours(24);
|
||||
|
||||
|
|
|
@ -61,7 +61,7 @@ public final class WordCountDemo {
|
|||
}
|
||||
props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
|
||||
props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
|
||||
|
|
|
@ -108,7 +108,7 @@ public final class WordCountProcessorDemo {
|
|||
|
||||
props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
|
||||
props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
|
||||
|
|
|
@ -131,7 +131,7 @@ public final class WordCountTransformerDemo {
|
|||
}
|
||||
props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-transformer");
|
||||
props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
|
||||
|
|
|
@ -106,6 +106,7 @@ import java.util.stream.Collectors;
|
|||
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
|
||||
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
|
||||
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
|
||||
import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
|
||||
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
|
||||
import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
|
||||
|
||||
|
@ -925,7 +926,7 @@ public class KafkaStreams implements AutoCloseable {
|
|||
streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
|
||||
delegatingStateRestoreListener = new DelegatingStateRestoreListener();
|
||||
|
||||
totalCacheSize = applicationConfigs.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
totalCacheSize = getTotalCacheSize(applicationConfigs);
|
||||
final int numStreamThreads = topologyMetadata.getNumStreamThreads(applicationConfigs);
|
||||
final long cacheSizePerThread = getCacheSizePerThread(numStreamThreads);
|
||||
|
||||
|
@ -1009,7 +1010,7 @@ public class KafkaStreams implements AutoCloseable {
|
|||
* Since the number of stream threads increases, the sizes of the caches in the new stream thread
|
||||
* and the existing stream threads are adapted so that the sum of the cache sizes over all stream
|
||||
* threads does not exceed the total cache size specified in configuration
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}.
|
||||
* <p>
|
||||
* Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
|
||||
*
|
||||
|
@ -1058,7 +1059,7 @@ public class KafkaStreams implements AutoCloseable {
|
|||
* <p>
|
||||
* Since the number of stream threads decreases, the sizes of the caches in the remaining stream
|
||||
* threads are adapted so that the sum of the cache sizes over all stream threads equals the total
|
||||
* cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
|
||||
* cache size specified in configuration {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}.
|
||||
*
|
||||
* @return name of the removed stream thread or empty if a stream thread could not be removed because
|
||||
* no stream threads are alive
|
||||
|
@ -1075,7 +1076,7 @@ public class KafkaStreams implements AutoCloseable {
|
|||
* <p>
|
||||
* Since the number of stream threads decreases, the sizes of the caches in the remaining stream
|
||||
* threads are adapted so that the sum of the cache sizes over all stream threads equals the total
|
||||
* cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
|
||||
* cache size specified in configuration {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}.
|
||||
*
|
||||
* @param timeout The length of time to wait for the thread to shutdown
|
||||
* @throws org.apache.kafka.common.errors.TimeoutException if the thread does not stop in time
|
||||
|
|
|
@ -468,9 +468,15 @@ public class StreamsConfig extends AbstractConfig {
|
|||
|
||||
/** {@code cache.max.bytes.buffering} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
@Deprecated
|
||||
public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
|
||||
public static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads";
|
||||
|
||||
/** {@code statestore.cache.max.bytes} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String STATESTORE_CACHE_MAX_BYTES_CONFIG = "statestore.cache.max.bytes";
|
||||
public static final String STATESTORE_CACHE_MAX_BYTES_DOC = "Maximum number of memory bytes to be used for statestore cache across all threads";
|
||||
|
||||
/** {@code client.id} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
|
||||
|
@ -784,6 +790,12 @@ public class StreamsConfig extends AbstractConfig {
|
|||
atLeast(0),
|
||||
Importance.MEDIUM,
|
||||
CACHE_MAX_BYTES_BUFFERING_DOC)
|
||||
.define(STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
Type.LONG,
|
||||
10 * 1024 * 1024L,
|
||||
atLeast(0),
|
||||
Importance.MEDIUM,
|
||||
STATESTORE_CACHE_MAX_BYTES_DOC)
|
||||
.define(CLIENT_ID_CONFIG,
|
||||
Type.STRING,
|
||||
"",
|
||||
|
|
|
@ -36,6 +36,8 @@ import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTIT
|
|||
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
|
||||
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
|
||||
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_DOC;
|
||||
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
|
||||
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_DOC;
|
||||
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
|
||||
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC;
|
||||
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG;
|
||||
|
@ -48,6 +50,7 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG;
|
|||
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_DOC;
|
||||
import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB;
|
||||
import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
|
||||
import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
|
||||
|
||||
/**
|
||||
* Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
|
||||
|
@ -55,6 +58,7 @@ import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
|
|||
* determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
|
||||
* topology builders via the {@link org.apache.kafka.streams.StreamsBuilder()} method.
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
public class TopologyConfig extends AbstractConfig {
|
||||
private static final ConfigDef CONFIG;
|
||||
static {
|
||||
|
@ -65,10 +69,15 @@ public class TopologyConfig extends AbstractConfig {
|
|||
Importance.LOW,
|
||||
BUFFERED_RECORDS_PER_PARTITION_DOC)
|
||||
.define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
Type.LONG,
|
||||
null,
|
||||
Importance.MEDIUM,
|
||||
CACHE_MAX_BYTES_BUFFERING_DOC)
|
||||
.define(STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
Type.LONG,
|
||||
null,
|
||||
Importance.MEDIUM,
|
||||
CACHE_MAX_BYTES_BUFFERING_DOC)
|
||||
STATESTORE_CACHE_MAX_BYTES_DOC)
|
||||
.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
|
||||
Type.CLASS,
|
||||
null,
|
||||
|
@ -132,11 +141,43 @@ public class TopologyConfig extends AbstractConfig {
|
|||
maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
|
||||
}
|
||||
|
||||
if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
|
||||
cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
|
||||
final boolean stateStoreCacheMaxBytesOverridden = isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides);
|
||||
final boolean cacheMaxBytesBufferingOverridden = isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides);
|
||||
|
||||
if (!stateStoreCacheMaxBytesOverridden && !cacheMaxBytesBufferingOverridden) {
|
||||
cacheSize = getTotalCacheSize(globalAppConfigs);
|
||||
} else {
|
||||
cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
if (stateStoreCacheMaxBytesOverridden && cacheMaxBytesBufferingOverridden) {
|
||||
cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
log.info("Topology {} is using both deprecated config {} and new config {}, hence {} is ignored and the new config {} (value {}) is used",
|
||||
topologyName,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
cacheSize);
|
||||
} else if (cacheMaxBytesBufferingOverridden) {
|
||||
cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
log.info("Topology {} is using only deprecated config {}, and will be used to set cache size to {}; " +
|
||||
"we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
|
||||
topologyName,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
cacheSize,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
} else {
|
||||
cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
}
|
||||
|
||||
if (cacheSize != 0) {
|
||||
log.warn("Topology {} is overriding cache size to {} but this will not have any effect as the "
|
||||
+ "topology-level cache size config only controls whether record buffering is enabled "
|
||||
+ "or disabled, thus the only valid override value is 0",
|
||||
topologyName, cacheSize);
|
||||
} else {
|
||||
log.info("Topology {} is overriding cache size to {}, record buffering will be disabled",
|
||||
topologyName, cacheSize);
|
||||
}
|
||||
}
|
||||
|
||||
if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) {
|
||||
|
|
|
@ -17,9 +17,16 @@
|
|||
package org.apache.kafka.streams.internals;
|
||||
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
|
||||
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
|
||||
|
||||
public class StreamsConfigUtils {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(StreamsConfigUtils.class);
|
||||
|
||||
public enum ProcessingMode {
|
||||
AT_LEAST_ONCE("AT_LEAST_ONCE"),
|
||||
|
||||
|
@ -66,4 +73,28 @@ public class StreamsConfigUtils {
|
|||
return processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA ||
|
||||
processingMode == ProcessingMode.EXACTLY_ONCE_V2;
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public static long getTotalCacheSize(final StreamsConfig config) {
|
||||
// both deprecated and new config set. Warn and use the new one.
|
||||
if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && config.originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) {
|
||||
if (!config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG).equals(config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG))) {
|
||||
LOG.warn("Both deprecated config {} and the new config {} are set, hence {} is ignored and {} is used instead.",
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
}
|
||||
return config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
} else if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG)) {
|
||||
// only deprecated config set.
|
||||
LOG.warn("Deprecated config {} is set, and will be used; we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
return config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
}
|
||||
// only new or no config set. Use default or user specified value.
|
||||
return config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,7 +78,7 @@ public interface CogroupedKStream<K, VOut> {
|
|||
* same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
@ -128,7 +128,7 @@ public interface CogroupedKStream<K, VOut> {
|
|||
* same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
@ -179,7 +179,7 @@ public interface CogroupedKStream<K, VOut> {
|
|||
* same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
@ -232,7 +232,7 @@ public interface CogroupedKStream<K, VOut> {
|
|||
* same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link org.apache.kafka.streams.state.ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
|
|
@ -53,7 +53,7 @@ public interface KGroupedStream<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by
|
||||
|
@ -81,7 +81,7 @@ public interface KGroupedStream<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by
|
||||
|
@ -112,7 +112,7 @@ public interface KGroupedStream<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
@ -158,7 +158,7 @@ public interface KGroupedStream<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
@ -211,7 +211,7 @@ public interface KGroupedStream<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
*
|
||||
* <p>
|
||||
|
@ -262,7 +262,7 @@ public interface KGroupedStream<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
@ -326,7 +326,7 @@ public interface KGroupedStream<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
@ -385,7 +385,7 @@ public interface KGroupedStream<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
*
|
||||
* <p>
|
||||
|
@ -431,7 +431,7 @@ public interface KGroupedStream<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
@ -490,7 +490,7 @@ public interface KGroupedStream<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
|
|
@ -52,7 +52,7 @@ public interface KGroupedTable<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
@ -95,7 +95,7 @@ public interface KGroupedTable<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
@ -138,7 +138,7 @@ public interface KGroupedTable<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
|
||||
|
@ -167,7 +167,7 @@ public interface KGroupedTable<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
|
||||
|
@ -223,7 +223,7 @@ public interface KGroupedTable<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
@ -296,7 +296,7 @@ public interface KGroupedTable<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
@ -368,7 +368,7 @@ public interface KGroupedTable<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
|
||||
|
@ -434,7 +434,7 @@ public interface KGroupedTable<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
@ -518,7 +518,7 @@ public interface KGroupedTable<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
|
||||
|
@ -604,7 +604,7 @@ public interface KGroupedTable<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
|
||||
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
|
||||
|
@ -674,7 +674,7 @@ public interface KGroupedTable<K, V> {
|
|||
* the same key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
|
||||
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
|
||||
|
|
|
@ -77,7 +77,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
|
|||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
|
||||
|
@ -122,7 +122,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
|
|||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
|
||||
|
@ -166,7 +166,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
|
|||
* the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link SessionStore} it must be obtained via
|
||||
|
@ -226,7 +226,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
|
|||
* to the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link SessionStore} it must be obtained via
|
||||
|
|
|
@ -65,7 +65,7 @@ public interface SessionWindowedKStream<K, V> {
|
|||
* the same session and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
|
||||
|
@ -95,7 +95,7 @@ public interface SessionWindowedKStream<K, V> {
|
|||
* the same session and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
|
||||
|
@ -126,7 +126,7 @@ public interface SessionWindowedKStream<K, V> {
|
|||
* to the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link SessionStore} it must be obtained via
|
||||
|
@ -172,7 +172,7 @@ public interface SessionWindowedKStream<K, V> {
|
|||
* to the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link SessionStore} it must be obtained via
|
||||
|
@ -233,7 +233,7 @@ public interface SessionWindowedKStream<K, V> {
|
|||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
|
||||
|
@ -282,7 +282,7 @@ public interface SessionWindowedKStream<K, V> {
|
|||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
|
||||
|
@ -330,7 +330,7 @@ public interface SessionWindowedKStream<K, V> {
|
|||
* the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link SessionStore} it must be obtained via
|
||||
|
@ -391,7 +391,7 @@ public interface SessionWindowedKStream<K, V> {
|
|||
* to the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link SessionStore} it must be obtained via
|
||||
|
@ -459,7 +459,7 @@ public interface SessionWindowedKStream<K, V> {
|
|||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
|
||||
|
@ -504,7 +504,7 @@ public interface SessionWindowedKStream<K, V> {
|
|||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
|
||||
|
@ -549,7 +549,7 @@ public interface SessionWindowedKStream<K, V> {
|
|||
* to the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link SessionStore} it must be obtained via
|
||||
|
@ -609,7 +609,7 @@ public interface SessionWindowedKStream<K, V> {
|
|||
* to the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link SessionStore} it must be obtained via
|
||||
|
|
|
@ -75,7 +75,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
|
|||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
|
||||
|
@ -115,7 +115,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
|
|||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
|
||||
|
@ -156,7 +156,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
|
|||
* the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
|
||||
|
@ -213,7 +213,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
|
|||
* to the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
|
||||
|
|
|
@ -65,7 +65,7 @@ public interface TimeWindowedKStream<K, V> {
|
|||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
|
||||
|
@ -95,7 +95,7 @@ public interface TimeWindowedKStream<K, V> {
|
|||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
|
||||
|
@ -126,7 +126,7 @@ public interface TimeWindowedKStream<K, V> {
|
|||
* to the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
|
||||
|
@ -175,7 +175,7 @@ public interface TimeWindowedKStream<K, V> {
|
|||
* to the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
|
||||
|
@ -236,7 +236,7 @@ public interface TimeWindowedKStream<K, V> {
|
|||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
|
||||
|
@ -281,7 +281,7 @@ public interface TimeWindowedKStream<K, V> {
|
|||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
|
||||
|
@ -326,7 +326,7 @@ public interface TimeWindowedKStream<K, V> {
|
|||
* the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
|
||||
|
@ -387,7 +387,7 @@ public interface TimeWindowedKStream<K, V> {
|
|||
* to the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
|
||||
|
@ -457,7 +457,7 @@ public interface TimeWindowedKStream<K, V> {
|
|||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
|
||||
|
@ -502,7 +502,7 @@ public interface TimeWindowedKStream<K, V> {
|
|||
* the same window and key.
|
||||
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
|
||||
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
|
||||
|
@ -547,7 +547,7 @@ public interface TimeWindowedKStream<K, V> {
|
|||
* to the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
|
||||
|
@ -610,7 +610,7 @@ public interface TimeWindowedKStream<K, V> {
|
|||
* to the same window and key if caching is enabled on the {@link Materialized} instance.
|
||||
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
|
||||
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
|
||||
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
|
||||
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
|
||||
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
|
||||
* <p>
|
||||
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
|
||||
|
|
|
@ -246,6 +246,7 @@ public class KafkaStreamsTest {
|
|||
PowerMock.mockStatic(StreamsConfigUtils.class);
|
||||
EasyMock.expect(StreamsConfigUtils.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE).anyTimes();
|
||||
EasyMock.expect(StreamsConfigUtils.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes();
|
||||
EasyMock.expect(StreamsConfigUtils.getTotalCacheSize(anyObject(StreamsConfig.class))).andReturn(10 * 1024 * 1024L).anyTimes();
|
||||
EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes();
|
||||
EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes();
|
||||
prepareStreamThread(streamThreadOne, 1, true);
|
||||
|
|
|
@ -66,6 +66,7 @@ import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFI
|
|||
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
|
||||
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
|
||||
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
|
||||
import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
|
||||
import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.CoreMatchers.hasItem;
|
||||
|
@ -1256,6 +1257,36 @@ public class StreamsConfigTest {
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void shouldUseStateStoreCacheMaxBytesWhenBothOldAndNewConfigsAreSet() {
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100);
|
||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
|
||||
final StreamsConfig config = new StreamsConfig(props);
|
||||
assertEquals(getTotalCacheSize(config), 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() {
|
||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
|
||||
final StreamsConfig config = new StreamsConfig(props);
|
||||
assertEquals(getTotalCacheSize(config), 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() {
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10);
|
||||
final StreamsConfig config = new StreamsConfig(props);
|
||||
assertEquals(getTotalCacheSize(config), 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
|
||||
final StreamsConfig config = new StreamsConfig(props);
|
||||
assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidSecurityProtocol() {
|
||||
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");
|
||||
|
|
|
@ -134,7 +134,7 @@ public abstract class AbstractJoinIntegrationTest {
|
|||
|
||||
void prepareEnvironment() throws InterruptedException {
|
||||
if (!cacheEnabled) {
|
||||
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
STREAMS_CONFIG.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
}
|
||||
|
||||
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
|
||||
|
|
|
@ -151,7 +151,7 @@ public abstract class AbstractResetIntegrationTest {
|
|||
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
|
||||
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
|
||||
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
|
||||
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
|
|
|
@ -371,7 +371,7 @@ public class AdjustStreamThreadCountTest {
|
|||
final Properties props = new Properties();
|
||||
props.putAll(properties);
|
||||
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
|
||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, totalCacheBytes);
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, totalCacheBytes);
|
||||
|
||||
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) {
|
||||
addStreamStateChangeListener(kafkaStreams);
|
||||
|
@ -398,7 +398,7 @@ public class AdjustStreamThreadCountTest {
|
|||
final Properties props = new Properties();
|
||||
props.putAll(properties);
|
||||
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
|
||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, totalCacheBytes);
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, totalCacheBytes);
|
||||
|
||||
final AtomicBoolean injectError = new AtomicBoolean(false);
|
||||
|
||||
|
|
|
@ -90,7 +90,7 @@ public class EmitOnChangeIntegrationTest {
|
|||
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
|
||||
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
|
||||
mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
|
||||
mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
|
||||
mkEntry(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0),
|
||||
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
|
||||
mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
|
||||
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
|
||||
|
|
|
@ -249,7 +249,7 @@ public class EosIntegrationTest {
|
|||
|
||||
final Properties properties = new Properties();
|
||||
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
|
||||
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
|
||||
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
|
||||
|
@ -326,7 +326,7 @@ public class EosIntegrationTest {
|
|||
|
||||
final Properties properties = new Properties();
|
||||
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
|
||||
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
|
||||
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
|
@ -940,7 +940,7 @@ public class EosIntegrationTest {
|
|||
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), maxPollIntervalMs);
|
||||
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), maxPollIntervalMs - 1);
|
||||
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), maxPollIntervalMs);
|
||||
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
|
||||
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
|
||||
|
||||
|
|
|
@ -949,7 +949,7 @@ public class EosV2UpgradeIntegrationTest {
|
|||
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
|
||||
properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) commitInterval);
|
||||
properties.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG), KeyPartitioner.class);
|
||||
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
|
||||
properties.put(InternalConfig.ASSIGNMENT_LISTENER, assignmentListener);
|
||||
|
||||
|
|
|
@ -144,7 +144,7 @@ public class FineGrainedAutoResetIntegrationTest {
|
|||
public void setUp() throws IOException {
|
||||
|
||||
final Properties props = new Properties();
|
||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
|
@ -283,7 +283,7 @@ public class FineGrainedAutoResetIntegrationTest {
|
|||
@Test
|
||||
public void shouldThrowStreamsExceptionNoResetSpecified() throws InterruptedException {
|
||||
final Properties props = new Properties();
|
||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
|
||||
|
|
|
@ -136,7 +136,7 @@ public class GlobalKTableEOSIntegrationTest {
|
|||
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
|
||||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0L);
|
||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
|
||||
streamsConfiguration.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L);
|
||||
|
|
|
@ -107,7 +107,7 @@ public class GlobalKTableIntegrationTest {
|
|||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
|
||||
Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
|
||||
|
|
|
@ -115,7 +115,7 @@ public class GlobalThreadShutDownOrderTest {
|
|||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
|
||||
final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
|
||||
|
|
|
@ -105,7 +105,7 @@ public class InternalTopicIntegrationTest {
|
|||
streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsProp.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
}
|
||||
|
||||
|
|
|
@ -104,7 +104,7 @@ public class KStreamAggregationDedupIntegrationTest {
|
|||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
|
||||
|
||||
final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
|
||||
stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
|
||||
|
|
|
@ -139,7 +139,7 @@ public class KStreamAggregationIntegrationTest {
|
|||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||
|
|
|
@ -142,7 +142,7 @@ public class KStreamRepartitionIntegrationTest {
|
|||
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
|
||||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
|
|
|
@ -187,7 +187,7 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
|
|||
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Partitioner");
|
||||
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
|
||||
return streamsConfig;
|
||||
|
|
|
@ -212,7 +212,7 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
|
|||
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Multi");
|
||||
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
|
||||
return streamsConfig;
|
||||
|
|
|
@ -69,7 +69,7 @@ public class KTableSourceTopicRestartIntegrationTest {
|
|||
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
STREAMS_CONFIG.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5L);
|
||||
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
|
||||
STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
|
||||
|
|
|
@ -112,7 +112,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
|
|||
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
|
||||
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
|
||||
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
|
||||
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
|
|
|
@ -198,6 +198,7 @@ public class MetricsIntegrationTest {
|
|||
private static final String THREAD_START_TIME = "thread-start-time";
|
||||
private static final String ACTIVE_PROCESS_RATIO = "active-process-ratio";
|
||||
private static final String ACTIVE_BUFFER_COUNT = "active-buffer-count";
|
||||
|
||||
private static final String SKIPPED_RECORDS_RATE = "skipped-records-rate";
|
||||
private static final String SKIPPED_RECORDS_TOTAL = "skipped-records-total";
|
||||
private static final String RECORD_LATENESS_AVG = "record-lateness-avg";
|
||||
|
@ -258,7 +259,6 @@ public class MetricsIntegrationTest {
|
|||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
|
||||
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
}
|
||||
|
|
|
@ -824,7 +824,7 @@ public class NamedTopologyIntegrationTest {
|
|||
try {
|
||||
final AtomicInteger noOutputExpected = new AtomicInteger(0);
|
||||
final AtomicInteger outputExpected = new AtomicInteger(0);
|
||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L);
|
||||
props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
|
||||
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
|
||||
|
|
|
@ -213,7 +213,7 @@ public class OptimizedKTableIntegrationTest {
|
|||
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
|
||||
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
config.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
|
||||
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
|
||||
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
|
||||
|
|
|
@ -132,7 +132,7 @@ public class PauseResumeIntegrationTest {
|
|||
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
|
||||
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
|
||||
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
|
||||
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
|
||||
|
|
|
@ -967,7 +967,7 @@ public class QueryableStateIntegrationTest {
|
|||
}
|
||||
|
||||
private void verifyCanQueryState(final int cacheSizeBytes) throws Exception {
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSizeBytes);
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
|
||||
|
||||
|
|
|
@ -133,7 +133,7 @@ public class RegexSourceIntegrationTest {
|
|||
public void setUp() throws InterruptedException {
|
||||
outputTopic = createTopic(topicSuffixGenerator.incrementAndGet());
|
||||
final Properties properties = new Properties();
|
||||
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
|
||||
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
|
|
|
@ -129,7 +129,7 @@ public class RestoreIntegrationTest {
|
|||
final Properties streamsConfiguration = new Properties();
|
||||
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
|
||||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||
|
|
|
@ -200,7 +200,7 @@ public class RocksDBMetricsIntegrationTest {
|
|||
streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
|
||||
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
return streamsConfiguration;
|
||||
}
|
||||
|
||||
|
|
|
@ -92,7 +92,7 @@ public class SelfJoinUpgradeIntegrationTest {
|
|||
final String safeTestName = safeUniqueTestName(getClass(), testName);
|
||||
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
|
||||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
|
||||
Serdes.String().getClass());
|
||||
|
|
|
@ -141,7 +141,7 @@ public class SlidingWindowedKStreamIntegrationTest {
|
|||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
|
|
|
@ -403,7 +403,7 @@ public class StandbyTaskEOSIntegrationTest {
|
|||
final Properties streamsConfiguration = new Properties();
|
||||
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
|
||||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);
|
||||
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
|
||||
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
|
||||
|
|
|
@ -94,7 +94,7 @@ public class StoreUpgradeIntegrationTest {
|
|||
final String safeTestName = safeUniqueTestName(getClass(), testName);
|
||||
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
|
||||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||
|
|
|
@ -124,7 +124,7 @@ public class StreamTableJoinTopologyOptimizationIntegrationTest {
|
|||
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
|
||||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
|
|
|
@ -148,7 +148,7 @@ public class TimeWindowedKStreamIntegrationTest {
|
|||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
|
|
|
@ -59,7 +59,7 @@ public class KTableFilterTest {
|
|||
@Before
|
||||
public void setUp() {
|
||||
// disable caching at the config level
|
||||
props.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
|
||||
props.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0");
|
||||
}
|
||||
|
||||
private final Predicate<String, Integer> predicate = (key, value) -> (value % 2) == 0;
|
||||
|
|
|
@ -104,7 +104,7 @@ public class SessionWindowedKStreamImplTest {
|
|||
|
||||
@Test
|
||||
public void shouldCountSessionWindowedWithCachingDisabled() {
|
||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
shouldCountSessionWindowed();
|
||||
}
|
||||
|
||||
|
|
|
@ -965,10 +965,22 @@ public class InternalTopologyBuilderTest {
|
|||
assertThat(topologyBuilder.topologyConfigs().getTaskConfig().maxTaskIdleMs, equalTo(100L));
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void shouldUseNonDeprecatedConfigToSetCacheBytesWhenBothDeprecatedAndNonDeprecatedConfigsUsed() {
|
||||
final Properties globalProps = StreamsTestUtils.getStreamsConfig();
|
||||
globalProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 200L);
|
||||
globalProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100L);
|
||||
final StreamsConfig globalStreamsConfig = new StreamsConfig(globalProps);
|
||||
final InternalTopologyBuilder topologyBuilder = builder.rewriteTopology(globalStreamsConfig);
|
||||
assertThat(topologyBuilder.topologyConfigs(), equalTo(new TopologyConfig(null, globalStreamsConfig, new Properties())));
|
||||
assertThat(topologyBuilder.topologyConfigs().cacheSize, equalTo(200L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() {
|
||||
final Properties topologyOverrides = new Properties();
|
||||
topologyOverrides.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 12345L);
|
||||
topologyOverrides.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 12345L);
|
||||
topologyOverrides.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
|
||||
topologyOverrides.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L);
|
||||
topologyOverrides.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15);
|
||||
|
@ -996,7 +1008,7 @@ public class InternalTopologyBuilderTest {
|
|||
@Test
|
||||
public void shouldNotOverrideGlobalStreamsConfigWhenGivenUnnamedTopologyProps() {
|
||||
final Properties streamsProps = StreamsTestUtils.getStreamsConfig();
|
||||
streamsProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 12345L);
|
||||
streamsProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 12345L);
|
||||
streamsProps.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
|
||||
streamsProps.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L);
|
||||
streamsProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15);
|
||||
|
|
|
@ -110,7 +110,7 @@ public class RepartitionOptimizingTest {
|
|||
@Before
|
||||
public void setUp() {
|
||||
streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
|
||||
streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10));
|
||||
streamsConfiguration.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Integer.toString(1024 * 10));
|
||||
streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(5000));
|
||||
|
||||
processorValueCollector.clear();
|
||||
|
|
|
@ -86,7 +86,7 @@ public class RepartitionWithMergeOptimizingTest {
|
|||
@Before
|
||||
public void setUp() {
|
||||
streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
|
||||
streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10));
|
||||
streamsConfiguration.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Integer.toString(1024 * 10));
|
||||
streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(5000));
|
||||
}
|
||||
|
||||
|
|
|
@ -41,7 +41,6 @@ public class TaskMetricsTest {
|
|||
private final Sensor expectedSensor = mock(Sensor.class);
|
||||
private final Map<String, String> tagMap = Collections.singletonMap("hello", "world");
|
||||
|
||||
|
||||
@Test
|
||||
public void shouldGetActiveProcessRatioSensor() {
|
||||
final String operation = "active-process-ratio";
|
||||
|
|
|
@ -72,7 +72,7 @@ public class BrokerCompatibilityTest {
|
|||
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingMode);
|
||||
final int timeout = 6000;
|
||||
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout);
|
||||
|
|
|
@ -106,7 +106,7 @@ public class EosTestClient extends SmokeTestUtil {
|
|||
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, Duration.ofMinutes(1).toMillis());
|
||||
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, Integer.MAX_VALUE);
|
||||
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
|
||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000L); // increase commit interval to make sure a client is killed having an open transaction
|
||||
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||
|
|
|
@ -87,7 +87,7 @@ public class StreamsNamedRepartitionTest {
|
|||
final Properties config = new Properties();
|
||||
|
||||
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsNamedRepartitionTest");
|
||||
config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
|
||||
config.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0");
|
||||
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
|
||||
|
|
|
@ -110,7 +110,7 @@ public class StreamsOptimizedTest {
|
|||
|
||||
|
||||
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsOptimizedTest");
|
||||
config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
|
||||
config.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0");
|
||||
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
config.setProperty(StreamsConfig.adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), "100");
|
||||
|
|
|
@ -67,7 +67,7 @@ public class StreamsStandByReplicaTest {
|
|||
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks");
|
||||
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
|
||||
streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||
streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
|
||||
|
|
|
@ -155,7 +155,7 @@ import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
|
|||
*
|
||||
* <p> Note that the {@code TopologyTestDriver} processes input records synchronously.
|
||||
* This implies that {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit.interval.ms} and
|
||||
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache.max.bytes.buffering} configuration have no effect.
|
||||
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache.max.bytes.buffering} configuration have no effect.
|
||||
* The driver behaves as if both configs would be set to zero, i.e., as if a "commit" (and thus "flush") would happen
|
||||
* after each input record.
|
||||
*
|
||||
|
@ -329,7 +329,7 @@ public class TopologyTestDriver implements Closeable {
|
|||
|
||||
final ThreadCache cache = new ThreadCache(
|
||||
logContext,
|
||||
Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
|
||||
Math.max(0, streamsConfig.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG)),
|
||||
streamsMetrics
|
||||
);
|
||||
|
||||
|
|
Loading…
Reference in New Issue