Revert "KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes" (#11424)"

This reverts commit 14c6030c6a.
Reason: Implementation breaks backward compatibility
Author: Matthias J. Sax
Date:   2022-02-01 14:08:11 -08:00
parent 3b534e1c7d
commit 67cf187603
66 changed files with 297 additions and 966 deletions
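
Note: the hunks below switch the demo applications' cache-size property from statestore.cache.max.bytes back to cache.max.bytes.buffering. As a rough sketch only (not part of this commit; the application id and sizes are assumptions), a minimal Streams configuration using the restored property might look like this:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class CacheConfigSketch {
    public static Properties buildConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cache-config-sketch");       // hypothetical application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Restored by this revert: total bytes used for record caching across all threads.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        // The demos below disable caching (0 bytes) and commit frequently so updates become visible quickly.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
        return props;
    }
}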


@ -179,7 +179,7 @@ public class PageViewTypedDemo {
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class);
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data


@ -60,7 +60,7 @@ public class PageViewUntypedDemo {
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


@ -77,7 +77,7 @@ public class TemperatureDemo {
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
final Duration duration24Hours = Duration.ofHours(24);


@ -61,7 +61,7 @@ public final class WordCountDemo {
}
props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());


@ -108,7 +108,7 @@ public final class WordCountProcessorDemo {
props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());


@ -131,7 +131,7 @@ public final class WordCountTransformerDemo {
}
props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-transformer");
props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());


@ -174,7 +174,6 @@ public class KafkaStreams implements AutoCloseable {
protected final Admin adminClient;
private final StreamsMetricsImpl streamsMetrics;
private final long totalCacheSize;
private final long inputBufferMaxBytes;
private final StreamStateListener streamStateListener;
private final StateRestoreListener delegatingStateRestoreListener;
private final Map<Long, StreamThread.State> threadState;
@ -938,9 +937,9 @@ public class KafkaStreams implements AutoCloseable {
streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
delegatingStateRestoreListener = new DelegatingStateRestoreListener();
totalCacheSize = applicationConfigs.getTotalCacheSize();
inputBufferMaxBytes = applicationConfigs.getLong(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG);
totalCacheSize = applicationConfigs.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
final int numStreamThreads = topologyMetadata.getNumStreamThreads(applicationConfigs);
final long cacheSizePerThread = getCacheSizePerThread(numStreamThreads);
GlobalStreamThread.State globalThreadState = null;
if (hasGlobalTopology) {
@ -950,7 +949,7 @@ public class KafkaStreams implements AutoCloseable {
applicationConfigs,
clientSupplier.getGlobalConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId)),
stateDirectory,
0L,
cacheSizePerThread,
streamsMetrics,
time,
globalThreadId,
@ -971,16 +970,14 @@ public class KafkaStreams implements AutoCloseable {
queryableStoreProvider = new QueryableStoreProvider(globalStateStoreProvider);
for (int i = 1; i <= numStreamThreads; i++) {
createAndAddStreamThread(0L, 0L, i);
createAndAddStreamThread(cacheSizePerThread, i);
}
// Initially, all Stream Threads are created with 0 cache size and max buffer size and then resized here.
resizeThreadCacheAndBufferMemory(numStreamThreads);
stateDirCleaner = setupStateDirCleaner();
rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs);
}
private StreamThread createAndAddStreamThread(final long cacheSizePerThread, final long maxBufferSizePerThread, final int threadIdx) {
private StreamThread createAndAddStreamThread(final long cacheSizePerThread, final int threadIdx) {
final StreamThread streamThread = StreamThread.create(
topologyMetadata,
applicationConfigs,
@ -992,7 +989,7 @@ public class KafkaStreams implements AutoCloseable {
time,
streamsMetadataState,
cacheSizePerThread,
maxBufferSizePerThread,
stateDirectory,
delegatingStateRestoreListener,
threadIdx,
@ -1029,7 +1026,7 @@ public class KafkaStreams implements AutoCloseable {
* Since the number of stream threads increases, the sizes of the caches in the new stream thread
* and the existing stream threads are adapted so that the sum of the cache sizes over all stream
* threads does not exceed the total cache size specified in configuration
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}.
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
* <p>
* Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
*
@ -1040,15 +1037,14 @@ public class KafkaStreams implements AutoCloseable {
final StreamThread streamThread;
synchronized (changeThreadCount) {
final int threadIdx = getNextThreadIndex();
final int numLiveThreads = getNumLiveStreamThreads();
final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
threadIdx, numLiveThreads + 1, cacheSizePerThread);
resizeThreadCache(cacheSizePerThread);
// Creating thread should hold the lock in order to avoid duplicate thread index.
// If the duplicate index happen, the metadata of thread may be duplicate too.
// Also, we create the new thread with initial values of cache size and max buffer size as 0
// and then resize them later
streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
final int numLiveThreads = getNumLiveStreamThreads();
resizeThreadCacheAndBufferMemory(numLiveThreads + 1);
log.info("Adding StreamThread-{}, there are now {} threads with cache size/max buffer size values as {} per thread.",
threadIdx, numLiveThreads + 1, getThreadCacheAndBufferMemoryString());
streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
}
synchronized (stateLock) {
@ -1059,9 +1055,9 @@ public class KafkaStreams implements AutoCloseable {
log.warn("Terminating the new thread because the Kafka Streams client is in state {}", state);
streamThread.shutdown();
threads.remove(streamThread);
resizeThreadCacheAndBufferMemory(getNumLiveStreamThreads());
log.info("Resizing thread cache and max buffer size per thread since new thread can not be " +
"started, cache size/max buffer size per thread is {}", getThreadCacheAndBufferMemoryString());
final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
resizeThreadCache(cacheSizePerThread);
return Optional.empty();
}
}
@ -1079,7 +1075,7 @@ public class KafkaStreams implements AutoCloseable {
* <p>
* Since the number of stream threads decreases, the sizes of the caches in the remaining stream
* threads are adapted so that the sum of the cache sizes over all stream threads equals the total
* cache size specified in configuration {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}.
* cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
*
* @return name of the removed stream thread or empty if a stream thread could not be removed because
* no stream threads are alive
@ -1094,10 +1090,9 @@ public class KafkaStreams implements AutoCloseable {
* The removed stream thread is gracefully shut down. This method does not specify which stream
* thread is shut down.
* <p>
* Since the number of stream threads decreases, the sizes of the caches and buffer bytes in the remaining stream
* threads are adapted so that the sum of the cache sizes and buffer bytes over all stream threads equals the total
* cache size specified in configuration {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG} and
* {@link StreamsConfig#INPUT_BUFFER_MAX_BYTES_CONFIG} respectively.
* Since the number of stream threads decreases, the sizes of the caches in the remaining stream
* threads are adapted so that the sum of the cache sizes over all stream threads equals the total
* cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
*
* @param timeout The length of time to wait for the thread to shutdown
* @throws org.apache.kafka.common.errors.TimeoutException if the thread does not stop in time
@ -1137,10 +1132,12 @@ public class KafkaStreams implements AutoCloseable {
}
} else {
log.info("{} is the last remaining thread and must remove itself, therefore we cannot wait "
+ "for it to complete shutdown as this will result in deadlock.", streamThread.getName());
+ "for it to complete shutdown as this will result in deadlock.", streamThread.getName());
}
resizeThreadCacheAndBufferMemory(getNumLiveStreamThreads());
log.info("Resizing thread cache/max buffer size due to removal of thread {}, new cache size/max buffer size per thread is {}", streamThread.getName(), getThreadCacheAndBufferMemoryString());
final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread);
resizeThreadCache(cacheSizePerThread);
if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) {
final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get());
final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
@ -1243,22 +1240,15 @@ public class KafkaStreams implements AutoCloseable {
}
}
private String getThreadCacheAndBufferMemoryString() {
final StreamThread streamThread = threads.get(0);
return streamThread.getCacheSize() + "/" + streamThread.getMaxBufferSize();
private long getCacheSizePerThread(final int numStreamThreads) {
if (numStreamThreads == 0) {
return totalCacheSize;
}
return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
}
private void resizeThreadCacheAndBufferMemory(final int numStreamThreads) {
final long cacheSizePerThread;
final long inputBufferMaxBytesPerThread;
if (numStreamThreads == 0) {
cacheSizePerThread = totalCacheSize;
inputBufferMaxBytesPerThread = inputBufferMaxBytes;
} else {
cacheSizePerThread = totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
inputBufferMaxBytesPerThread = inputBufferMaxBytes / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
}
processStreamThread(thread -> thread.resizeCacheAndBufferMemory(cacheSizePerThread, inputBufferMaxBytesPerThread));
private void resizeThreadCache(final long cacheSizePerThread) {
processStreamThread(thread -> thread.resizeCache(cacheSizePerThread));
if (globalStreamThread != null) {
globalStreamThread.resize(cacheSizePerThread);
}
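
The KafkaStreams changes above restore the per-thread cache computation: the configured total is divided by the number of stream threads, plus one for the global thread if a global topology exists, so a 10 MB total shared by four stream threads and a global topology yields 10 MB / 5 = 2 MB per thread. A rough sketch (not part of this commit; topics and application id are assumptions) of the addStreamThread/removeStreamThread API that these Javadocs describe:

import java.time.Duration;
import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ThreadScalingSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "thread-scaling-sketch");   // hypothetical application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");                          // hypothetical topics

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Adding a thread shrinks every thread's cache so the sum stays within the configured total.
        final Optional<String> added = streams.addStreamThread();
        added.ifPresent(name -> System.out.println("Added " + name));

        // Removing a thread redistributes the freed cache across the remaining threads.
        streams.removeStreamThread(Duration.ofSeconds(30));

        streams.close();
    }
}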


@ -348,30 +348,18 @@ public class StreamsConfig extends AbstractConfig {
/** {@code buffered.records.per.partition} */
@SuppressWarnings("WeakerAccess")
@Deprecated
public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
public static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "Maximum number of records to buffer per partition.";
/** {@code input.buffer.max.bytes} */
@SuppressWarnings("WeakerAccess")
public static final String INPUT_BUFFER_MAX_BYTES_CONFIG = "input.buffer.max.bytes";
public static final String INPUT_BUFFER_MAX_BYTES_DOC = "Maximum bytes of records to buffer across all threads";
/** {@code built.in.metrics.version} */
public static final String BUILT_IN_METRICS_VERSION_CONFIG = "built.in.metrics.version";
private static final String BUILT_IN_METRICS_VERSION_DOC = "Version of the built-in metrics to use.";
/** {@code cache.max.bytes.buffering} */
@SuppressWarnings("WeakerAccess")
@Deprecated
public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
public static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads";
/** {@code statestore.cache.max.bytes} */
@SuppressWarnings("WeakerAccess")
public static final String STATESTORE_CACHE_MAX_BYTES_CONFIG = "statestore.cache.max.bytes";
public static final String STATESTORE_CACHE_MAX_BYTES_DOC = "Maximum number of memory bytes to be used for statestore cache across all threads";
/** {@code client.id} */
@SuppressWarnings("WeakerAccess")
public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
@ -657,12 +645,6 @@ public class StreamsConfig extends AbstractConfig {
atLeast(0),
Importance.MEDIUM,
CACHE_MAX_BYTES_BUFFERING_DOC)
.define(STATESTORE_CACHE_MAX_BYTES_CONFIG,
Type.LONG,
10 * 1024 * 1024L,
atLeast(0),
Importance.MEDIUM,
STATESTORE_CACHE_MAX_BYTES_DOC)
.define(CLIENT_ID_CONFIG,
Type.STRING,
"",
@ -757,11 +739,6 @@ public class StreamsConfig extends AbstractConfig {
in(NO_OPTIMIZATION, OPTIMIZE),
Importance.MEDIUM,
TOPOLOGY_OPTIMIZATION_DOC)
.define(INPUT_BUFFER_MAX_BYTES_CONFIG,
Type.LONG,
512 * 1024 * 1024,
Importance.MEDIUM,
INPUT_BUFFER_MAX_BYTES_DOC)
// LOW
@ -1430,26 +1407,6 @@ public class StreamsConfig extends AbstractConfig {
return props;
}
public long getTotalCacheSize() {
// both deprecated and new config set. Warn and use the new one.
if (originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) {
log.warn("Use of deprecated config {} noticed.", CACHE_MAX_BYTES_BUFFERING_CONFIG);
if (!getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG).equals(getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG))) {
log.warn("Config {} and {} have been set to different values. {} would be considered as total cache size",
CACHE_MAX_BYTES_BUFFERING_CONFIG,
STATESTORE_CACHE_MAX_BYTES_CONFIG,
STATESTORE_CACHE_MAX_BYTES_CONFIG);
}
return getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
} else if (originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG)) {
// only deprecated config set.
log.warn("Use of deprecated config {} noticed.", CACHE_MAX_BYTES_BUFFERING_CONFIG);
return getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
}
// only new or no config set. Use default or user specified value.
return getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
}
/**
* Get the configs for the {@link Admin admin client}.
* @param clientId clientId


@ -78,7 +78,7 @@ public interface CogroupedKStream<K, VOut> {
* same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@ -128,7 +128,7 @@ public interface CogroupedKStream<K, VOut> {
* same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@ -179,7 +179,7 @@ public interface CogroupedKStream<K, VOut> {
* same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@ -232,7 +232,7 @@ public interface CogroupedKStream<K, VOut> {
* same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link org.apache.kafka.streams.state.ReadOnlyKeyValueStore} it must be obtained via


@ -53,7 +53,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by
@ -81,7 +81,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by
@ -112,7 +112,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@ -158,7 +158,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@ -211,7 +211,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
*
* <p>
@ -262,7 +262,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@ -326,7 +326,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@ -385,7 +385,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
*
* <p>
@ -431,7 +431,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@ -490,7 +490,7 @@ public interface KGroupedStream<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via


@ -52,7 +52,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@ -95,7 +95,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@ -138,7 +138,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@ -167,7 +167,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@ -223,7 +223,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@ -296,7 +296,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@ -368,7 +368,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@ -434,7 +434,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@ -518,7 +518,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
@ -604,7 +604,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
@ -674,7 +674,7 @@ public interface KGroupedTable<K, V> {
* the same key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is


@ -77,7 +77,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@ -122,7 +122,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@ -166,7 +166,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
@ -226,7 +226,7 @@ public interface SessionWindowedCogroupedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via


@ -65,7 +65,7 @@ public interface SessionWindowedKStream<K, V> {
* the same session and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@ -95,7 +95,7 @@ public interface SessionWindowedKStream<K, V> {
* the same session and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@ -126,7 +126,7 @@ public interface SessionWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
@ -172,7 +172,7 @@ public interface SessionWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
@ -233,7 +233,7 @@ public interface SessionWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@ -282,7 +282,7 @@ public interface SessionWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@ -330,7 +330,7 @@ public interface SessionWindowedKStream<K, V> {
* the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
@ -391,7 +391,7 @@ public interface SessionWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
@ -459,7 +459,7 @@ public interface SessionWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@ -504,7 +504,7 @@ public interface SessionWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
@ -549,7 +549,7 @@ public interface SessionWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via
@ -609,7 +609,7 @@ public interface SessionWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link SessionStore} it must be obtained via


@ -75,7 +75,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@ -115,7 +115,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@ -156,7 +156,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
@ -213,7 +213,7 @@ public interface TimeWindowedCogroupedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via


@ -65,7 +65,7 @@ public interface TimeWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@ -95,7 +95,7 @@ public interface TimeWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@ -126,7 +126,7 @@ public interface TimeWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
@ -175,7 +175,7 @@ public interface TimeWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
@ -236,7 +236,7 @@ public interface TimeWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@ -281,7 +281,7 @@ public interface TimeWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@ -326,7 +326,7 @@ public interface TimeWindowedKStream<K, V> {
* the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
@ -387,7 +387,7 @@ public interface TimeWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
@ -457,7 +457,7 @@ public interface TimeWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@ -502,7 +502,7 @@ public interface TimeWindowedKStream<K, V> {
* the same window and key.
* The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
* parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* For failure and recovery the store (which always will be of type {@link TimestampedWindowStore}) will be backed by
@ -547,7 +547,7 @@ public interface TimeWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via
@ -610,7 +610,7 @@ public interface TimeWindowedKStream<K, V> {
* to the same window and key if caching is enabled on the {@link Materialized} instance.
* When caching is enabled the rate of propagated updates depends on your input data rate, the number of distinct
* keys, the number of parallel running Kafka Streams instances, and the {@link StreamsConfig configuration}
* parameters for {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache size}, and
* parameters for {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
* {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
* <p>
* To query the local {@link ReadOnlyWindowStore} it must be obtained via


@ -64,12 +64,10 @@ public class PartitionGroup {
private final Sensor enforcedProcessingSensor;
private final long maxTaskIdleMs;
private final Sensor recordLatenessSensor;
private final Sensor totalBytesSensor;
private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
private long streamTime;
private int totalBuffered;
private long totalBytesBuffered;
private boolean allBuffered;
private final Map<TopicPartition, Long> idlePartitionDeadlines = new HashMap<>();
@ -94,7 +92,6 @@ public class PartitionGroup {
final Function<TopicPartition, OptionalLong> lagProvider,
final Sensor recordLatenessSensor,
final Sensor enforcedProcessingSensor,
final Sensor totalBytesSensor,
final long maxTaskIdleMs) {
this.logger = logContext.logger(PartitionGroup.class);
nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
@ -103,7 +100,6 @@ public class PartitionGroup {
this.enforcedProcessingSensor = enforcedProcessingSensor;
this.maxTaskIdleMs = maxTaskIdleMs;
this.recordLatenessSensor = recordLatenessSensor;
this.totalBytesSensor = totalBytesSensor;
totalBuffered = 0;
allBuffered = false;
streamTime = RecordQueue.UNKNOWN;
@ -122,11 +118,11 @@ public class PartitionGroup {
}
}
logger.trace("Ready for processing because max.task.idle.ms is disabled." +
"\n\tThere may be out-of-order processing for this task as a result." +
"\n\tBuffered partitions: {}" +
"\n\tNon-buffered partitions: {}",
bufferedPartitions,
emptyPartitions);
"\n\tThere may be out-of-order processing for this task as a result." +
"\n\tBuffered partitions: {}" +
"\n\tNon-buffered partitions: {}",
bufferedPartitions,
emptyPartitions);
}
return true;
}
@ -155,9 +151,9 @@ public class PartitionGroup {
// must wait to poll the data we know to be on the broker
idlePartitionDeadlines.remove(partition);
logger.trace(
"Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
partition,
fetchedLag.getAsLong()
"Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.",
partition,
fetchedLag.getAsLong()
);
return false;
} else {
@ -171,11 +167,11 @@ public class PartitionGroup {
final long deadline = idlePartitionDeadlines.get(partition);
if (wallClockTime < deadline) {
logger.trace(
"Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).",
partition,
wallClockTime,
maxTaskIdleMs,
deadline
"Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).",
partition,
wallClockTime,
maxTaskIdleMs,
deadline
);
return false;
} else {
@ -197,15 +193,15 @@ public class PartitionGroup {
} else {
enforcedProcessingSensor.record(1.0d, wallClockTime);
logger.trace("Continuing to process although some partitions are empty on the broker." +
"\n\tThere may be out-of-order processing for this task as a result." +
"\n\tPartitions with local data: {}." +
"\n\tPartitions we gave up waiting for, with their corresponding deadlines: {}." +
"\n\tConfigured max.task.idle.ms: {}." +
"\n\tCurrent wall-clock time: {}.",
queued,
enforced,
maxTaskIdleMs,
wallClockTime);
"\n\tThere may be out-of-order processing for this task as a result." +
"\n\tPartitions with local data: {}." +
"\n\tPartitions we gave up waiting for, with their corresponding deadlines: {}." +
"\n\tConfigured max.task.idle.ms: {}." +
"\n\tCurrent wall-clock time: {}.",
queued,
enforced,
maxTaskIdleMs,
wallClockTime);
return true;
}
}
@ -229,7 +225,6 @@ public class PartitionGroup {
if (!newInputPartitions.contains(topicPartition)) {
// if partition is removed should delete its queue
totalBuffered -= queueEntry.getValue().size();
totalBytesBuffered -= queueEntry.getValue().getTotalBytesBuffered();
queuesIterator.remove();
removedPartitions.add(topicPartition);
}
@ -265,17 +260,12 @@ public class PartitionGroup {
info.queue = queue;
if (queue != null) {
// get the buffer size of queue before poll
final long oldBufferSize = queue.getTotalBytesBuffered();
// get the first record from this queue.
record = queue.poll();
// After polling, the buffer size would have reduced.
final long newBufferSize = queue.getTotalBytesBuffered();
if (record != null) {
--totalBuffered;
totalBytesBuffered -= oldBufferSize - newBufferSize;
totalBytesSensor.record(totalBytesBuffered);
if (queue.isEmpty()) {
// if a certain queue has been drained, reset the flag
allBuffered = false;
@ -311,9 +301,7 @@ public class PartitionGroup {
}
final int oldSize = recordQueue.size();
final long oldBufferSize = recordQueue.getTotalBytesBuffered();
final int newSize = recordQueue.addRawRecords(rawRecords);
final long newBufferSize = recordQueue.getTotalBytesBuffered();
// add this record queue to be considered for processing in the future if it was empty before
if (oldSize == 0 && newSize > 0) {
@ -328,8 +316,7 @@ public class PartitionGroup {
}
totalBuffered += newSize - oldSize;
totalBytesBuffered += newBufferSize - oldBufferSize;
totalBytesSensor.record(totalBytesBuffered);
return newSize;
}
@ -367,20 +354,12 @@ public class PartitionGroup {
return recordQueue.size();
}
Set<TopicPartition> getNonEmptyTopicPartitions() {
final Set<TopicPartition> nonEmptyTopicPartitions = new HashSet<>();
for (final RecordQueue recordQueue : nonEmptyQueuesByTime) {
nonEmptyTopicPartitions.add(recordQueue.partition());
}
return nonEmptyTopicPartitions;
}
int numBuffered() {
return totalBuffered;
}
long totalBytesBuffered() {
return totalBytesBuffered;
boolean allPartitionsBufferedLocally() {
return allBuffered;
}
void clear() {
@ -391,10 +370,4 @@ public class PartitionGroup {
totalBuffered = 0;
streamTime = RecordQueue.UNKNOWN;
}
// Below methods are for only testing.
boolean allPartitionsBufferedLocally() {
return allBuffered;
}
}
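Taken together, the PartitionGroup hunks above remove the running byte total that the reverted feature maintained: measure a queue's byte size before and after addRawRecords()/poll(), apply the delta to totalBytesBuffered, and report it through totalBytesSensor. A minimal standalone sketch of that delta-bookkeeping pattern (illustrative class and method names only, not the actual Streams internals):

import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the byte accounting removed above: keep a running total by applying
// the before/after delta of each enqueue and dequeue. Names are illustrative.
class ByteCountingQueue {
    private final Deque<byte[]> records = new ArrayDeque<>();
    private long totalBytesBuffered = 0L;

    void add(final byte[] record) {
        final long oldBytes = bytesInQueue();
        records.addLast(record);
        totalBytesBuffered += bytesInQueue() - oldBytes;   // delta after enqueue
    }

    byte[] poll() {
        final long oldBytes = bytesInQueue();
        final byte[] head = records.pollFirst();
        totalBytesBuffered -= oldBytes - bytesInQueue();   // delta after dequeue
        return head;
    }

    long totalBytesBuffered() {
        return totalBytesBuffered;
    }

    private long bytesInQueue() {
        long bytes = 0L;
        for (final byte[] record : records) {
            bytes += record.length;
        }
        return bytes;
    }
}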


@ -18,10 +18,8 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
@ -52,8 +50,6 @@ public class RecordQueue {
private long partitionTime = UNKNOWN;
private final Sensor droppedRecordsSensor;
private long totalBytesBuffered;
private long headRecordSizeInBytes;
RecordQueue(final TopicPartition partition,
final SourceNode<?, ?> source,
@ -78,8 +74,6 @@ public class RecordQueue {
droppedRecordsSensor
);
this.log = logContext.logger(RecordQueue.class);
this.totalBytesBuffered = 0L;
this.headRecordSizeInBytes = 0L;
}
void setPartitionTime(final long partitionTime) {
@ -104,25 +98,6 @@ public class RecordQueue {
return partition;
}
private long sizeInBytes(final ConsumerRecord<byte[], byte[]> record) {
long headerSizeInBytes = 0L;
for (final Header header: record.headers().toArray()) {
headerSizeInBytes += Utils.utf8(header.key()).length;
if (header.value() != null) {
headerSizeInBytes += header.value().length;
}
}
return record.serializedKeySize() +
record.serializedValueSize() +
8L + // timestamp
8L + // offset
Utils.utf8(record.topic()).length +
4L + // partition
headerSizeInBytes;
}
/**
* Add a batch of {@link ConsumerRecord} into the queue
*
@ -132,7 +107,6 @@ public class RecordQueue {
int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
fifoQueue.addLast(rawRecord);
this.totalBytesBuffered += sizeInBytes(rawRecord);
}
updateHead();
@ -147,9 +121,7 @@ public class RecordQueue {
*/
public StampedRecord poll() {
final StampedRecord recordToReturn = headRecord;
totalBytesBuffered -= headRecordSizeInBytes;
headRecord = null;
headRecordSizeInBytes = 0L;
partitionTime = Math.max(partitionTime, recordToReturn.timestamp);
updateHead();
@ -195,7 +167,6 @@ public class RecordQueue {
public void clear() {
fifoQueue.clear();
headRecord = null;
headRecordSizeInBytes = 0L;
partitionTime = UNKNOWN;
}
@ -234,7 +205,6 @@ public class RecordQueue {
continue;
}
headRecord = new StampedRecord(deserialized, timestamp);
headRecordSizeInBytes = sizeInBytes(raw);
}
// if all records in the FIFO queue are corrupted, make the last one the headRecord
@ -250,11 +220,4 @@ public class RecordQueue {
long partitionTime() {
return partitionTime;
}
/**
* @return the total bytes buffered for this particular RecordQueue
*/
long getTotalBytesBuffered() {
return totalBytesBuffered;
}
}
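The sizeInBytes() helper deleted above is the heart of the byte accounting: it estimates how much one raw record contributes to the input buffer. A self-contained restatement of the same arithmetic against the public kafka-clients API (UTF-8 lengths computed directly rather than via the internal Utils class):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

final class RecordSizeEstimator {
    // key + value + 8 bytes timestamp + 8 bytes offset + topic name + 4 bytes partition + headers.
    // Note: serializedKeySize()/serializedValueSize() return -1 for null keys/values,
    // which the original helper did not special-case either.
    static long sizeInBytes(final ConsumerRecord<byte[], byte[]> record) {
        long headerSizeInBytes = 0L;
        for (final Header header : record.headers().toArray()) {
            headerSizeInBytes += header.key().getBytes(StandardCharsets.UTF_8).length;
            if (header.value() != null) {
                headerSizeInBytes += header.value().length;
            }
        }
        return record.serializedKeySize()
            + record.serializedValueSize()
            + 8L                                                      // timestamp
            + 8L                                                      // offset
            + record.topic().getBytes(StandardCharsets.UTF_8).length
            + 4L                                                      // partition
            + headerSizeInBytes;
    }
}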


@ -189,7 +189,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
createPartitionQueues(),
mainConsumer::currentLag,
TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics),
TaskMetrics.totalBytesSensor(threadId, taskId, streamsMetrics),
enforcedProcessingSensor,
maxTaskIdleMs
);
@ -718,8 +717,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
// after processing this record, if its partition queue's buffered size has been
// decreased to the threshold, we can then resume the consumption on this partition
// TODO maxBufferedSize != -1 would be removed once the deprecated config buffered.records.per.partition is removed
if (maxBufferedSize != -1 && recordInfo.queue().size() == maxBufferedSize) {
if (recordInfo.queue().size() == maxBufferedSize) {
mainConsumer.resume(singleton(partition));
}
@ -973,8 +971,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
// if after adding these records, its partition queue's buffered size has been
// increased beyond the threshold, we can then pause the consumption for this partition
// We do this only if the deprecated config buffered.records.per.partition is set
if (maxBufferedSize != -1 && newQueueSize > maxBufferedSize) {
if (newQueueSize > maxBufferedSize) {
mainConsumer.pause(singleton(partition));
}
}
@ -1255,14 +1252,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
return recordCollector;
}
Set<TopicPartition> getNonEmptyTopicPartitions() {
return this.partitionGroup.getNonEmptyTopicPartitions();
}
long totalBytesBuffered() {
return partitionGroup.totalBytesBuffered();
}
// below are visible for testing only
int numBuffered() {
return partitionGroup.numBuffered();
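With the byte-based cap gone, these StreamTask hunks fall back to the per-partition record-count backpressure that the revert restores: pause a partition once its queue exceeds buffered.records.per.partition (default 1000), and resume it once processing drains the queue back to that threshold. A sketch of that control flow against the public Consumer API (the queue-size callbacks are stand-ins for the task's internal bookkeeping):

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class PerPartitionBackpressure {
    private final Consumer<byte[], byte[]> consumer;
    private final int maxBufferedPerPartition;   // buffered.records.per.partition

    PerPartitionBackpressure(final Consumer<byte[], byte[]> consumer, final int maxBufferedPerPartition) {
        this.consumer = consumer;
        this.maxBufferedPerPartition = maxBufferedPerPartition;
    }

    // Called after newly polled records were added to this partition's queue.
    void afterAdd(final TopicPartition partition, final int newQueueSize) {
        if (newQueueSize > maxBufferedPerPartition) {
            consumer.pause(Collections.singleton(partition));
        }
    }

    // Called after one record from this partition was processed.
    void afterProcess(final TopicPartition partition, final int queueSize) {
        if (queueSize == maxBufferedPerPartition) {
            consumer.resume(Collections.singleton(partition));
        }
    }
}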


@ -314,7 +314,6 @@ public class StreamThread extends Thread {
// These are used to signal from outside the stream thread, but the variables themselves are internal to the thread
private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
private final AtomicLong maxBufferSizeBytes = new AtomicLong(-1L);
private final boolean eosEnabled;
public static StreamThread create(final TopologyMetadata topologyMetadata,
@ -327,7 +326,6 @@ public class StreamThread extends Thread {
final Time time,
final StreamsMetadataState streamsMetadataState,
final long cacheSizeBytes,
final long maxBufferSizeBytes,
final StateDirectory stateDirectory,
final StateRestoreListener userStateRestoreListener,
final int threadIdx,
@ -430,8 +428,7 @@ public class StreamThread extends Thread {
referenceContainer.nonFatalExceptionsToHandle,
shutdownErrorHook,
streamsUncaughtExceptionHandler,
cache::resize,
maxBufferSizeBytes
cache::resize
);
return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId));
@ -492,8 +489,7 @@ public class StreamThread extends Thread {
final Queue<StreamsException> nonFatalExceptionsToHandle,
final Runnable shutdownErrorHook,
final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler,
final java.util.function.Consumer<Long> cacheResizer,
final long maxBufferSizeBytes) {
final java.util.function.Consumer<Long> cacheResizer) {
super(threadId);
this.stateLock = new Object();
this.adminClient = adminClient;
@ -561,7 +557,6 @@ public class StreamThread extends Thread {
this.numIterations = 1;
this.eosEnabled = eosEnabled(config);
this.maxBufferSizeBytes.set(maxBufferSizeBytes);
}
private static final class InternalConsumerConfig extends ConsumerConfig {
@ -742,17 +737,8 @@ public class StreamThread extends Thread {
}
}
public void resizeCacheAndBufferMemory(final long cacheSize, final long maxBufferSize) {
cacheResizeSize.set(cacheSize);
maxBufferSizeBytes.set(maxBufferSize);
}
public long getCacheSize() {
return cacheResizeSize.get();
}
public long getMaxBufferSize() {
return maxBufferSizeBytes.get();
public void resizeCache(final long size) {
cacheResizeSize.set(size);
}
/**
@ -827,10 +813,6 @@ public class StreamThread extends Thread {
totalProcessed += processed;
totalRecordsProcessedSinceLastSummary += processed;
final long bufferSize = taskManager.getInputBufferSizeInBytes();
if (bufferSize <= maxBufferSizeBytes.get()) {
mainConsumer.resume(mainConsumer.paused());
}
}
log.debug("Processed {} records with {} iterations; invoking punctuators if necessary",
@ -950,8 +932,7 @@ public class StreamThread extends Thread {
}
}
// Visible for testing
long pollPhase() {
private long pollPhase() {
final ConsumerRecords<byte[], byte[]> records;
log.debug("Invoking poll on main Consumer");
@ -996,17 +977,6 @@ public class StreamThread extends Thread {
if (!records.isEmpty()) {
pollRecordsSensor.record(numRecords, now);
taskManager.addRecordsToTasks(records);
// Check buffer size after adding records to tasks
final long bufferSize = taskManager.getInputBufferSizeInBytes();
// Pausing partitions as the buffer size now exceeds max buffer size
if (bufferSize > maxBufferSizeBytes.get()) {
log.info("Buffered records size {} bytes exceeds {}. Pausing the consumer", bufferSize, maxBufferSizeBytes.get());
// Only non-empty partitions are paused here. Reason is that, if a task has multiple partitions with
// some of them empty, then in that case pausing even empty partitions would sacrifice ordered processing
// and even lead to temporal deadlock. More explanation can be found here:
// https://issues.apache.org/jira/browse/KAFKA-13152?focusedCommentId=17400647&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17400647
mainConsumer.pause(taskManager.nonEmptyPartitions());
}
}
while (!nonFatalExceptionsToHandle.isEmpty()) {
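The block removed here was the thread-level half of the reverted feature: after a poll, pause the non-empty partitions once the bytes buffered across all tasks exceed input.buffer.max.bytes, and resume whatever was paused once processing brings the total back under the cap. A sketch of that flow against the public Consumer API (bufferedBytes() and nonEmptyPartitions() stand in for the TaskManager calls that are also deleted below):

import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class InputBufferBackpressure {
    private final Consumer<byte[], byte[]> consumer;
    private final long maxBufferSizeBytes;   // input.buffer.max.bytes in the reverted feature

    InputBufferBackpressure(final Consumer<byte[], byte[]> consumer, final long maxBufferSizeBytes) {
        this.consumer = consumer;
        this.maxBufferSizeBytes = maxBufferSizeBytes;
    }

    // After adding polled records to tasks: pause only the partitions that have data
    // buffered locally, so empty partitions keep fetching and ordered processing is preserved.
    void afterPoll(final long bufferedBytes, final Set<TopicPartition> nonEmptyPartitions) {
        if (bufferedBytes > maxBufferSizeBytes) {
            consumer.pause(nonEmptyPartitions);
        }
    }

    // After a round of processing: resume everything currently paused once the buffer
    // has drained below the cap.
    void afterProcessing(final long bufferedBytes) {
        if (bufferedBytes <= maxBufferSizeBytes) {
            consumer.resume(consumer.paused());
        }
    }
}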


@ -246,5 +246,4 @@ public interface Task {
* @return This returns the time the task started idling. If it is not idling it returns empty.
*/
Optional<Long> timeCurrentIdlingStarted();
}


@ -190,15 +190,15 @@ public class TaskManager {
// We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
try {
final Collection<Task> tasksToCommit = tasks()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
.filter(t -> !corruptedTasks.contains(t.id()))
.collect(Collectors.toSet());
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
.filter(t -> !corruptedTasks.contains(t.id()))
.collect(Collectors.toSet());
commitTasksAndMaybeUpdateCommittableOffsets(tasksToCommit, new HashMap<>());
} catch (final TaskCorruptedException e) {
log.info("Some additional tasks were found corrupted while trying to commit, these will be added to the " +
"tasks to clean and revive: {}", e.corruptedTasks());
"tasks to clean and revive: {}", e.corruptedTasks());
corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
} catch (final TimeoutException e) {
log.info("Hit TimeoutException when committing all non-corrupted tasks, these will be closed and revived");
@ -249,13 +249,13 @@ public class TaskManager {
final Set<TopicPartition> currentAssignment = mainConsumer.assignment();
final Set<TopicPartition> taskInputPartitions = task.inputPartitions();
final Set<TopicPartition> assignedToPauseAndReset =
intersection(HashSet::new, currentAssignment, taskInputPartitions);
intersection(HashSet::new, currentAssignment, taskInputPartitions);
if (!assignedToPauseAndReset.equals(taskInputPartitions)) {
log.warn(
"Expected the current consumer assignment {} to contain the input partitions {}. " +
"Will proceed to recover.",
currentAssignment,
taskInputPartitions
"Expected the current consumer assignment {} to contain the input partitions {}. " +
"Will proceed to recover.",
currentAssignment,
taskInputPartitions
);
}
@ -274,15 +274,15 @@ public class TaskManager {
public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks) {
log.info("Handle new assignment with:\n" +
"\tNew active tasks: {}\n" +
"\tNew standby tasks: {}\n" +
"\tExisting active tasks: {}\n" +
"\tExisting standby tasks: {}",
activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
"\tNew active tasks: {}\n" +
"\tNew standby tasks: {}\n" +
"\tExisting active tasks: {}\n" +
"\tExisting standby tasks: {}",
activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
topologyMetadata.addSubscribedTopicsFromAssignment(
activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
logPrefix
activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
logPrefix
);
final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
@ -311,12 +311,12 @@ public class TaskManager {
// close and recycle those tasks
handleCloseAndRecycle(
tasksToRecycle,
tasksToCloseClean,
tasksToCloseDirty,
activeTasksToCreate,
standbyTasksToCreate,
taskCloseExceptions
tasksToRecycle,
tasksToCloseClean,
tasksToCloseDirty,
activeTasksToCreate,
standbyTasksToCreate,
taskCloseExceptions
);
if (!taskCloseExceptions.isEmpty()) {
@ -333,11 +333,11 @@ public class TaskManager {
throw new StreamsException(exception, taskId);
} else {
throw new StreamsException(
"Unexpected failure to close " + taskCloseExceptions.size() +
" task(s) [" + taskCloseExceptions.keySet() + "]. " +
"First unexpected exception (for task " + taskId + ") follows.",
exception,
taskId
"Unexpected failure to close " + taskCloseExceptions.size() +
" task(s) [" + taskCloseExceptions.keySet() + "]. " +
"First unexpected exception (for task " + taskId + ") follows.",
exception,
taskId
);
}
}
@ -484,10 +484,10 @@ public class TaskManager {
} catch (final TimeoutException timeoutException) {
task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
log.debug(
String.format(
"Could not complete restoration for %s due to the following exception; will retry",
task.id()),
timeoutException
String.format(
"Could not complete restoration for %s due to the following exception; will retry",
task.id()),
timeoutException
);
allRunning = false;
@ -539,8 +539,8 @@ public class TaskManager {
if (!remainingRevokedPartitions.isEmpty()) {
log.debug("The following revoked partitions {} are missing from the current task partitions. It could "
+ "potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks " +
"have been cleaned up by the handleAssignment callback.", remainingRevokedPartitions);
+ "potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks " +
"have been cleaned up by the handleAssignment callback.", remainingRevokedPartitions);
}
prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask);
@ -562,7 +562,7 @@ public class TaskManager {
commitOffsetsOrTransaction(consumedOffsetsPerTask);
} catch (final TaskCorruptedException e) {
log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
e.corruptedTasks());
e.corruptedTasks());
// If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
@ -773,8 +773,8 @@ public class TaskManager {
} else if (offset != OffsetCheckpoint.OFFSET_UNKNOWN) {
if (offset < 0) {
throw new StreamsException(
new IllegalStateException("Expected not to get a sentinel offset, but got: " + changelogEntry),
id);
new IllegalStateException("Expected not to get a sentinel offset, but got: " + changelogEntry),
id);
}
offsetSum += offset;
if (offsetSum < 0) {
@ -818,17 +818,17 @@ public class TaskManager {
activeTasks.addAll(tasks.activeTasks());
executeAndMaybeSwallow(
clean,
() -> closeAndCleanUpTasks(activeTasks, standbyTaskIterable(), clean),
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while unlocking remaining task directories.", e)
clean,
() -> closeAndCleanUpTasks(activeTasks, standbyTaskIterable(), clean),
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while unlocking remaining task directories.", e)
);
executeAndMaybeSwallow(
clean,
tasks::closeThreadProducerIfNeeded,
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while closing thread producer.", e)
clean,
tasks::closeThreadProducerIfNeeded,
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while closing thread producer.", e)
);
tasks.clear();
@ -836,10 +836,10 @@ public class TaskManager {
// this should be called after closing all tasks and clearing them from `tasks` to make sure we unlock the dir
// for any tasks that may have still been in CREATED at the time of shutdown, since Task#close will not do so
executeAndMaybeSwallow(
clean,
this::releaseLockedUnassignedTaskDirectories,
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while unlocking remaining task directories.", e)
clean,
this::releaseLockedUnassignedTaskDirectories,
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while unlocking remaining task directories.", e)
);
final RuntimeException fatalException = firstException.get();
@ -865,10 +865,10 @@ public class TaskManager {
// TODO: change type to `StreamTask`
for (final Task activeTask : activeTasks) {
executeAndMaybeSwallow(
clean,
() -> tasks.closeAndRemoveTaskProducerIfNeeded(activeTask),
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while closing task " + activeTask.id() + " producer.", e)
clean,
() -> tasks.closeAndRemoveTaskProducerIfNeeded(activeTask),
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while closing task " + activeTask.id() + " producer.", e)
);
}
@ -1005,14 +1005,14 @@ public class TaskManager {
Set<TaskId> activeTaskIds() {
return activeTaskStream()
.map(Task::id)
.collect(Collectors.toSet());
.map(Task::id)
.collect(Collectors.toSet());
}
Set<TaskId> standbyTaskIds() {
return standbyTaskStream()
.map(Task::id)
.collect(Collectors.toSet());
.map(Task::id)
.collect(Collectors.toSet());
}
Map<TaskId, Task> tasks() {
@ -1061,7 +1061,7 @@ public class TaskManager {
if (activeTask == null) {
log.error("Unable to locate active task for received-record partition {}. Current tasks: {}",
partition, toString(">"));
partition, toString(">"));
throw new NullPointerException("Task was unexpectedly missing for partition " + partition);
}
@ -1069,17 +1069,6 @@ public class TaskManager {
}
}
/**
* Fetch all non-empty partitions for pausing
*/
Set<TopicPartition> nonEmptyPartitions() {
final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
for (final Task task : activeTaskIterable()) {
nonEmptyPartitions.addAll(((StreamTask) task).getNonEmptyTopicPartitions());
}
return nonEmptyPartitions;
}
/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
@ -1095,8 +1084,8 @@ public class TaskManager {
committed = commitTasksAndMaybeUpdateCommittableOffsets(tasksToCommit, consumedOffsetsAndMetadataPerTask);
} catch (final TimeoutException timeoutException) {
consumedOffsetsAndMetadataPerTask
.keySet()
.forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
.keySet()
.forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
}
return committed;
@ -1175,19 +1164,19 @@ public class TaskManager {
final Task task = taskToCommit.getKey();
try {
tasks.streamsProducerForTask(task.id())
.commitTransaction(taskToCommit.getValue(), mainConsumer.groupMetadata());
.commitTransaction(taskToCommit.getValue(), mainConsumer.groupMetadata());
updateTaskCommitMetadata(taskToCommit.getValue());
} catch (final TimeoutException timeoutException) {
log.error(
String.format("Committing task %s failed.", task.id()),
timeoutException
String.format("Committing task %s failed.", task.id()),
timeoutException
);
corruptedTasks.add(task.id());
}
}
} else {
final Map<TopicPartition, OffsetAndMetadata> allOffsets = offsetsPerTask.values().stream()
.flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
.flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (processingMode == EXACTLY_ONCE_V2) {
try {
@ -1195,17 +1184,17 @@ public class TaskManager {
updateTaskCommitMetadata(allOffsets);
} catch (final TimeoutException timeoutException) {
log.error(
String.format("Committing task(s) %s failed.",
offsetsPerTask
.keySet()
.stream()
.map(t -> t.id().toString())
.collect(Collectors.joining(", "))),
timeoutException
String.format("Committing task(s) %s failed.",
offsetsPerTask
.keySet()
.stream()
.map(t -> t.id().toString())
.collect(Collectors.joining(", "))),
timeoutException
);
offsetsPerTask
.keySet()
.forEach(task -> corruptedTasks.add(task.id()));
.keySet()
.forEach(task -> corruptedTasks.add(task.id()));
}
} else {
try {
@ -1213,16 +1202,16 @@ public class TaskManager {
updateTaskCommitMetadata(allOffsets);
} catch (final CommitFailedException error) {
throw new TaskMigratedException("Consumer committing offsets failed, " +
"indicating the corresponding thread is no longer part of the group", error);
"indicating the corresponding thread is no longer part of the group", error);
} catch (final TimeoutException timeoutException) {
log.error(
String.format("Committing task(s) %s failed.",
offsetsPerTask
.keySet()
.stream()
.map(t -> t.id().toString())
.collect(Collectors.joining(", "))),
timeoutException
String.format("Committing task(s) %s failed.",
offsetsPerTask
.keySet()
.stream()
.map(t -> t.id().toString())
.collect(Collectors.joining(", "))),
timeoutException
);
throw timeoutException;
} catch (final KafkaException error) {
@ -1280,7 +1269,7 @@ public class TaskManager {
}
final Set<TaskId> allRemovedTasks =
union(HashSet::new, activeTasksToRemove, standbyTasksToRemove).stream().map(Task::id).collect(Collectors.toSet());
union(HashSet::new, activeTasksToRemove, standbyTasksToRemove).stream().map(Task::id).collect(Collectors.toSet());
closeAndCleanUpTasks(activeTasksToRemove, standbyTasksToRemove, true);
allRemovedTasks.forEach(tasks::removeTaskBeforeClosing);
releaseLockedDirectoriesForTasks(allRemovedTasks);
@ -1292,14 +1281,6 @@ public class TaskManager {
}
}
long getInputBufferSizeInBytes() {
long bytesBuffered = 0L;
for (final Task task : activeTaskIterable()) {
bytesBuffered += ((StreamTask) task).totalBytesBuffered();
}
return bytesBuffered;
}
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
* @throws StreamsException if any task threw an exception while processing
@ -1319,14 +1300,14 @@ public class TaskManager {
} catch (final TimeoutException timeoutException) {
task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
log.debug(
String.format(
"Could not complete processing records for %s due to the following exception; will move to next task and retry later",
task.id()),
timeoutException
String.format(
"Could not complete processing records for %s due to the following exception; will move to next task and retry later",
task.id()),
timeoutException
);
} catch (final TaskMigratedException e) {
log.info("Failed to process stream task {} since it got migrated to another thread already. " +
"Will trigger a new rebalance and close all tasks as zombies together.", task.id());
"Will trigger a new rebalance and close all tasks as zombies together.", task.id());
throw e;
} catch (final StreamsException e) {
log.error("Failed to process stream task {} due to the following error:", task.id(), e);
@ -1367,7 +1348,7 @@ public class TaskManager {
}
} catch (final TaskMigratedException e) {
log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " +
"Will trigger a new rebalance and close all tasks as zombies together.", task.id());
"Will trigger a new rebalance and close all tasks as zombies together.", task.id());
throw e;
} catch (final StreamsException e) {
log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
@ -1390,7 +1371,7 @@ public class TaskManager {
if (deleteRecordsResult != null && deleteRecordsResult.all().isCompletedExceptionally()) {
log.debug("Previous delete-records request has failed: {}. Try sending the new request now",
deleteRecordsResult.lowWatermarks());
deleteRecordsResult.lowWatermarks());
}
final Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
@ -1424,13 +1405,13 @@ public class TaskManager {
stringBuilder.append(indent).append("\tTasks:\n");
for (final Task task : tasks.allTasks()) {
stringBuilder.append(indent)
.append("\t\t")
.append(task.id())
.append(" ")
.append(task.state())
.append(" ")
.append(task.getClass().getSimpleName())
.append('(').append(task.isActive() ? "active" : "standby").append(')');
.append("\t\t")
.append(task.id())
.append(" ")
.append(task.state())
.append(" ")
.append(task.getClass().getSimpleName())
.append('(').append(task.isActive() ? "active" : "standby").append(')');
}
return stringBuilder.toString();
}
@ -1478,12 +1459,12 @@ public class TaskManager {
final String name,
final Logger log) {
executeAndMaybeSwallow(
clean,
runnable,
e -> {
throw e;
},
e -> log.debug("Ignoring error in unclean {}", name));
clean,
runnable,
e -> {
throw e;
},
e -> log.debug("Ignoring error in unclean {}", name));
}
boolean needsInitializationOrRestoration() {


@ -84,9 +84,6 @@ public class TaskMetrics {
private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " +
"from consumer and not yet processed for this active task";
private static final String INPUT_BUFFER_BYTES_TOTAL = "input-buffer-bytes-total";
private static final String INPUT_BUFFER_BYTES_TOTAL_DESCRIPTION = "The total number of bytes accumulated in this task's input buffer";
public static Sensor processLatencySensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics) {
@ -131,22 +128,6 @@ public class TaskMetrics {
return sensor;
}
public static Sensor totalBytesSensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics) {
final String name = INPUT_BUFFER_BYTES_TOTAL;
final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, name, Sensor.RecordingLevel.DEBUG);
addValueMetricToSensor(
sensor,
TASK_LEVEL_GROUP,
streamsMetrics.taskLevelTagMap(threadId, taskId),
name,
INPUT_BUFFER_BYTES_TOTAL_DESCRIPTION
);
return sensor;
}
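The deleted totalBytesSensor() registered a task-level value metric through Streams' internal StreamsMetricsImpl helpers. For readers who want the shape of that registration without the internal plumbing, a rough equivalent using the public org.apache.kafka.common.metrics API (sensor name, group, and tag keys here are illustrative rather than the exact strings Streams uses):

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Value;

final class InputBufferMetricSketch {
    static Sensor registerInputBufferBytesTotal(final Metrics metrics, final String threadId, final String taskId) {
        final Map<String, String> tags = new LinkedHashMap<>();
        tags.put("thread-id", threadId);
        tags.put("task-id", taskId);
        final Sensor sensor = metrics.sensor(threadId + "-" + taskId + "-input-buffer-bytes-total");
        sensor.add(
            metrics.metricName(
                "input-buffer-bytes-total",
                "stream-task-metrics",
                "The total number of bytes accumulated in this task's input buffer",
                tags),
            new Value());   // Value reports the last recorded value, i.e. the current buffer size
        return sensor;
    }
}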
public static Sensor punctuateSensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics) {


@ -30,10 +30,9 @@ import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.function.Supplier;
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_DOC;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC;
@ -50,7 +49,6 @@ import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC;
* determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
* topology builders via the {@link org.apache.kafka.streams.StreamsBuilder()} method.
*/
@SuppressWarnings("deprecation")
public class TopologyConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
static {
@ -60,11 +58,6 @@ public class TopologyConfig extends AbstractConfig {
null,
Importance.LOW,
BUFFERED_RECORDS_PER_PARTITION_DOC)
.define(STATESTORE_CACHE_MAX_BYTES_CONFIG,
Type.LONG,
null,
Importance.MEDIUM,
CACHE_MAX_BYTES_BUFFERING_DOC)
.define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
Type.LONG,
null,
@ -123,30 +116,14 @@ public class TopologyConfig extends AbstractConfig {
maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
} else {
maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
}
if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) ||
isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
log.info("Topology {} is using both {} and deprecated config {}. overriding {} to {}",
topologyName,
STATESTORE_CACHE_MAX_BYTES_CONFIG,
CACHE_MAX_BYTES_BUFFERING_CONFIG,
STATESTORE_CACHE_MAX_BYTES_CONFIG,
cacheSize);
} else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
log.info("Topology {} is using deprecated config {}. overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
} else {
cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
log.info("Topology {} is overriding {} to {}", topologyName, STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize);
}
if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
} else {
cacheSize = globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
}
if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) {
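The rewritten TopologyConfig block above restores the single-config lookup; what it drops is the precedence handling between statestore.cache.max.bytes and its deprecated alias cache.max.bytes.buffering. The rule being reverted, as a standalone sketch (shortened constant names, plain Properties lookups standing in for isTopologyOverride()/getLong()):

import java.util.Properties;

final class CacheSizePrecedenceSketch {
    static final String NEW_CONFIG = "statestore.cache.max.bytes";
    static final String OLD_CONFIG = "cache.max.bytes.buffering";   // deprecated alias

    // A topology-level override of either config wins over the global value; if both
    // are overridden, the new config wins. Otherwise fall back to the global app
    // config, again preferring the new name, and finally to the default (10 MiB).
    static long resolveCacheSize(final Properties topologyOverrides, final Properties globalConfig, final long defaultBytes) {
        if (topologyOverrides.containsKey(NEW_CONFIG)) {
            return Long.parseLong(topologyOverrides.getProperty(NEW_CONFIG));
        }
        if (topologyOverrides.containsKey(OLD_CONFIG)) {
            return Long.parseLong(topologyOverrides.getProperty(OLD_CONFIG));
        }
        if (globalConfig.containsKey(NEW_CONFIG)) {
            return Long.parseLong(globalConfig.getProperty(NEW_CONFIG));
        }
        if (globalConfig.containsKey(OLD_CONFIG)) {
            return Long.parseLong(globalConfig.getProperty(OLD_CONFIG));
        }
        return defaultBytes;
    }
}

The removed StreamsConfigTest cases further down encode the same precedence for the global config: new wins when both are set, the deprecated value is honored when it is the only one set, and the default of 10 * 1024 * 1024 applies when neither is set.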


@ -228,7 +228,6 @@ public class KafkaStreamsTest {
anyObject(Time.class),
anyObject(StreamsMetadataState.class),
anyLong(),
anyLong(),
anyObject(StateDirectory.class),
anyObject(StateRestoreListener.class),
anyInt(),
@ -240,10 +239,6 @@ public class KafkaStreamsTest {
EasyMock.expect(StreamThread.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE).anyTimes();
EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes();
EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes();
EasyMock.expect(streamThreadOne.getCacheSize()).andReturn(10485760L).anyTimes();
EasyMock.expect(streamThreadOne.getMaxBufferSize()).andReturn(536870912L).anyTimes();
EasyMock.expect(streamThreadTwo.getCacheSize()).andReturn(10485760L).anyTimes();
EasyMock.expect(streamThreadTwo.getMaxBufferSize()).andReturn(536870912L).anyTimes();
prepareStreamThread(streamThreadOne, 1, true);
prepareStreamThread(streamThreadTwo, 2, false);
@ -292,8 +287,6 @@ public class KafkaStreamsTest {
EasyMock.expect(globalStreamThread.stillRunning()).andReturn(globalThreadState.get() == GlobalStreamThread.State.RUNNING).anyTimes();
globalStreamThread.join();
EasyMock.expectLastCall().anyTimes();
globalStreamThread.resize(EasyMock.anyLong());
EasyMock.expectLastCall().anyTimes();
PowerMock.replay(
StreamThread.class,
@ -349,7 +342,7 @@ public class KafkaStreamsTest {
).anyTimes();
EasyMock.expect(thread.waitOnThreadState(EasyMock.isA(StreamThread.State.class), anyLong())).andStubReturn(true);
EasyMock.expect(thread.isAlive()).andReturn(true).times(0, 1);
thread.resizeCacheAndBufferMemory(EasyMock.anyLong(), EasyMock.anyLong());
thread.resizeCache(EasyMock.anyLong());
EasyMock.expectLastCall().anyTimes();
thread.requestLeaveGroupDuringShutdown();
EasyMock.expectLastCall().anyTimes();


@ -1124,36 +1124,6 @@ public class StreamsConfigTest {
assertThrows(ConfigException.class, () -> new StreamsConfig(props));
}
@Test
@SuppressWarnings("deprecation")
public void shouldUseStateStoreCacheMaxBytesWhenBothOldAndNewConfigsAreSet() {
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
final StreamsConfig config = new StreamsConfig(props);
assertEquals(config.getTotalCacheSize(), 100);
}
@Test
@SuppressWarnings("deprecation")
public void shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() {
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
final StreamsConfig config = new StreamsConfig(props);
assertEquals(config.getTotalCacheSize(), 10);
}
@Test
public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() {
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10);
final StreamsConfig config = new StreamsConfig(props);
assertEquals(config.getTotalCacheSize(), 10);
}
@Test
public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
final StreamsConfig config = new StreamsConfig(props);
assertEquals(config.getTotalCacheSize(), 10 * 1024 * 1024);
}
static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {


@ -123,7 +123,7 @@ public abstract class AbstractJoinIntegrationTest {
void prepareEnvironment() throws InterruptedException {
if (!cacheEnabled) {
STREAMS_CONFIG.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
}
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());


@ -147,7 +147,7 @@ public abstract class AbstractResetIntegrationTest {
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


@ -375,7 +375,7 @@ public class AdjustStreamThreadCountTest {
final Properties props = new Properties();
props.putAll(properties);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, totalCacheBytes);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, totalCacheBytes);
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) {
addStreamStateChangeListener(kafkaStreams);
@ -386,32 +386,7 @@ public class AdjustStreamThreadCountTest {
for (final String log : appender.getMessages()) {
// all 10 bytes should be available for remaining thread
if (log.contains("Resizing thread cache/max buffer size due to removal of thread ") && log.contains(", new cache size/max buffer size per thread is 10/536870912")) {
return;
}
}
}
}
fail();
}
@Test
public void shouldResizeMaxBufferAfterThreadRemovalTimesOut() throws InterruptedException {
final long maxBufferBytes = 10L;
final Properties props = new Properties();
props.putAll(properties);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
props.put(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG, maxBufferBytes);
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) {
addStreamStateChangeListener(kafkaStreams);
startStreamsAndWaitForRunning(kafkaStreams);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class)) {
assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ofSeconds(0)));
for (final String log : appender.getMessages()) {
// all 10 bytes should be available for remaining thread
if (log.contains("Resizing thread cache/max buffer size due to removal of thread ") && log.contains(", new cache size/max buffer size per thread is 10485760/10")) {
if (log.endsWith("Resizing thread cache due to thread removal, new cache size per thread is 10")) {
return;
}
}
@ -426,7 +401,7 @@ public class AdjustStreamThreadCountTest {
final Properties props = new Properties();
props.putAll(properties);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, totalCacheBytes);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, totalCacheBytes);
final AtomicBoolean injectError = new AtomicBoolean(false);
@ -467,63 +442,7 @@ public class AdjustStreamThreadCountTest {
for (final String log : appender.getMessages()) {
// after we replace the thread there should be two remaining threads with 5 bytes each
if (log.endsWith("Adding StreamThread-3, there are now 3 threads with cache size/max buffer size values as 3/178956970 per thread.")) {
return;
}
}
}
}
fail();
}
@Test
public void shouldResizeMaxBufferAfterThreadReplacement() throws InterruptedException {
final long totalCacheBytes = 10L;
final Properties props = new Properties();
props.putAll(properties);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
props.put(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG, totalCacheBytes);
final AtomicBoolean injectError = new AtomicBoolean(false);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream = builder.stream(inputTopic);
stream.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
@Override
public void init(final ProcessorContext context) {
context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
if (Thread.currentThread().getName().endsWith("StreamThread-1") && injectError.get()) {
injectError.set(false);
throw new RuntimeException("BOOM");
}
});
}
@Override
public KeyValue<String, String> transform(final String key, final String value) {
return new KeyValue<>(key, value);
}
@Override
public void close() {
}
});
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) {
addStreamStateChangeListener(kafkaStreams);
kafkaStreams.setUncaughtExceptionHandler(e -> StreamThreadExceptionResponse.REPLACE_THREAD);
startStreamsAndWaitForRunning(kafkaStreams);
stateTransitionHistory.clear();
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
injectError.set(true);
waitForCondition(() -> !injectError.get(), "StreamThread did not hit and reset the injected error");
waitForTransitionFromRebalancingToRunning();
for (final String log : appender.getMessages()) {
// after we replace the thread there should be two remaining threads with 5 bytes each
if (log.endsWith("Adding StreamThread-3, there are now 3 threads with cache size/max buffer size values as 3495253/3 per thread.")) {
if (log.endsWith("Adding StreamThread-3, there will now be 2 live threads and the new cache size per thread is 5")) {
return;
}
}
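These test expectations encode the resize arithmetic that changes with the revert: only the cache is divided among live threads, not a separate max-buffer budget. With cache.max.bytes.buffering set to 10 and two threads, each thread gets 5; after one thread is removed, the survivor gets the full 10. A tiny sketch of that division (ignoring the rounding details of the real implementation):

final class CacheResizeSketch {
    // Cache handed to each StreamThread after an add/remove/replace event.
    static long cachePerThread(final long totalCacheBytes, final int liveThreads) {
        return liveThreads == 0 ? 0L : totalCacheBytes / liveThreads;
    }
}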


@ -90,7 +90,7 @@ public class EmitOnChangeIntegrationTest {
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
mkEntry(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0),
mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),


@ -250,7 +250,7 @@ public class EosIntegrationTest {
final Properties properties = new Properties();
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
@ -341,7 +341,7 @@ public class EosIntegrationTest {
final Properties properties = new Properties();
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@ -990,7 +990,7 @@ public class EosIntegrationTest {
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), maxPollIntervalMs);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), maxPollIntervalMs - 1);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), maxPollIntervalMs);
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");


@ -944,7 +944,7 @@ public class EosV2UpgradeIntegrationTest {
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) commitInterval);
properties.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG), KeyPartitioner.class);
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
properties.put(InternalConfig.ASSIGNMENT_LISTENER, assignmentListener);


@ -144,7 +144,7 @@ public class FineGrainedAutoResetIntegrationTest {
public void setUp() throws IOException {
final Properties props = new Properties();
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@ -283,7 +283,7 @@ public class FineGrainedAutoResetIntegrationTest {
@Test
public void shouldThrowStreamsExceptionNoResetSpecified() throws InterruptedException {
final Properties props = new Properties();
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");


@ -132,7 +132,7 @@ public class GlobalKTableEOSIntegrationTest {
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0L);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
streamsConfiguration.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L);


@ -111,7 +111,7 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)


@ -118,7 +118,7 @@ public class GlobalThreadShutDownOrderTest {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());


@ -104,7 +104,7 @@ public class InternalTopicIntegrationTest {
streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsProp.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}


@ -107,7 +107,7 @@ public class KStreamAggregationDedupIntegrationTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));


@ -142,7 +142,7 @@ public class KStreamAggregationIntegrationTest {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());


@ -138,7 +138,7 @@ public class KStreamRepartitionIntegrationTest {
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());


@ -181,7 +181,6 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
assertEquals(expectedResult, result);
}
@SuppressWarnings("deprecation")
private static Properties getStreamsConfig() {
final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Partitioner");


@ -208,7 +208,7 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Multi");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
return streamsConfig;


@ -69,7 +69,7 @@ public class KTableSourceTopicRestartIntegrationTest {
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
STREAMS_CONFIG.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5L);
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);


@ -195,7 +195,6 @@ public class MetricsIntegrationTest {
private static final String THREAD_START_TIME = "thread-start-time";
private static final String ACTIVE_PROCESS_RATIO = "active-process-ratio";
private static final String ACTIVE_BUFFER_COUNT = "active-buffer-count";
private static final String INPUT_BUFFER_BYTES_TOTAL = "input-buffer-bytes-total";
private static final String SKIPPED_RECORDS_RATE = "skipped-records-rate";
private static final String SKIPPED_RECORDS_TOTAL = "skipped-records-total";
private static final String RECORD_LATENESS_AVG = "record-lateness-avg";
@ -252,7 +251,7 @@ public class MetricsIntegrationTest {
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
}
@ -528,7 +527,6 @@ public class MetricsIntegrationTest {
checkMetricByName(listMetricTask, PUNCTUATE_TOTAL, 4);
checkMetricByName(listMetricTask, PROCESS_RATE, 4);
checkMetricByName(listMetricTask, PROCESS_TOTAL, 4);
checkMetricByName(listMetricTask, INPUT_BUFFER_BYTES_TOTAL, 4);
}
private void checkProcessorNodeLevelMetrics() {


@ -198,7 +198,7 @@ public class OptimizedKTableIntegrationTest {
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
config.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);


@ -964,7 +964,7 @@ public class QueryableStateIntegrationTest {
}
private void verifyCanQueryState(final int cacheSizeBytes) throws Exception {
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSizeBytes);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
final StreamsBuilder builder = new StreamsBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};


@ -129,7 +129,7 @@ public class RegexSourceIntegrationTest {
public void setUp() throws InterruptedException {
outputTopic = createTopic(topicSuffixGenerator.incrementAndGet());
final Properties properties = new Properties();
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


@ -121,7 +121,7 @@ public class RestoreIntegrationTest {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());


@ -198,7 +198,7 @@ public class RocksDBMetricsIntegrationTest {
streamsConfiguration.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
return streamsConfiguration;
}


@ -400,7 +400,7 @@ public class StandbyTaskEOSIntegrationTest {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);


@ -93,7 +93,7 @@ public class StoreUpgradeIntegrationTest {
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());


@ -121,7 +121,7 @@ public class StreamTableJoinTopologyOptimizationIntegrationTest {
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

View File

@ -53,7 +53,7 @@ import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@SuppressWarnings({"unchecked"})
@SuppressWarnings("unchecked")
public class KTableFilterTest {
private final Consumed<String, Integer> consumed = Consumed.with(Serdes.String(), Serdes.Integer());
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
@ -61,7 +61,7 @@ public class KTableFilterTest {
@Before
public void setUp() {
// disable caching at the config level
props.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0");
props.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
}
private final Predicate<String, Integer> predicate = (key, value) -> (value % 2) == 0;

View File

@ -71,7 +71,7 @@ public class SessionWindowedKStreamImplTest {
@Test
public void shouldCountSessionWindowedWithCachingDisabled() {
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
shouldCountSessionWindowed();
}

View File

@ -940,22 +940,9 @@ public class InternalTopologyBuilderTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldUseNonDeprecatedConfigToSetCacheBytesWhenBothDeprecatedAndNonDeprecatedConfigsUsed() {
final Properties globalProps = StreamsTestUtils.getStreamsConfig();
globalProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 200L);
globalProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100L);
final StreamsConfig globalStreamsConfig = new StreamsConfig(globalProps);
final InternalTopologyBuilder topologyBuilder = builder.rewriteTopology(globalStreamsConfig);
assertThat(topologyBuilder.topologyConfigs(), equalTo(new TopologyConfig(null, globalStreamsConfig, new Properties())));
assertThat(topologyBuilder.topologyConfigs().cacheSize, equalTo(200L));
}
@Test
@SuppressWarnings("deprecation")
public void shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() {
final Properties topologyOverrides = new Properties();
topologyOverrides.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 12345L);
topologyOverrides.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 12345L);
topologyOverrides.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
topologyOverrides.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L);
topologyOverrides.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15);
@ -979,10 +966,9 @@ public class InternalTopologyBuilderTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotOverrideGlobalStreamsConfigWhenGivenUnnamedTopologyProps() {
final Properties streamsProps = StreamsTestUtils.getStreamsConfig();
streamsProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 12345L);
streamsProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 12345L);
streamsProps.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
streamsProps.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L);
streamsProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15);

View File

@ -19,12 +19,9 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
@ -47,8 +44,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.Collections;
import java.util.Optional;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@ -88,7 +83,6 @@ public class PartitionGroupTest {
private final Metrics metrics = new Metrics();
private final Sensor enforcedProcessingSensor = metrics.sensor(UUID.randomUUID().toString());
private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap());
private final MetricName totalBytesValue = new MetricName("total-bytes-last-value", "", "", mkMap());
private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) {
@ -490,7 +484,6 @@ public class PartitionGroupTest {
mkMap(mkEntry(partition1, queue1)),
tp -> OptionalLong.of(0L),
getValueSensor(metrics, lastLatenessValue),
getValueSensor(metrics, totalBytesValue),
enforcedProcessingSensor,
maxTaskIdleMs
);
@ -524,7 +517,6 @@ public class PartitionGroupTest {
mkMap(mkEntry(partition1, queue1)),
tp -> OptionalLong.of(0L),
getValueSensor(metrics, lastLatenessValue),
getValueSensor(metrics, totalBytesValue),
enforcedProcessingSensor,
maxTaskIdleMs
);
@ -560,7 +552,6 @@ public class PartitionGroupTest {
),
tp -> OptionalLong.of(0L),
getValueSensor(metrics, lastLatenessValue),
getValueSensor(metrics, totalBytesValue),
enforcedProcessingSensor,
StreamsConfig.MAX_TASK_IDLE_MS_DISABLED
);
@ -599,7 +590,6 @@ public class PartitionGroupTest {
),
tp -> OptionalLong.of(0L),
getValueSensor(metrics, lastLatenessValue),
getValueSensor(metrics, totalBytesValue),
enforcedProcessingSensor,
0L
);
@ -639,7 +629,6 @@ public class PartitionGroupTest {
),
tp -> lags.getOrDefault(tp, OptionalLong.empty()),
getValueSensor(metrics, lastLatenessValue),
getValueSensor(metrics, totalBytesValue),
enforcedProcessingSensor,
0L
);
@ -676,7 +665,6 @@ public class PartitionGroupTest {
),
tp -> lags.getOrDefault(tp, OptionalLong.empty()),
getValueSensor(metrics, lastLatenessValue),
getValueSensor(metrics, totalBytesValue),
enforcedProcessingSensor,
0L
);
@ -713,7 +701,6 @@ public class PartitionGroupTest {
),
tp -> OptionalLong.of(0L),
getValueSensor(metrics, lastLatenessValue),
getValueSensor(metrics, totalBytesValue),
enforcedProcessingSensor,
1L
);
@ -776,93 +763,6 @@ public class PartitionGroupTest {
}
}
@Test
public void shouldUpdateTotalBytesBufferedOnRecordsAdditionAndConsumption() {
final PartitionGroup group = getBasicGroup();
assertEquals(0, group.numBuffered());
assertEquals(0L, group.totalBytesBuffered());
// add three records with timestamps 1, 5, 3 to partition-1
final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
new ConsumerRecord<>("topic", 1, 1L, new MockTime().milliseconds(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>("topic", 1, 5L, new MockTime().milliseconds(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>("topic", 1, 3L, new MockTime().milliseconds(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()));
long partition1TotalBytes = getBytesBufferedForRawRecords(list1);
group.addRawRecords(partition1, list1);
verifyBuffered(3, 3, 0, group);
assertEquals(group.totalBytesBuffered(), partition1TotalBytes);
assertEquals(-1L, group.streamTime());
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition1TotalBytes));
StampedRecord record;
final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
// get first two records from partition 1
record = group.nextRecord(info, time.milliseconds());
assertEquals(record.timestamp, 1L);
record = group.nextRecord(info, time.milliseconds());
assertEquals(record.timestamp, 5L);
partition1TotalBytes -= getBytesBufferedForRawRecords(Arrays.asList(list1.get(0), list1.get(0)));
assertEquals(group.totalBytesBuffered(), partition1TotalBytes);
assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition1TotalBytes));
// add three records with timestamps 2, 4, 6 to partition-2
final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
new ConsumerRecord<>("topic", 2, 2L, record.timestamp, TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>("topic", 2, 4L, record.timestamp, TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>("topic", 2, 6L, record.timestamp, TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, new RecordHeaders(), Optional.empty()));
long partition2TotalBytes = getBytesBufferedForRawRecords(list2);
group.addRawRecords(partition2, list2);
// 1:[3]
// 2:[2, 4, 6]
assertEquals(group.totalBytesBuffered(), partition2TotalBytes + partition1TotalBytes);
assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition2TotalBytes + partition1TotalBytes));
// get one record, next record should be ts=2 from partition 2
record = group.nextRecord(info, time.milliseconds());
// 1:[3]
// 2:[4, 6]
partition2TotalBytes -= getBytesBufferedForRawRecords(Collections.singletonList(list2.get(0)));
assertEquals(group.totalBytesBuffered(), partition2TotalBytes + partition1TotalBytes);
assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition2TotalBytes + partition1TotalBytes));
assertEquals(record.timestamp, 2L);
// get one record, next up should have ts=3 from partition 1 (even though it has seen a larger max timestamp =5)
record = group.nextRecord(info, time.milliseconds());
// 1:[]
// 2:[4, 6]
partition1TotalBytes -= getBytesBufferedForRawRecords(Collections.singletonList(list2.get(2)));
assertEquals(group.totalBytesBuffered(), partition2TotalBytes + partition1TotalBytes);
assertThat(metrics.metric(totalBytesValue).metricValue(), is((double) partition2TotalBytes + partition1TotalBytes));
assertEquals(record.timestamp, 3L);
}
private long getBytesBufferedForRawRecords(final List<ConsumerRecord<byte[], byte[]>> rawRecords) {
long rawRecordsSizeInBytes = 0L;
for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
long headerSizeInBytes = 0L;
for (final Header header: rawRecord.headers().toArray()) {
headerSizeInBytes += header.key().getBytes().length + header.value().length;
}
rawRecordsSizeInBytes += rawRecord.serializedKeySize() +
rawRecord.serializedValueSize() +
8L + // timestamp
8L + // offset
rawRecord.topic().getBytes().length +
4L + // partition
headerSizeInBytes;
}
return rawRecordsSizeInBytes;
}
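Side note, not part of the diff: the helper removed above charges each buffered record its serialized key size + serialized value size + 8 bytes (timestamp) + 8 bytes (offset) + the byte length of the topic name + 4 bytes (partition) + the byte length of all header keys and values. A hypothetical worked example in Java, with sizes chosen purely for illustration (they are not taken from the test):
    // Illustrative sizes only: 2-byte key, 6-byte value, topic "topic" (5 bytes), no headers.
    final long bytesPerRecord =
        2 + 6 + 8L /* timestamp */ + 8L /* offset */ + 5 /* topic name */ + 4L /* partition */ + 0L /* headers */;
    // bytesPerRecord == 33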
private PartitionGroup getBasicGroup() {
return new PartitionGroup(
logContext,
@ -873,7 +773,6 @@ public class PartitionGroupTest {
tp -> OptionalLong.of(0L),
getValueSensor(metrics, lastLatenessValue),
enforcedProcessingSensor,
getValueSensor(metrics, totalBytesValue),
maxTaskIdleMs
);
}

View File

@ -108,7 +108,7 @@ public class RepartitionOptimizingTest {
@Before
public void setUp() {
streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
streamsConfiguration.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Integer.toString(1024 * 10));
streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10));
streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(5000));
processorValueCollector.clear();

View File

@ -86,7 +86,7 @@ public class RepartitionWithMergeOptimizingTest {
@Before
public void setUp() {
streamsConfiguration = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
streamsConfiguration.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Integer.toString(1024 * 10));
streamsConfiguration.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, Integer.toString(1024 * 10));
streamsConfiguration.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(5000));
}

View File

@ -108,6 +108,7 @@ public class StandbyTaskTest {
return new StreamsConfig(mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
mkEntry(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()),
mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName())
)));

View File

@ -230,7 +230,6 @@ public class StreamTaskTest {
return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName());
}
@SuppressWarnings("deprecation")
private static StreamsConfig createConfig(
final String eosConfig,
final String enforcedProcessingValue,

View File

@ -118,7 +118,6 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
@ -158,7 +157,6 @@ public class StreamThreadTest {
private final StateDirectory stateDirectory = new StateDirectory(config, mockTime, true, false);
private final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
private final long defaultMaxBufferSizeInBytes = 512 * 1024 * 1024;
private StreamsMetadataState streamsMetadataState;
private final static BiConsumer<Throwable, Boolean> HANDLER = (e, b) -> {
@ -198,6 +196,7 @@ public class StreamThreadTest {
return mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
mkEntry(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"),
mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, enableEoS ? StreamsConfig.EXACTLY_ONCE_V2 : StreamsConfig.AT_LEAST_ONCE),
@ -248,7 +247,6 @@ public class StreamThreadTest {
mockTime,
streamsMetadataState,
0,
defaultMaxBufferSizeInBytes,
stateDirectory,
new MockStateRestoreListener(),
threadIdx,
@ -496,7 +494,6 @@ public class StreamThreadTest {
mockTime,
streamsMetadataState,
0,
defaultMaxBufferSizeInBytes,
stateDirectory,
new MockStateRestoreListener(),
threadIdx,
@ -1136,8 +1133,7 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
null,
defaultMaxBufferSizeInBytes
null
).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
final StreamsException thrown = assertThrows(StreamsException.class, thread::run);
@ -1523,7 +1519,6 @@ public class StreamThreadTest {
mockTime,
streamsMetadataState,
0,
defaultMaxBufferSizeInBytes,
stateDirectory,
new MockStateRestoreListener(),
threadIdx,
@ -2181,8 +2176,7 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
null,
defaultMaxBufferSizeInBytes
null
) {
@Override
void runOnce() {
@ -2249,8 +2243,7 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
null,
defaultMaxBufferSizeInBytes
null
) {
@Override
void runOnce() {
@ -2325,8 +2318,7 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
null,
defaultMaxBufferSizeInBytes
null
) {
@Override
void runOnce() {
@ -2396,8 +2388,7 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
null,
defaultMaxBufferSizeInBytes
null
) {
@Override
void runOnce() {
@ -2465,8 +2456,7 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
null,
defaultMaxBufferSizeInBytes
null
) {
@Override
void runOnce() {
@ -2619,205 +2609,6 @@ public class StreamThreadTest {
}
}
@Test
public void shouldPauseNonEmptyPartitionsWhenTotalBufferSizeExceedsMaxBufferSize() {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
final List<TopicPartition> assignedPartitions = Collections.singletonList(t1p1);
consumer.assign(assignedPartitions);
records.put(t1p1, Collections.singletonList(new ConsumerRecord<>(
t1p1.topic(),
t1p1.partition(),
1,
mockTime.milliseconds(),
TimestampType.CREATE_TIME,
2,
6,
new byte[2],
new byte[6],
new RecordHeaders(),
Optional.empty())));
expect(consumer.poll(anyObject())).andReturn(new ConsumerRecords<>(records)).anyTimes();
EasyMock.replay(consumer, consumerGroupMetadata);
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>());
final Metric testMetric = new KafkaMetric(
new Object(),
testMetricName,
(Measurable) (config, now) -> 0,
null,
new MockTime());
final Map<MetricName, Metric> dummyProducerMetrics = singletonMap(testMetricName, testMetric);
expect(taskManager.producerMetrics()).andReturn(dummyProducerMetrics);
EasyMock.replay(taskManager);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
final StreamThread thread = new StreamThread(
mockTime,
config,
null,
consumer,
consumer,
changelogReader,
null,
taskManager,
streamsMetrics,
topologyMetadata,
CLIENT_ID,
new LogContext(""),
new AtomicInteger(),
new AtomicLong(Long.MAX_VALUE),
new LinkedList<>(),
null,
HANDLER,
null,
10
);
thread.setState(StreamThread.State.STARTING);
thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
thread.pollPhase();
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
thread.pollPhase();
EasyMock.reset(consumer);
consumer.pause(anyObject());
// Consumer.pause should be called only once, when we added the second record.
EasyMock.expectLastCall().times(1);
}
@Test
public void shouldResumePartitionsAfterConsumptionWhenTotalBufferSizeIsLTEMaxBufferSize() {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
final ChangelogReader changelogReader = EasyMock.createNiceMock(ChangelogReader.class);
expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
changelogReader.restore(anyObject());
expectLastCall().andVoid();
final Task task1 = mock(Task.class);
final Task task2 = mock(Task.class);
final TaskId taskId1 = new TaskId(0, 1);
final TaskId taskId2 = new TaskId(0, 2);
expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
expect(task1.id()).andReturn(taskId1).anyTimes();
expect(task1.inputPartitions()).andReturn(mkSet(t1p1)).anyTimes();
expect(task1.committedOffsets()).andReturn(new HashMap<>()).anyTimes();
expect(task1.highWaterMark()).andReturn(new HashMap<>()).anyTimes();
expect(task1.timeCurrentIdlingStarted()).andReturn(Optional.empty()).anyTimes();
expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
expect(task2.id()).andReturn(taskId2).anyTimes();
expect(task2.inputPartitions()).andReturn(mkSet(t1p2)).anyTimes();
expect(task2.committedOffsets()).andReturn(new HashMap<>()).anyTimes();
expect(task2.highWaterMark()).andReturn(new HashMap<>()).anyTimes();
expect(task2.timeCurrentIdlingStarted()).andReturn(Optional.empty()).anyTimes();
EasyMock.replay(task1, task2);
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
records.put(t1p1, Collections.singletonList(new ConsumerRecord<>(
t1p1.topic(),
t1p1.partition(),
1,
mockTime.milliseconds(),
TimestampType.CREATE_TIME,
2,
6,
new byte[2],
new byte[6],
new RecordHeaders(),
Optional.empty())));
records.put(t1p2, Collections.singletonList(new ConsumerRecord<>(
t1p2.topic(),
t1p2.partition(),
1,
mockTime.milliseconds(),
TimestampType.CREATE_TIME,
2,
6,
new byte[2],
new byte[6],
new RecordHeaders(),
Optional.empty())));
final List<TopicPartition> assignedPartitions = Arrays.asList(t1p1, t1p2);
consumer.assign(assignedPartitions);
expect(consumer.poll(anyObject())).andReturn(new ConsumerRecords<>(records));
EasyMock.replay(consumer, consumerGroupMetadata);
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>());
final Metric testMetric = new KafkaMetric(
new Object(),
testMetricName,
(Measurable) (config, now) -> 0,
null,
new MockTime());
final Map<MetricName, Metric> dummyProducerMetrics = singletonMap(testMetricName, testMetric);
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
expect(taskManager.producerMetrics()).andReturn(dummyProducerMetrics);
expect(taskManager.activeTaskMap()).andReturn(mkMap(
mkEntry(taskId1, task1),
mkEntry(taskId2, task2)
));
expect(taskManager.tasks()).andStubReturn(mkMap(
mkEntry(taskId1, task1),
mkEntry(taskId2, task2)
));
expect(taskManager.standbyTaskMap()).andReturn(new HashMap<>());
expect(taskManager.commit(anyObject())).andReturn(0);
expect(taskManager.process(anyInt(), anyObject())).andReturn(1);
expect(taskManager.process(anyInt(), anyObject())).andReturn(1);
expect(taskManager.process(anyInt(), anyObject())).andReturn(0);
EasyMock.replay(taskManager);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
final StreamThread thread = new StreamThread(
mockTime,
new StreamsConfig(configProps(true)),
null,
consumer,
consumer,
changelogReader,
null,
taskManager,
streamsMetrics,
new TopologyMetadata(internalTopologyBuilder, config),
CLIENT_ID,
new LogContext(""),
new AtomicInteger(),
new AtomicLong(Long.MAX_VALUE),
new LinkedList<>(),
null,
HANDLER,
null,
6
).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
thread.setState(StreamThread.State.STARTING);
thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
thread.setState(StreamThread.State.RUNNING);
thread.runOnce();
EasyMock.reset(consumer);
consumer.resume(anyObject());
// Consumer.resume should be called only once, when we added the second record.
EasyMock.expectLastCall().times(1);
}
@Test
public void shouldTransmitTaskManagerMetrics() {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
@ -2886,8 +2677,7 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
null,
defaultMaxBufferSizeInBytes
null
);
final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>());
final Metric testMetric = new KafkaMetric(
@ -2944,8 +2734,7 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
(e, b) -> { },
null,
defaultMaxBufferSizeInBytes
null
) {
@Override
void runOnce() {
@ -3064,8 +2853,7 @@ public class StreamThreadTest {
new LinkedList<>(),
null,
HANDLER,
null,
defaultMaxBufferSizeInBytes
null
);
}
}

View File

@ -72,7 +72,7 @@ public class BrokerCompatibilityTest {
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingMode);
final int timeout = 6000;
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout);

View File

@ -107,7 +107,7 @@ public class EosTestClient extends SmokeTestUtil {
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, Duration.ofMinutes(1).toMillis());
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, Integer.MAX_VALUE);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000L); // increase commit interval to make sure a client is killed having an open transaction
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());

View File

@ -87,7 +87,7 @@ public class StreamsNamedRepartitionTest {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsNamedRepartitionTest");
config.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0");
config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

View File

@ -110,7 +110,7 @@ public class StreamsOptimizedTest {
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsOptimizedTest");
config.setProperty(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0");
config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), "100");

View File

@ -67,7 +67,7 @@ public class StreamsStandByReplicaTest {
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks");
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);

View File

@ -155,7 +155,7 @@ import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
*
* <p> Note that the {@code TopologyTestDriver} processes input records synchronously.
* This implies that {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit.interval.ms} and
* {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG cache.max.bytes.buffering} configurations have no effect.
* {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache.max.bytes.buffering} configurations have no effect.
* The driver behaves as if both configs would be set to zero, i.e., as if a "commit" (and thus "flush") would happen
* after each input record.
*
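To make the javadoc statement above concrete (this sketch is not part of the diff; the class name, topic names, and config values are made up): because TopologyTestDriver behaves as if it commits and flushes after every input record, any value given for cache.max.bytes.buffering acts like 0 inside the driver. A minimal sketch, assuming a trivial pass-through topology and String serdes:
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TestInputTopic;
    import org.apache.kafka.streams.TestOutputTopic;
    import org.apache.kafka.streams.TopologyTestDriver;
    public class CacheConfigNoEffectSketch {                    // hypothetical class name
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input").to("output");               // trivial pass-through topology
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test-driver-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the driver
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            // After the revert this is the config to set; inside the driver any value behaves like 0.
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);
            try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
                final TestInputTopic<String, String> in =
                    driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
                final TestOutputTopic<String, String> out =
                    driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
                in.pipeInput("key", "value");
                // The record is available immediately: the driver "commits" (and thus flushes
                // the cache) after each single input record.
                System.out.println(out.readKeyValue());
            }
        }
    }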
@ -309,7 +309,6 @@ public class TopologyTestDriver implements Closeable {
* @param config the configuration for the topology
* @param initialWallClockTimeMs the initial value of internally mocked wall-clock time
*/
@SuppressWarnings({"unchecked", "deprecation"})
private TopologyTestDriver(final InternalTopologyBuilder builder,
final Properties config,
final long initialWallClockTimeMs) {
@ -330,7 +329,7 @@ public class TopologyTestDriver implements Closeable {
final ThreadCache cache = new ThreadCache(
logContext,
Math.max(0, streamsConfig.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG)),
Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
streamsMetrics
);