KAFKA-10199: Enable state updater by default (#16107)

We have already enabled the state updater by default once.
However, we ran into issues that forced us to disable it again.
We think that we fixed those issues. So we want to enable the
state updater again by default.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Bruno Cadonna 2024-06-12 07:51:38 +02:00 committed by GitHub
parent 0782232fbe
commit 39ffdea6d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 4 additions and 4 deletions

View File

@ -1273,7 +1273,7 @@ public class StreamsConfig extends AbstractConfig {
public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__"; public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
public static boolean getStateUpdaterEnabled(final Map<String, Object> configs) { public static boolean getStateUpdaterEnabled(final Map<String, Object> configs) {
return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, false); return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true);
} }
// Private API to enable processing threads (i.e. polling is decoupled from processing) // Private API to enable processing threads (i.e. polling is decoupled from processing)

View File

@ -468,10 +468,10 @@ public class StoreChangelogReaderTest {
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout()); assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
} else { } else {
if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED) if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED)
|| !((boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED))) { || (boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED)) {
assertEquals(Duration.ZERO, consumer.lastPollTimeout());
} else {
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout()); assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
} else {
assertEquals(Duration.ZERO, consumer.lastPollTimeout());
} }
} }
} }