From 4fed0001ec1ddbfd1f8fc9af75007e89ef40ea8d Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Thu, 25 Nov 2021 10:48:09 +0100 Subject: [PATCH] MINOR: Fix system test StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance (#11532) Log messages were changed in the AssignorConfiguration (#11490) that are also used for verification in system test StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance. This commit fixes the test and adds comments to the log messages that point to the test that needs to be updated in case of changes to the log messages. Reviewers: John Roesler , Luke Chen , David Jacot --- .../internals/assignment/AssignorConfiguration.java | 8 ++++++++ .../streams/streams_cooperative_rebalance_upgrade_test.py | 4 ++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index a71c2fbb48f..c14ab700bfb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -105,6 +105,10 @@ public final class AssignorConfiguration { case StreamsConfig.UPGRADE_FROM_21: case StreamsConfig.UPGRADE_FROM_22: case StreamsConfig.UPGRADE_FROM_23: + // ATTENTION: The following log messages is used for verification in system test + // streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance + // If you change it, please do also change the system test accordingly and + // verify whether the test passes. log.info("Eager rebalancing protocol is enabled now for upgrade from {}.x", upgradeFrom); log.warn("The eager rebalancing protocol is deprecated and will stop being supported in a future release." + " Please be prepared to remove the 'upgrade.from' config soon."); @@ -113,6 +117,10 @@ public final class AssignorConfiguration { throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom); } } + // ATTENTION: The following log messages is used for verification in system test + // streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance + // If you change it, please do also change the system test accordingly and + // verify whether the test passes. log.info("Cooperative rebalancing protocol is enabled now"); return RebalanceProtocol.COOPERATIVE; } diff --git a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py index 4658a532601..b4125837d5c 100644 --- a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py @@ -39,8 +39,8 @@ class StreamsCooperativeRebalanceUpgradeTest(Test): processing_message = "Processed [0-9]* records so far" stopped_message = "COOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED" running_state_msg = "STREAMS in a RUNNING State" - cooperative_turned_off_msg = "Eager rebalancing enabled now for upgrade from %s" - cooperative_enabled_msg = "Cooperative rebalancing enabled now" + cooperative_turned_off_msg = "Eager rebalancing protocol is enabled now for upgrade from %s" + cooperative_enabled_msg = "Cooperative rebalancing protocol is enabled now" first_bounce_phase = "first_bounce_phase-" second_bounce_phase = "second_bounce_phase-"