From e3ce4a6e11dd8f3d8e08f39c22c4f19f30ff65b8 Mon Sep 17 00:00:00 2001 From: Luke Chen <43372967+showuon@users.noreply.github.com> Date: Tue, 5 Jan 2021 06:20:24 +0800 Subject: [PATCH] KAFKA-10870; Handle REBALANCE_IN_PROGRESS error in JoinGroup (#9792) Handle REBALANCE_IN_PROGRESS error in JoinGroup, which is possible if there is a replication timeout while persisting the group state after the rebalance completes. Reviewers: Boyang Chen , Jason Gustafson --- .../internals/AbstractCoordinator.java | 4 +++ .../internals/AbstractCoordinatorTest.java | 29 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 63b0f23eba5..b1db415d43c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -658,6 +658,10 @@ public abstract class AbstractCoordinator implements Closeable { AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null); } future.raise(error); + } else if (error == Errors.REBALANCE_IN_PROGRESS) { + log.info("JoinGroup failed due to non-fatal error: REBALANCE_IN_PROGRESS, " + + "which could indicate a replication timeout on the broker. Will retry."); + future.raise(error); } else { // unexpected error, throw the exception log.error("JoinGroup failed due to unexpected error: {}", error.message()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index c24fa8577cc..29bfcb81d53 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -823,6 +823,35 @@ public class AbstractCoordinatorTest { assertTrue(coordinator.hasUnknownGeneration()); } + @Test + public void testJoinGroupRequestWithRebalanceInProgress() { + setupCoordinator(); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(mockTime.timer(0)); + + mockClient.prepareResponse( + joinGroupFollowerResponse(defaultGeneration, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.REBALANCE_IN_PROGRESS)); + + RequestFuture future = coordinator.sendJoinGroupRequest(); + + assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS))); + assertTrue(future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception())); + assertEquals(Errors.REBALANCE_IN_PROGRESS.message(), future.exception().getMessage()); + assertTrue(coordinator.rejoinNeededOrPending()); + + // make sure we'll retry on next poll + assertEquals(0, coordinator.onJoinPrepareInvokes); + assertEquals(0, coordinator.onJoinCompleteInvokes); + + mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.NONE)); + mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); + + coordinator.ensureActiveGroup(); + // make sure both onJoinPrepare and onJoinComplete got called + assertEquals(1, coordinator.onJoinPrepareInvokes); + assertEquals(1, coordinator.onJoinCompleteInvokes); + } + @Test public void testLeaveGroupSentWithGroupInstanceIdUnSet() { checkLeaveGroupRequestSent(Optional.empty());