From a2a1bcc75b112b7dc414264cec795624ed4c6b86 Mon Sep 17 00:00:00 2001
From: ChickenchickenLove
Date: Tue, 13 May 2025 00:01:29 +0900
Subject: [PATCH] KAFKA-19242: Fix commit bugs caused by race condition during rebalancing. (#19631)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### Motivation
While investigating “events skipped in group rebalancing” ([spring-projects/spring-kafka#3703](https://github.com/spring-projects/spring-kafka/issues/3703)) I discovered a race condition between
- the main poll/commit thread, and
- the consumer-coordinator heartbeat thread.

If the main thread enters `ConsumerCoordinator.sendOffsetCommitRequest()` while the heartbeat thread is finishing a rebalance (`SyncGroupResponseHandler.handle()`), the group state transitions in the following order:

```
COMPLETING_REBALANCE  →  (race window)  →  STABLE
```

Because the state is read twice without a lock held across both reads, the following interleaving is possible:
1. `generationIfStable()` returns `null` (state still `COMPLETING_REBALANCE`),
2. the heartbeat thread flips the state to `STABLE`,
3. the main thread re-checks with `rebalanceInProgress()`, which now returns `false`, and wrongly concludes that the consumer is not part of an active group,
4. a spurious `CommitFailedException` is returned even though the commit could succeed.

For more details, refer to the sequence diagram in the pull request.

### Impact
- The exception is semantically wrong: the consumer is in a stable group, but reports failure.
- Frameworks and applications that rely on the semantics of `CommitFailedException` and `RetryableCommitException` (for example `Spring Kafka`) take the wrong code path, which can ultimately skip events and break “at-least-once” guarantees.
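
The interleaving described under Motivation can be reproduced deterministically outside Kafka. The following is a minimal sketch, not the coordinator code: `CommitRaceSketch`, its `State` and `Outcome` enums, and every method name are hypothetical stand-ins, and the `raceWindow` callback is an assumption used to force the heartbeat thread to run exactly between the two reads.

```java
public class CommitRaceSketch {
    enum State { COMPLETING_REBALANCE, STABLE }
    enum Outcome { COMMIT_SENT, REBALANCE_IN_PROGRESS, COMMIT_FAILED }

    private State state = State.COMPLETING_REBALANCE;

    // Called by the simulated heartbeat thread when the rebalance finishes.
    synchronized void completeRebalance() { state = State.STABLE; }

    // Hypothetical stand-in for generationIfStable() != null.
    synchronized boolean isStable() { return state == State.STABLE; }

    // Hypothetical stand-in for rebalanceInProgress().
    synchronized boolean rebalanceInProgress() { return state == State.COMPLETING_REBALANCE; }

    // Before the fix: each read locks on its own, so the state can change in between.
    Outcome commitUnsynchronized(Runnable raceWindow) {
        boolean stable = isStable();
        raceWindow.run(); // the heartbeat thread gets to run here
        if (stable) return Outcome.COMMIT_SENT;
        return rebalanceInProgress() ? Outcome.REBALANCE_IN_PROGRESS : Outcome.COMMIT_FAILED;
    }

    // After the fix: both reads happen while holding the same monitor.
    Outcome commitSynchronized(Runnable raceWindow) {
        synchronized (this) {
            boolean stable = isStable();
            raceWindow.run(); // the heartbeat thread is blocked on the monitor here
            if (stable) return Outcome.COMMIT_SENT;
            return rebalanceInProgress() ? Outcome.REBALANCE_IN_PROGRESS : Outcome.COMMIT_FAILED;
        }
    }

    // Starts a heartbeat thread that tries to finish the rebalance.
    // waitMs == 0 waits until it is done; waitMs > 0 gives up after that many milliseconds.
    static void runHeartbeat(CommitRaceSketch coordinator, long waitMs) {
        Thread heartbeat = new Thread(coordinator::completeRebalance);
        heartbeat.setDaemon(true);
        heartbeat.start();
        try {
            heartbeat.join(waitMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        CommitRaceSketch before = new CommitRaceSketch();
        System.out.println(before.commitUnsynchronized(() -> runHeartbeat(before, 0)));

        CommitRaceSketch after = new CommitRaceSketch();
        System.out.println(after.commitSynchronized(() -> runHeartbeat(after, 200)));
    }
}
```

Running `main` prints `COMMIT_FAILED` for the unsynchronized variant and `REBALANCE_IN_PROGRESS` for the synchronized one. In the second case the heartbeat thread can only change the state after the main thread has left the block, so the two reads always agree.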

### Fix
We wrap both state checks in `ConsumerCoordinator.sendOffsetCommitRequest()` in a single `synchronized (ConsumerCoordinator.this)` block so that the consumer group state is examined atomically with respect to the heartbeat thread.

### Jira
https://issues.apache.org/jira/browse/KAFKA-19242
https://github.com/spring-projects/spring-kafka/issues/3703

Signed-off-by: chickenchickenlove
Reviewers: David Jacot
---
 .../internals/ConsumerCoordinator.java        | 34 ++++++++++---------
 1 file changed, 18 insertions(+), 16 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 8647476758f..7c94792ede4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1263,23 +1263,25 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         final Generation generation;
         final String groupInstanceId;
         if (subscriptions.hasAutoAssignedPartitions()) {
-            generation = generationIfStable();
-            groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null);
-            // if the generation is null, we are not part of an active group (and we expect to be).
-            // the only thing we can do is fail the commit and let the user rejoin the group in poll().
-            if (generation == null) {
-                log.info("Failing OffsetCommit request since the consumer is not part of an active group");
+            synchronized (ConsumerCoordinator.this) {
+                generation = generationIfStable();
+                groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null);
+                // if the generation is null, we are not part of an active group (and we expect to be).
+                // the only thing we can do is fail the commit and let the user rejoin the group in poll().
+                if (generation == null) {
+                    log.info("Failing OffsetCommit request since the consumer is not part of an active group");

-                if (rebalanceInProgress()) {
-                    // if the client knows it is already rebalancing, we can use RebalanceInProgressException instead of
-                    // CommitFailedException to indicate this is not a fatal error
-                    return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the " +
-                        "consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance " +
-                        "by calling poll() and then retry the operation."));
-                } else {
-                    return RequestFuture.failure(new CommitFailedException("Offset commit cannot be completed since the " +
-                        "consumer is not part of an active group for auto partition assignment; it is likely that the consumer " +
-                        "was kicked out of the group."));
+                    if (rebalanceInProgress()) {
+                        // if the client knows it is already rebalancing, we can use RebalanceInProgressException instead of
+                        // CommitFailedException to indicate this is not a fatal error
+                        return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the " +
+                            "consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance " +
+                            "by calling poll() and then retry the operation."));
+                    } else {
+                        return RequestFuture.failure(new CommitFailedException("Offset commit cannot be completed since the " +
+                            "consumer is not part of an active group for auto partition assignment; it is likely that the consumer " +
+                            "was kicked out of the group."));
+                    }
                 }
             }
         } else {