diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 5931572b331..6d7bba9c120 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -526,6 +526,8 @@ public class StreamThread extends Thread { try { runOnce(); if (assignmentErrorCode.get() == AssignorError.REBALANCE_NEEDED.code()) { + log.info("Detected that the assignor requested a rebalance. Rejoining the consumer group to " + + "trigger a new rebalance."); assignmentErrorCode.set(AssignorError.NONE.code()); mainConsumer.enforceRebalance(); } diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index ab4f0af415c..f60aabb76fe 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -539,7 +539,7 @@ class StreamsUpgradeTest(Test): timeout_sec=60, err_msg="Never saw output 'Upgrade metadata to version 8' on" + str(second_other_node.account)) - log_monitor.wait_until("Version probing detected. Rejoining the consumer group to trigger a new rebalance.", + log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.", timeout_sec=60, err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))