From 39ffdea6d321ef3dd5e787aef1b1102c33448c0f Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Wed, 12 Jun 2024 07:51:38 +0200 Subject: [PATCH] KAFKA-10199: Enable state updater by default (#16107) We have already enabled the state updater by default once. However, we ran into issues that forced us to disable it again. We think that we fixed those issues. So we want to enable the state updater again by default. Reviewers: Lucas Brutschy , Matthias J. Sax --- .../main/java/org/apache/kafka/streams/StreamsConfig.java | 2 +- .../processor/internals/StoreChangelogReaderTest.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index bfea3d43680..e77e4ca795d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1273,7 +1273,7 @@ public class StreamsConfig extends AbstractConfig { public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__"; public static boolean getStateUpdaterEnabled(final Map configs) { - return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, false); + return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true); } // Private API to enable processing threads (i.e. polling is decoupled from processing) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 763394611b9..457508cd20e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -468,10 +468,10 @@ public class StoreChangelogReaderTest { assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout()); } else { if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED) - || !((boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED))) { - assertEquals(Duration.ZERO, consumer.lastPollTimeout()); - } else { + || (boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED)) { assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout()); + } else { + assertEquals(Duration.ZERO, consumer.lastPollTimeout()); } } }