mirror of https://github.com/apache/kafka.git
MINOR: log warning when topology override for cache size is non-zero (#11959)
Since the topology-level cache size config only controls whether we disable the caching layer entirely for that topology, setting it to anything other than 0 has no effect. The actual cache memory is still just split evenly between the threads, and shared by all topologies. It's possible we'll want to change this in the future, but for now we should make sure to log a warning so that users who do try to set this override to some nonzero value are made aware that it doesn't work like this. Also includes some minor refactoring plus a fix for an off-by-one error in #11796 Reviewers: Luke Chen <showuon@gmail.com>, Walker Carlson <wcarlson@confluent.io>, Sagar Rao <sagarmeansocean@gmail.com>
This commit is contained in:
parent
b2cb6caa1e
commit
1317f3f77a
|
@ -188,7 +188,7 @@
|
|||
files="StreamsMetricsImpl.java"/>
|
||||
|
||||
<suppress checks="NPathComplexity"
|
||||
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|GlobalStateManagerImpl|KStreamImplJoin).java"/>
|
||||
files="(GlobalStateManagerImpl|KafkaStreams|KStreamImplJoin|StreamsPartitionAssignor|StreamThread|TaskManager|TopologyConfig).java"/>
|
||||
|
||||
<suppress checks="(FinalLocalVariable|UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
|
||||
files="Murmur3.java"/>
|
||||
|
|
|
@ -109,6 +109,7 @@ import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CON
|
|||
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
|
||||
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
|
||||
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
|
||||
import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
|
||||
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
|
||||
|
||||
/**
|
||||
|
@ -939,7 +940,7 @@ public class KafkaStreams implements AutoCloseable {
|
|||
streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
|
||||
delegatingStateRestoreListener = new DelegatingStateRestoreListener();
|
||||
|
||||
totalCacheSize = applicationConfigs.getTotalCacheSize();
|
||||
totalCacheSize = getTotalCacheSize(applicationConfigs);
|
||||
inputBufferMaxBytes = applicationConfigs.getLong(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG);
|
||||
final int numStreamThreads = topologyMetadata.getNumStreamThreads(applicationConfigs);
|
||||
|
||||
|
@ -1047,9 +1048,9 @@ public class KafkaStreams implements AutoCloseable {
|
|||
// and then resize them later
|
||||
streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
|
||||
final int numLiveThreads = getNumLiveStreamThreads();
|
||||
resizeThreadCacheAndBufferMemory(numLiveThreads + 1);
|
||||
resizeThreadCacheAndBufferMemory(numLiveThreads);
|
||||
log.info("Adding StreamThread-{}, there are now {} threads with cache size/max buffer size values as {} per thread.",
|
||||
threadIdx, numLiveThreads + 1, getThreadCacheAndBufferMemoryString());
|
||||
threadIdx, numLiveThreads, getThreadCacheAndBufferMemoryString());
|
||||
}
|
||||
|
||||
synchronized (stateLock) {
|
||||
|
|
|
@ -1536,29 +1536,6 @@ public class StreamsConfig extends AbstractConfig {
|
|||
return props;
|
||||
}
|
||||
|
||||
public long getTotalCacheSize() {
|
||||
// both deprecated and new config set. Warn and use the new one.
|
||||
if (originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) {
|
||||
if (!getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG).equals(getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG))) {
|
||||
log.warn("Both deprecated config {} and the new config {} are set, hence {} is ignored and {} is used instead.",
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
}
|
||||
return getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
} else if (originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG)) {
|
||||
// only deprecated config set.
|
||||
log.warn("Deprecated config {} is set, and will be used; we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
return getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
}
|
||||
// only new or no config set. Use default or user specified value.
|
||||
return getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the configs for the {@link Admin admin client}.
|
||||
* @param clientId clientId
|
||||
|
|
|
@ -49,6 +49,7 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG;
|
|||
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_DOC;
|
||||
import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB;
|
||||
import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
|
||||
import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
|
||||
|
||||
/**
|
||||
* Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
|
||||
|
@ -136,37 +137,54 @@ public class TopologyConfig extends AbstractConfig {
|
|||
maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
|
||||
log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
|
||||
} else {
|
||||
// If the user hasn't explicitly set the buffered.records.per.partition config, then leave it unbounded
|
||||
// and rely on the input.buffer.max.bytes instead to keep the memory usage under control
|
||||
maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
|
||||
? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
|
||||
}
|
||||
|
||||
if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
|
||||
cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
log.info("Topology {} is using both deprecated config {} and new config {}, hence {} is ignored and the new config {} (value {}) is used",
|
||||
topologyName,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
cacheSize);
|
||||
} else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
|
||||
cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
log.info("Topology {} is using only deprecated config {}, and will be used to set cache size to {}; " +
|
||||
"we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
|
||||
topologyName,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
cacheSize,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
} else if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides)) {
|
||||
cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
final boolean stateStoreCacheMaxBytesOverridden = isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides);
|
||||
final boolean cacheMaxBytesBufferingOverridden = isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides);
|
||||
|
||||
if (!stateStoreCacheMaxBytesOverridden && !cacheMaxBytesBufferingOverridden) {
|
||||
cacheSize = getTotalCacheSize(globalAppConfigs);
|
||||
} else {
|
||||
cacheSize = globalAppConfigs.getTotalCacheSize();
|
||||
if (stateStoreCacheMaxBytesOverridden && cacheMaxBytesBufferingOverridden) {
|
||||
cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
log.info("Topology {} is using both deprecated config {} and new config {}, hence {} is ignored and the new config {} (value {}) is used",
|
||||
topologyName,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
cacheSize);
|
||||
} else if (cacheMaxBytesBufferingOverridden) {
|
||||
cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
log.info("Topology {} is using only deprecated config {}, and will be used to set cache size to {}; " +
|
||||
"we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
|
||||
topologyName,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
cacheSize,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
} else {
|
||||
cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
}
|
||||
|
||||
if (cacheSize != 0) {
|
||||
log.warn("Topology {} is overriding cache size to {} but this will not have any effect as the "
|
||||
+ "topology-level cache size config only controls whether record buffering is enabled "
|
||||
+ "or disabled, thus the only valid override value is 0",
|
||||
topologyName, cacheSize);
|
||||
} else {
|
||||
log.info("Topology {} is overriding cache size to {}, record buffering will be disabled",
|
||||
topologyName, cacheSize);
|
||||
}
|
||||
}
|
||||
|
||||
if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) {
|
||||
maxTaskIdleMs = getLong(MAX_TASK_IDLE_MS_CONFIG);
|
||||
log.info("Topology {} is overridding {} to {}", topologyName, MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);
|
||||
log.info("Topology {} is overriding {} to {}", topologyName, MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);
|
||||
} else {
|
||||
maxTaskIdleMs = globalAppConfigs.getLong(MAX_TASK_IDLE_MS_CONFIG);
|
||||
}
|
||||
|
|
|
@ -17,9 +17,16 @@
|
|||
package org.apache.kafka.streams.internals;
|
||||
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
|
||||
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
|
||||
|
||||
public class StreamsConfigUtils {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(StreamsConfigUtils.class);
|
||||
|
||||
public enum ProcessingMode {
|
||||
AT_LEAST_ONCE("AT_LEAST_ONCE"),
|
||||
|
||||
|
@ -66,4 +73,28 @@ public class StreamsConfigUtils {
|
|||
return processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA ||
|
||||
processingMode == ProcessingMode.EXACTLY_ONCE_V2;
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public static long getTotalCacheSize(final StreamsConfig config) {
|
||||
// both deprecated and new config set. Warn and use the new one.
|
||||
if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && config.originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) {
|
||||
if (!config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG).equals(config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG))) {
|
||||
LOG.warn("Both deprecated config {} and the new config {} are set, hence {} is ignored and {} is used instead.",
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
}
|
||||
return config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
} else if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG)) {
|
||||
// only deprecated config set.
|
||||
LOG.warn("Deprecated config {} is set, and will be used; we suggest setting the new config {} instead as deprecated {} would be removed in the future.",
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG,
|
||||
STATESTORE_CACHE_MAX_BYTES_CONFIG,
|
||||
CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
return config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
|
||||
}
|
||||
// only new or no config set. Use default or user specified value.
|
||||
return config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -143,6 +143,8 @@ public class KafkaStreamsTest {
|
|||
private GlobalStreamThread globalStreamThread;
|
||||
@Mock
|
||||
private Metrics metrics;
|
||||
@Mock
|
||||
private State state;
|
||||
|
||||
private StateListenerStub streamsStateListener;
|
||||
private Capture<List<MetricsReporter>> metricsReportersCapture;
|
||||
|
@ -216,6 +218,7 @@ public class KafkaStreamsTest {
|
|||
ClientMetrics.addStateMetric(anyObject(StreamsMetricsImpl.class), anyObject());
|
||||
ClientMetrics.addNumAliveStreamThreadMetric(anyObject(StreamsMetricsImpl.class), anyObject());
|
||||
|
||||
|
||||
// setup stream threads
|
||||
PowerMock.mockStatic(StreamThread.class);
|
||||
EasyMock.expect(StreamThread.create(
|
||||
|
@ -240,6 +243,7 @@ public class KafkaStreamsTest {
|
|||
PowerMock.mockStatic(StreamsConfigUtils.class);
|
||||
EasyMock.expect(StreamsConfigUtils.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE).anyTimes();
|
||||
EasyMock.expect(StreamsConfigUtils.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes();
|
||||
EasyMock.expect(StreamsConfigUtils.getTotalCacheSize(anyObject(StreamsConfig.class))).andReturn(10 * 1024 * 1024L).anyTimes();
|
||||
EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes();
|
||||
EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes();
|
||||
EasyMock.expect(streamThreadOne.getCacheSize()).andReturn(10485760L).anyTimes();
|
||||
|
@ -598,6 +602,27 @@ public class KafkaStreamsTest {
|
|||
streams.start();
|
||||
final int oldSize = streams.threads.size();
|
||||
waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
|
||||
EasyMock.reset(streamThreadOne, streamThreadTwo);
|
||||
EasyMock.expect(streamThreadOne.isRunning()).andStubReturn(true);
|
||||
EasyMock.expect(streamThreadTwo.isRunning()).andStubReturn(true);
|
||||
EasyMock.expect(streamThreadOne.state()).andStubReturn(StreamThread.State.RUNNING);
|
||||
EasyMock.expect(streamThreadTwo.state()).andStubReturn(StreamThread.State.RUNNING);
|
||||
EasyMock.expect(streamThreadOne.getName()).andStubReturn("processId-StreamThread-1");
|
||||
EasyMock.expect(streamThreadTwo.getName()).andStubReturn("processId-StreamThread-2");
|
||||
EasyMock.expect(streamThreadTwo.getId()).andStubReturn(2L);
|
||||
EasyMock.expect(streamThreadOne.getCacheSize()).andReturn(10485760L).anyTimes();
|
||||
EasyMock.expect(streamThreadOne.getMaxBufferSize()).andReturn(536870912L).anyTimes();
|
||||
EasyMock.expect(streamThreadTwo.getCacheSize()).andReturn(10485760L).anyTimes();
|
||||
EasyMock.expect(streamThreadTwo.getMaxBufferSize()).andReturn(536870912L).anyTimes();
|
||||
streamThreadTwo.setStateListener(EasyMock.anyObject());
|
||||
streamThreadTwo.start();
|
||||
|
||||
streamThreadOne.resizeCacheAndBufferMemory(5 * 1024 * 1024L, 256 * 1024 * 1024L);
|
||||
streamThreadTwo.resizeCacheAndBufferMemory(5 * 1024 * 1024L, 256 * 1024 * 1024L);
|
||||
streamThreadOne.shutdown();
|
||||
streamThreadTwo.shutdown();
|
||||
EasyMock.expect(state.isRunningOrRebalancing()).andStubReturn(true);
|
||||
EasyMock.replay(streamThreadOne, streamThreadTwo, state);
|
||||
assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
|
||||
assertThat(streams.threads.size(), equalTo(oldSize + 1));
|
||||
}
|
||||
|
|
|
@ -63,6 +63,7 @@ import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFI
|
|||
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
|
||||
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
|
||||
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
|
||||
import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
|
||||
import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.CoreMatchers.hasItem;
|
||||
|
@ -1258,7 +1259,7 @@ public class StreamsConfigTest {
|
|||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100);
|
||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
|
||||
final StreamsConfig config = new StreamsConfig(props);
|
||||
assertEquals(config.getTotalCacheSize(), 100);
|
||||
assertEquals(getTotalCacheSize(config), 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1266,20 +1267,20 @@ public class StreamsConfigTest {
|
|||
public void shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() {
|
||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
|
||||
final StreamsConfig config = new StreamsConfig(props);
|
||||
assertEquals(config.getTotalCacheSize(), 10);
|
||||
assertEquals(getTotalCacheSize(config), 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() {
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10);
|
||||
final StreamsConfig config = new StreamsConfig(props);
|
||||
assertEquals(config.getTotalCacheSize(), 10);
|
||||
assertEquals(getTotalCacheSize(config), 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
|
||||
final StreamsConfig config = new StreamsConfig(props);
|
||||
assertEquals(config.getTotalCacheSize(), 10 * 1024 * 1024);
|
||||
assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
|
||||
}
|
||||
|
||||
static class MisconfiguredSerde implements Serde<Object> {
|
||||
|
|
|
@ -421,12 +421,14 @@ public class AdjustStreamThreadCountTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void shouldResizeCacheAfterThreadReplacement() throws InterruptedException {
|
||||
public void shouldResizeCacheAndInputBufferAfterThreadReplacement() throws InterruptedException {
|
||||
final long totalCacheBytes = 10L;
|
||||
final long maxBufferBytes = 100L;
|
||||
final Properties props = new Properties();
|
||||
props.putAll(properties);
|
||||
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, totalCacheBytes);
|
||||
props.put(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG, maxBufferBytes);
|
||||
|
||||
final AtomicBoolean injectError = new AtomicBoolean(false);
|
||||
|
||||
|
@ -466,64 +468,9 @@ public class AdjustStreamThreadCountTest {
|
|||
waitForTransitionFromRebalancingToRunning();
|
||||
|
||||
for (final String log : appender.getMessages()) {
|
||||
// after we replace the thread there should be two remaining threads with 5 bytes each
|
||||
if (log.endsWith("Adding StreamThread-3, there are now 3 threads with cache size/max buffer size values as 3/178956970 per thread.")) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
fail();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldResizeMaxBufferAfterThreadReplacement() throws InterruptedException {
|
||||
final long totalCacheBytes = 10L;
|
||||
final Properties props = new Properties();
|
||||
props.putAll(properties);
|
||||
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
|
||||
props.put(StreamsConfig.INPUT_BUFFER_MAX_BYTES_CONFIG, totalCacheBytes);
|
||||
|
||||
final AtomicBoolean injectError = new AtomicBoolean(false);
|
||||
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
final KStream<String, String> stream = builder.stream(inputTopic);
|
||||
stream.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
|
||||
@Override
|
||||
public void init(final ProcessorContext context) {
|
||||
context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
|
||||
if (Thread.currentThread().getName().endsWith("StreamThread-1") && injectError.get()) {
|
||||
injectError.set(false);
|
||||
throw new RuntimeException("BOOM");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyValue<String, String> transform(final String key, final String value) {
|
||||
return new KeyValue<>(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
});
|
||||
|
||||
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) {
|
||||
addStreamStateChangeListener(kafkaStreams);
|
||||
kafkaStreams.setUncaughtExceptionHandler(e -> StreamThreadExceptionResponse.REPLACE_THREAD);
|
||||
startStreamsAndWaitForRunning(kafkaStreams);
|
||||
|
||||
stateTransitionHistory.clear();
|
||||
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
|
||||
injectError.set(true);
|
||||
waitForCondition(() -> !injectError.get(), "StreamThread did not hit and reset the injected error");
|
||||
|
||||
waitForTransitionFromRebalancingToRunning();
|
||||
|
||||
for (final String log : appender.getMessages()) {
|
||||
// after we replace the thread there should be two remaining threads with 5 bytes each
|
||||
if (log.endsWith("Adding StreamThread-3, there are now 3 threads with cache size/max buffer size values as 3495253/3 per thread.")) {
|
||||
// after we replace the thread there should be two remaining threads with 5 bytes each for
|
||||
// the cache and 50 for the input buffer
|
||||
if (log.endsWith("Adding StreamThread-3, there are now 2 threads with cache size/max buffer size values as 5/50 per thread.")) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,6 +117,7 @@ import java.util.regex.Pattern;
|
|||
import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
|
||||
import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
|
||||
import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
|
||||
import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
|
||||
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
|
||||
|
||||
/**
|
||||
|
@ -330,7 +331,7 @@ public class TopologyTestDriver implements Closeable {
|
|||
|
||||
final ThreadCache cache = new ThreadCache(
|
||||
logContext,
|
||||
Math.max(0, streamsConfig.getTotalCacheSize()),
|
||||
Math.max(0, getTotalCacheSize(streamsConfig)),
|
||||
streamsMetrics
|
||||
);
|
||||
|
||||
|
|
Loading…
Reference in New Issue