KAFKA-16097: Disable state updater in 3.7 (#15146)

Several problems are still appearing while running 3.7 with
the state updater. This change will disable the state updater
by default.
This commit is contained in:
Lucas Brutschy 2024-01-09 09:24:33 +01:00 committed by GitHub
parent 56774efdd7
commit 01ecb1ab48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 5 additions and 4 deletions

View File

@ -1226,7 +1226,7 @@ public class StreamsConfig extends AbstractConfig {
public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__"; public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
public static boolean getStateUpdaterEnabled(final Map<String, Object> configs) { public static boolean getStateUpdaterEnabled(final Map<String, Object> configs) {
return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true); return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, false);
} }
// Private API to enable processing threads (i.e. polling is decoupled from processing) // Private API to enable processing threads (i.e. polling is decoupled from processing)

View File

@ -1119,6 +1119,7 @@ public class EosIntegrationTest {
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir); properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142"); properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
properties.put(InternalConfig.STATE_UPDATER_ENABLED, processingThreadsEnabled);
properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled); properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
final Properties config = StreamsTestUtils.getStreamsConfig( final Properties config = StreamsTestUtils.getStreamsConfig(

View File

@ -464,10 +464,10 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout()); assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
} else { } else {
if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED) if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED)
|| (boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED)) { || !((boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED))) {
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
} else {
assertEquals(Duration.ZERO, consumer.lastPollTimeout()); assertEquals(Duration.ZERO, consumer.lastPollTimeout());
} else {
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
} }
} }
} }