KAFKA-19242: Fix commit bugs caused by race condition during rebalancing. (#19631)

### Motivation
While investigating “events skipped in group

rebalancing” ([spring‑projects/spring‑kafka#3703](https://github.com/spring-projects/spring-kafka/issues/3703))
I discovered a race
condition between
- the main poll/commit thread, and
- the consumer‑coordinator heartbeat thread.

If the main thread enters
`ConsumerCoordinator.sendOffsetCommitRequest()` while the heartbeat
thread is finishing a rebalance (`SyncGroupResponseHandler.handle()`),
the group state transitions in the following order:

```
COMPLETING_REBALANCE  →  (race window)  →  STABLE
```
Because we read the state twice without a lock:
1. `generationIfStable()` returns `null` (state still
`COMPLETING_REBALANCE`),
2. the heartbeat thread flips the state to `STABLE`,
3. the main thread re‑checks with `rebalanceInProgress()` and wrongly
decides that a rebalance is still active,
4. a spurious `CommitFailedException` is returned even though the commit
could succeed.

For more details, please refer to sequence diagram below.  <img
width="1494" alt="image"

src="https://github.com/user-attachments/assets/90f19af5-5e2d-4566-aece-ef764df2d89c"
/>

### Impact
- The exception is semantically wrong: the consumer is in a stable
group, but reports failure.
- Frameworks and applications that rely on the semantics of
`CommitFailedException` and `RetryableCommitException` (for example
`Spring Kafka`) take the wrong code path, which can ultimately skip the
events and break “at‑most‑once” guarantees.

### Fix
We enlarge the synchronized block in
`ConsumerCoordinator.sendOffsetCommitRequest()` so that the consumer
group state is examined atomically with respect to the heartbeat thread:

### Jira
https://issues.apache.org/jira/browse/KAFKA-19242

https: //github.com/spring-projects/spring-kafka/issues/3703

Signed-off-by: chickenchickenlove <ojt90902@naver.com>

Reviewers: David Jacot <david.jacot@gmail.com>
This commit is contained in:
ChickenchickenLove 2025-05-13 00:01:29 +09:00 committed by David Jacot
parent cf3c177936
commit 5015183c1c
1 changed files with 18 additions and 16 deletions

View File

@ -1263,23 +1263,25 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
final Generation generation;
final String groupInstanceId;
if (subscriptions.hasAutoAssignedPartitions()) {
generation = generationIfStable();
groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null);
// if the generation is null, we are not part of an active group (and we expect to be).
// the only thing we can do is fail the commit and let the user rejoin the group in poll().
if (generation == null) {
log.info("Failing OffsetCommit request since the consumer is not part of an active group");
synchronized (ConsumerCoordinator.this) {
generation = generationIfStable();
groupInstanceId = rebalanceConfig.groupInstanceId.orElse(null);
// if the generation is null, we are not part of an active group (and we expect to be).
// the only thing we can do is fail the commit and let the user rejoin the group in poll().
if (generation == null) {
log.info("Failing OffsetCommit request since the consumer is not part of an active group");
if (rebalanceInProgress()) {
// if the client knows it is already rebalancing, we can use RebalanceInProgressException instead of
// CommitFailedException to indicate this is not a fatal error
return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the " +
"consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance " +
"by calling poll() and then retry the operation."));
} else {
return RequestFuture.failure(new CommitFailedException("Offset commit cannot be completed since the " +
"consumer is not part of an active group for auto partition assignment; it is likely that the consumer " +
"was kicked out of the group."));
if (rebalanceInProgress()) {
// if the client knows it is already rebalancing, we can use RebalanceInProgressException instead of
// CommitFailedException to indicate this is not a fatal error
return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the " +
"consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance " +
"by calling poll() and then retry the operation."));
} else {
return RequestFuture.failure(new CommitFailedException("Offset commit cannot be completed since the " +
"consumer is not part of an active group for auto partition assignment; it is likely that the consumer " +
"was kicked out of the group."));
}
}
}
} else {