KAFKA-10870; Handle REBALANCE_IN_PROGRESS error in JoinGroup (#9792)

Handle REBALANCE_IN_PROGRESS error in JoinGroup, which is possible if there is a replication timeout while persisting the group state after the rebalance completes.

Reviewers: Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
Luke Chen 2021-01-05 06:20:24 +08:00 committed by GitHub
parent ac7b5d3389
commit e3ce4a6e11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 0 deletions

View File

@ -658,6 +658,10 @@ public abstract class AbstractCoordinator implements Closeable {
AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null); AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null);
} }
future.raise(error); future.raise(error);
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.info("JoinGroup failed due to non-fatal error: REBALANCE_IN_PROGRESS, " +
"which could indicate a replication timeout on the broker. Will retry.");
future.raise(error);
} else { } else {
// unexpected error, throw the exception // unexpected error, throw the exception
log.error("JoinGroup failed due to unexpected error: {}", error.message()); log.error("JoinGroup failed due to unexpected error: {}", error.message());

View File

@ -823,6 +823,35 @@ public class AbstractCoordinatorTest {
assertTrue(coordinator.hasUnknownGeneration()); assertTrue(coordinator.hasUnknownGeneration());
} }
@Test
public void testJoinGroupRequestWithRebalanceInProgress() {
setupCoordinator();
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(mockTime.timer(0));
mockClient.prepareResponse(
joinGroupFollowerResponse(defaultGeneration, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.REBALANCE_IN_PROGRESS));
RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
assertTrue(future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
assertEquals(Errors.REBALANCE_IN_PROGRESS.message(), future.exception().getMessage());
assertTrue(coordinator.rejoinNeededOrPending());
// make sure we'll retry on next poll
assertEquals(0, coordinator.onJoinPrepareInvokes);
assertEquals(0, coordinator.onJoinCompleteInvokes);
mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.NONE));
mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
coordinator.ensureActiveGroup();
// make sure both onJoinPrepare and onJoinComplete got called
assertEquals(1, coordinator.onJoinPrepareInvokes);
assertEquals(1, coordinator.onJoinCompleteInvokes);
}
@Test @Test
public void testLeaveGroupSentWithGroupInstanceIdUnSet() { public void testLeaveGroupSentWithGroupInstanceIdUnSet() {
checkLeaveGroupRequestSent(Optional.empty()); checkLeaveGroupRequestSent(Optional.empty());