KAFKA-8487: Only request re-join on REBALANCE_IN_PROGRESS in CommitOffsetResponse (#6894)

When an offset commit fails with REBALANCE_IN_PROGRESS, the consumer now only requests a re-join instead of resetting its generation, so that the next poll() can re-join the group normally. On the broker side, commits are still accepted during PreparingRebalance; REBALANCE_IN_PROGRESS is returned only while the group is in CompletingRebalance.

Plus some minor cleanups in AbstractCoordinator.

Reviewers: Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@confluent.io>
Guozhang Wang 2019-06-11 09:48:43 -07:00 committed by GitHub
parent 1669b773ba
commit bebcbe3a04
5 changed files with 138 additions and 46 deletions
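
For context, a minimal sketch of how the new behavior surfaces to an application. The broker address, topic, and group id below are hypothetical; the point is that a CommitFailedException raised on REBALANCE_IN_PROGRESS is now recoverable by simply continuing to poll, because the consumer keeps its generation and only requests a re-join.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CommitDuringRebalanceExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("group.id", "example-group");           // hypothetical group id
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("example-topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        // process record ...
                    }
                    try {
                        // If a rebalance started in the meantime, the broker answers the commit
                        // with REBALANCE_IN_PROGRESS and the client raises CommitFailedException.
                        consumer.commitSync();
                    } catch (CommitFailedException e) {
                        // After this patch the consumer keeps its generation and only flags a
                        // re-join, so the next poll() re-joins the group normally; records from
                        // the failed commit may be re-delivered and should be handled idempotently.
                    }
                }
            }
        }
    }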

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

@@ -555,7 +555,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 // reset the member id and retry immediately
                 resetGeneration();
                 log.debug("Attempt to join group failed due to unknown member id.");
-                future.raise(Errors.UNKNOWN_MEMBER_ID);
+                future.raise(error);
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                     || error == Errors.NOT_COORDINATOR) {
                 // re-discover the coordinator and retry with backoff
@@ -592,7 +592,7 @@ public abstract class AbstractCoordinator implements Closeable {
                     AbstractCoordinator.this.rejoinNeeded = true;
                     AbstractCoordinator.this.state = MemberState.UNJOINED;
                 }
-                future.raise(Errors.MEMBER_ID_REQUIRED);
+                future.raise(error);
             } else {
                 // unexpected error, throw the exception
                 log.error("Attempt to join group failed due to unexpected error: {}", error.message());
@@ -940,18 +940,18 @@ public abstract class AbstractCoordinator implements Closeable {
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                 log.info("Attempt to heartbeat failed since group is rebalancing");
                 requestRejoin();
-                future.raise(Errors.REBALANCE_IN_PROGRESS);
+                future.raise(error);
             } else if (error == Errors.ILLEGAL_GENERATION) {
                 log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
                 resetGeneration();
-                future.raise(Errors.ILLEGAL_GENERATION);
+                future.raise(error);
             } else if (error == Errors.FENCED_INSTANCE_ID) {
                 log.error("Received fatal exception: group.instance.id gets fenced");
                 future.raise(error);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                 log.info("Attempt to heartbeat failed since member id {} is not valid.", generation.memberId);
                 resetGeneration();
-                future.raise(Errors.UNKNOWN_MEMBER_ID);
+                future.raise(error);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                 future.raise(new GroupAuthorizationException(groupId));
             } else {

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java

@@ -881,10 +881,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 log.error("Received fatal exception: group.instance.id gets fenced");
                 future.raise(error);
                 return;
+            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
+                /* The consumer never tries to commit offsets between join-group and sync-group,
+                 * so the broker is not expected to see an offset commit request during the
+                 * CompletingRebalance phase; if it ever happens, the broker returns this error.
+                 * In that case we treat it as a fatal CommitFailedException. However, we do not
+                 * need to reset the generation; we only request a re-join, so that if the caller
+                 * decides to proceed and poll, it can still re-join the group normally.
+                 */
+                requestRejoin();
+                future.raise(new CommitFailedException());
+                return;
             } else if (error == Errors.UNKNOWN_MEMBER_ID
-                    || error == Errors.ILLEGAL_GENERATION
-                    || error == Errors.REBALANCE_IN_PROGRESS) {
-                // need to re-join group
+                    || error == Errors.ILLEGAL_GENERATION) {
+                // need to reset generation and re-join group
                 resetGeneration();
                 future.raise(new CommitFailedException());
                 return;
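
The key distinction in this hunk is between requestRejoin(), which keeps the current member id and generation, and resetGeneration(), which discards them. A minimal standalone model of that distinction follows; it is an illustration only, not the actual AbstractCoordinator code, and its field and method names merely mirror the real ones.

    // Illustrative model: the two recovery paths a coordinator client can take.
    class CoordinatorModel {
        static final class Generation {
            final int generationId;
            final String memberId;
            Generation(int generationId, String memberId) {
                this.generationId = generationId;
                this.memberId = memberId;
            }
        }

        private static final Generation NO_GENERATION = new Generation(-1, "");

        private Generation generation = new Generation(1, "member-1");
        private boolean rejoinNeeded = false;

        // REBALANCE_IN_PROGRESS: membership is still valid, so keep the member id
        // and generation and merely flag that a re-join is needed on the next poll.
        void requestRejoin() {
            rejoinNeeded = true;
        }

        // UNKNOWN_MEMBER_ID / ILLEGAL_GENERATION: membership is no longer valid,
        // so wipe the generation and start the join protocol from scratch.
        void resetGeneration() {
            generation = NO_GENERATION;
            rejoinNeeded = true;
        }
    }

Keeping the generation means the subsequent JoinGroup request identifies the same member, which is exactly what lets the caller proceed and re-join normally after catching the CommitFailedException.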

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java

@@ -1678,15 +1678,43 @@ public class ConsumerCoordinatorTest {
                 new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE));
     }
 
-    @Test(expected = CommitFailedException.class)
+    @Test
     public void testCommitOffsetRebalanceInProgress() {
         // we cannot retry if a rebalance occurs before the commit completed
+        final String consumerId = "leader";
+
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+
+        // ensure metadata is up-to-date for leader
+        client.updateMetadata(metadataResponse);
+
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
+        // normal join group
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+        partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
+        client.prepareResponse(body -> {
+            SyncGroupRequest sync = (SyncGroupRequest) body;
+            return sync.data.memberId().equals(consumerId) &&
+                    sync.data.generationId() == 1 &&
+                    sync.groupAssignments().containsKey(consumerId);
+        }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+
+        AbstractCoordinator.Generation expectedGeneration = new AbstractCoordinator.Generation(1, consumerId, partitionAssignor.name());
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(expectedGeneration, coordinator.generation());
+
         prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
-        coordinator.commitOffsetsSync(singletonMap(t1p,
-                new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE));
+        assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p,
+                new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE)));
+
+        assertTrue(coordinator.rejoinNeededOrPending());
+        assertEquals(expectedGeneration, coordinator.generation());
     }
 
     @Test(expected = KafkaException.class)

core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala

@@ -180,38 +180,48 @@ class GroupCoordinator(val brokerId: Int,
       if (group.hasStaticMember(groupInstanceId)) {
         val oldMemberId = group.getStaticMemberId(groupInstanceId)
-        if (group.is(Stable)) {
-          info(s"Static member $groupInstanceId with unknown member id rejoins, assigning new member id $newMemberId, while " +
-            s"old member $oldMemberId will be removed. No rebalance will be triggered.")
+        group.currentState match {
+          case Stable =>
+            info(s"Static member $groupInstanceId with unknown member id rejoins group ${group.groupId} " +
+              s"in ${group.currentState} state. Assigning new member id $newMemberId, while old member $oldMemberId " +
+              "will be removed. No rebalance will be triggered.")
-          val oldMember = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
+            val oldMember = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
 
-          // Heartbeat of old member id will expire without effect since the group no longer contains that member id.
-          // New heartbeat shall be scheduled with new member id.
-          completeAndScheduleNextHeartbeatExpiration(group, oldMember)
+            // Heartbeat of old member id will expire without effect since the group no longer contains that member id.
+            // New heartbeat shall be scheduled with new member id.
+            completeAndScheduleNextHeartbeatExpiration(group, oldMember)
 
-          responseCallback(JoinGroupResult(
-            members = if (group.isLeader(newMemberId)) {
-              group.currentMemberMetadata
-            } else {
-              List.empty
-            },
-            memberId = newMemberId,
-            generationId = group.generationId,
-            subProtocol = group.protocolOrNull,
-            leaderId = group.leaderOrNull,
-            error = Errors.NONE))
-        } else {
-          val knownStaticMember = group.get(oldMemberId)
-          updateMemberAndRebalance(group, knownStaticMember, protocols, responseCallback)
+            responseCallback(JoinGroupResult(
+              members = if (group.isLeader(newMemberId)) {
+                group.currentMemberMetadata
+              } else {
+                List.empty
+              },
+              memberId = newMemberId,
+              generationId = group.generationId,
+              subProtocol = group.protocolOrNull,
+              leaderId = group.leaderOrNull,
+              error = Errors.NONE))
+          case _ =>
+            info(s"Static member $groupInstanceId with unknown member id rejoins group ${group.groupId} " +
+              s"in ${group.currentState} state. Update its membership with the pre-registered old member id $oldMemberId.")
+            val knownStaticMember = group.get(oldMemberId)
+            updateMemberAndRebalance(group, knownStaticMember, protocols, responseCallback)
         }
       } else if (requireKnownMemberId) {
         // If member id required (dynamic membership), register the member in the pending member list
         // and send back a response to call for another join group request with allocated member id.
+        debug(s"Dynamic member with unknown member id rejoins group ${group.groupId} in " +
+          s"${group.currentState} state. Created a new member id $newMemberId and requested the member to rejoin with this id.")
         group.addPendingMember(newMemberId)
         addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
         responseCallback(joinError(newMemberId, Errors.MEMBER_ID_REQUIRED))
       } else {
+        debug(s"Dynamic member with unknown member id rejoins group ${group.groupId} in " +
+          s"${group.currentState} state. Created a new member id $newMemberId for this member and added it to the group.")
         addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
           clientId, clientHost, protocolType, protocols, group, responseCallback)
@@ -613,16 +623,26 @@ class GroupCoordinator(val brokerId: Int,
         // The group is only using Kafka to store offsets.
         // Also, for transactional offset commits we don't need to validate group membership and the generation.
         groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
-      } else if (group.is(CompletingRebalance)) {
-        responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
       } else if (!group.has(memberId)) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
       } else if (generationId != group.generationId) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
       } else {
-        val member = group.get(memberId)
-        completeAndScheduleNextHeartbeatExpiration(group, member)
-        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
+        group.currentState match {
+          case Stable | PreparingRebalance =>
+            // During the PreparingRebalance phase we still allow commit requests, since we rely
+            // on the heartbeat response to eventually signal the rebalance in progress to the consumer.
+            val member = group.get(memberId)
+            completeAndScheduleNextHeartbeatExpiration(group, member)
+            groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
+
+          case CompletingRebalance =>
+            // We should not receive a commit request if the group has not completed the rebalance;
+            // but since the consumer's member.id and generation are valid, it must have received
+            // the latest group generation information from the JoinResponse.
+            // So return REBALANCE_IN_PROGRESS to let the consumer handle it gracefully.
+            responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
+        }
       }
     }
   }
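
To make the broker-side decision table easier to see, here is an illustrative model in Java (the actual code above is Scala). The enum, class, and method are hypothetical and only mirror the shape of handleCommitOffsets after this patch; they are not part of any Kafka API.

    // Illustrative model of how an offset commit is answered per group state.
    enum GroupState { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    final class OffsetCommitDecision {
        static String decide(GroupState state, boolean hasMember, boolean generationMatches) {
            if (!hasMember)
                return "UNKNOWN_MEMBER_ID";
            if (!generationMatches)
                return "ILLEGAL_GENERATION";
            switch (state) {
                case STABLE:
                case PREPARING_REBALANCE:
                    // Commits are still accepted here; the consumer learns about the
                    // rebalance from its heartbeat responses instead.
                    return "STORE_OFFSETS";
                case COMPLETING_REBALANCE:
                    // Member id and generation are valid, so the consumer already holds the
                    // new generation from its JoinResponse; tell it to finish the re-join.
                    return "REBALANCE_IN_PROGRESS";
                default:
                    return "UNHANDLED_STATE";
            }
        }
    }

Ordering the membership and generation checks before the state match is what allows the client to trust that REBALANCE_IN_PROGRESS implies a still-valid generation, which in turn justifies requestRejoin() instead of resetGeneration() on the consumer side.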

core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala

@@ -1176,25 +1176,25 @@ class GroupCoordinatorTest {
     val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols,
       rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout)
-    val assignedConsumerId = joinGroupResult.memberId
+    val assignedMemberId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
-    val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+    val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
     assertEquals(Errors.NONE, syncGroupError)
 
     timer.advanceClock(sessionTimeout / 2)
 
     EasyMock.reset(replicaManager)
-    val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, Map(tp -> offset))
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     timer.advanceClock(sessionTimeout / 2 + 100)
 
     EasyMock.reset(replicaManager)
-    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+    val heartbeatResult = heartbeat(groupId, assignedMemberId, 1)
     assertEquals(Errors.NONE, heartbeatResult)
   }
@@ -2158,6 +2158,40 @@ class GroupCoordinatorTest {
     assertEquals(Errors.REBALANCE_IN_PROGRESS, commitOffsetResult(tp))
   }
 
+  @Test
+  def testCommitOffsetInCompletingRebalanceFromUnknownMemberId() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val tp = new TopicPartition("topic", 0)
+    val offset = offsetAndMetadata(0)
+
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+
+    EasyMock.reset(replicaManager)
+    val commitOffsetResult = commitOffsets(groupId, memberId, generationId, Map(tp -> offset))
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, commitOffsetResult(tp))
+  }
+
+  @Test
+  def testCommitOffsetInCompletingRebalanceFromIllegalGeneration() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val tp = new TopicPartition("topic", 0)
+    val offset = offsetAndMetadata(0)
+
+    val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+
+    EasyMock.reset(replicaManager)
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId + 1, Map(tp -> offset))
+    assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
+  }
+
   @Test
   def testHeartbeatDuringRebalanceCausesRebalanceInProgress() {
     // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts)
@@ -2666,7 +2700,7 @@
   }
 
   private def commitOffsets(groupId: String,
-                            consumerId: String,
+                            memberId: String,
                             generationId: Int,
                             offsets: Map[TopicPartition, OffsetAndMetadata],
                             groupInstanceId: Option[String] = None): CommitOffsetCallbackParams = {
@@ -2692,7 +2726,7 @@
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
-    groupCoordinator.handleCommitOffsets(groupId, consumerId, groupInstanceId, generationId, offsets, responseCallback)
+    groupCoordinator.handleCommitOffsets(groupId, memberId, groupInstanceId, generationId, offsets, responseCallback)
 
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }