KAFKA-19641: Fix flaky RestoreIntegrationTest#shouldInvokeUserDefinedGlobalStateRestoreListener (#20419)

What I observed is that if I run both combinations useNewProtocol=true,
useNewProtocol=false it would often fail the second time, but if I only
run the second variation useNewProtocol=false it works, and only the
first variation useNewProtocol=true also works. So this points to some
state that is not cleared between the tests  - and indeed, the test
creates a topic “inputTopic”, produces to it, but doesn’t delete it, so
the second variation will run with produce to it again and then run with
twice the data.

I also reduced heartbeat interval and session timeout since some of the
tests need to wait for the old consumer to leave which (sigh) Kafka
Streams doesn't do, so we have to wait that it gets kicked out by
session timeout. So previously we waited for 45 seconds, now, we at
least wait only 1 second.

Reviewers: Bill Bejeck <bbejeck@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
This commit is contained in:
Lucas Brutschy 2025-08-27 20:18:50 +02:00 committed by GitHub
parent 04518f4ce1
commit 0412be9e9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 3 additions and 0 deletions

View File

@ -175,6 +175,8 @@ public class RestoreIntegrationTest {
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500);
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
streamsConfiguration.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); streamsConfiguration.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
streamsConfiguration.putAll(extraProperties); streamsConfiguration.putAll(extraProperties);
@ -191,6 +193,7 @@ public class RestoreIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations); IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
streamsConfigurations.clear(); streamsConfigurations.clear();
CLUSTER.deleteAllTopics();
} }
@ParameterizedTest @ParameterizedTest