KAFKA-9885; Evict last members of a group when the maximum allowed is reached (#8525)

This PR updates the algorithm which limits the number of members within a group (`group.max.size`) to fix the following two issues:
1. As described in KAFKA-9885, we found out that multiple members of a group can be evicted if the leader of the consumer offset partition changes before the group is persisted. This happens because the current eviction logic always evict the first member rejoining the group.
2. We also found out that dynamic members, when required to have a known member id, are not always limited. The caveat is that the current logic only considers unknown members and uses the group size, which does not include the so called pending members, to accept or reject a member. In this case, when they rejoins, they are not unknown member anymore and thus could bypass the limit. See `testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember` for the whole scenario.

This PR changes the logic to address the above two issues and extends the tests coverage to cover all the member types.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
David Jacot 2020-04-27 23:43:29 +02:00 committed by GitHub
parent db9e55a50f
commit c5d13dcb6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 234 additions and 4 deletions

View File

@ -124,6 +124,35 @@ class GroupCoordinator(val brokerId: Int,
info("Shutdown complete.") info("Shutdown complete.")
} }
/**
* Verify if the group has space to accept the joining member. The various
* criteria are explained below.
*/
private def acceptJoiningMember(group: GroupMetadata, member: String): Boolean = {
group.currentState match {
// Always accept the request when the group is empty or dead
case Empty | Dead =>
true
// An existing member is accepted if it is already awaiting. New members are accepted
// up to the max group size. Note that the number of awaiting members is used here
// for two reasons:
// 1) the group size is not reliable as it could already be above the max group size
// if the max group size was reduced.
// 2) using the number of awaiting members allows to kick out the last rejoining
// members of the group.
case PreparingRebalance =>
(group.has(member) && group.get(member).isAwaitingJoin) ||
group.numAwaiting < groupConfig.groupMaxSize
// An existing member is accepted. New members are accepted up to the max group size.
// Note that the group size is used here. When the group transitions to CompletingRebalance,
// members which haven't rejoined are removed.
case CompletingRebalance | Stable =>
group.has(member) || group.size < groupConfig.groupMaxSize
}
}
def handleJoinGroup(groupId: String, def handleJoinGroup(groupId: String,
memberId: String, memberId: String,
groupInstanceId: Option[String], groupInstanceId: Option[String],
@ -152,9 +181,7 @@ class GroupCoordinator(val brokerId: Int,
responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID)) responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
case Some(group) => case Some(group) =>
group.inLock { group.inLock {
if ((groupIsOverCapacity(group) if (!acceptJoiningMember(group, memberId)) {
&& group.has(memberId) && !group.get(memberId).isAwaitingJoin) // oversized group, need to shed members that haven't joined yet
|| (isUnknownMember && group.size >= groupConfig.groupMaxSize)) {
group.remove(memberId) group.remove(memberId)
group.removeStaticMember(groupInstanceId) group.removeStaticMember(groupInstanceId)
responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED)) responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED))
@ -1051,7 +1078,8 @@ class GroupCoordinator(val brokerId: Int,
} }
} }
private def prepareRebalance(group: GroupMetadata, reason: String): Unit = { // package private for testing
private[group] def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
// if any members are awaiting sync, cancel their request and have them rejoin // if any members are awaiting sync, cancel their request and have them rejoin
if (group.is(CompletingRebalance)) if (group.is(CompletingRebalance))
resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS) resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)

View File

@ -363,6 +363,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def numPending = pendingMembers.size def numPending = pendingMembers.size
def numAwaiting: Int = numMembersAwaitingJoin
def allMemberMetadata = members.values.toList def allMemberMetadata = members.values.toList
def rebalanceTimeoutMs = members.values.foldLeft(0) { (timeout, member) => def rebalanceTimeoutMs = members.values.foldLeft(0) { (timeout, member) =>

View File

@ -236,6 +236,206 @@ class GroupCoordinatorTest {
assertEquals(Errors.GROUP_MAX_SIZE_REACHED, await(errorFuture, 1).error) assertEquals(Errors.GROUP_MAX_SIZE_REACHED, await(errorFuture, 1).error)
} }
@Test
def testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember(): Unit = {
val requiredKnownMemberId = true
val nbMembers = GroupMaxSize + 1
// First JoinRequests
var futures = 1.to(nbMembers).map { _ =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}
// Get back the assigned member ids
val memberIds = futures.map(await(_, 1).memberId)
// Second JoinRequests
futures = memberIds.map { memberId =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, memberId, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}
// advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)
// advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)
// Awaiting results
val errors = futures.map(await(_, DefaultRebalanceTimeout + 1).error)
assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))
// Members which were accepted can rejoin, others are rejected, while
// completing rebalance
futures = memberIds.map { memberId =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, memberId, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}
// Awaiting results
val rejoinErrors = futures.map(await(_, 1).error)
assertEquals(errors, rejoinErrors)
}
@Test
def testDynamicMembersJoinGroupWithMaxSize(): Unit = {
val requiredKnownMemberId = false
val nbMembers = GroupMaxSize + 1
// JoinRequests
var futures = 1.to(nbMembers).map { _ =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}
// advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)
// advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)
// Awaiting results
val joinGroupResults = futures.map(await(_, DefaultRebalanceTimeout + 1))
val errors = joinGroupResults.map(_.error)
assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))
// Members which were accepted can rejoin, others are rejected, while
// completing rebalance
val memberIds = joinGroupResults.map(_.memberId)
futures = memberIds.map { memberId =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, memberId, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}
// Awaiting results
val rejoinErrors = futures.map(await(_, 1).error)
assertEquals(errors, rejoinErrors)
}
@Test
def testStaticMembersJoinGroupWithMaxSize(): Unit = {
val nbMembers = GroupMaxSize + 1
val instanceIds = 1.to(nbMembers).map(i => Some(s"instance-id-$i"))
// JoinRequests
var futures = instanceIds.map { instanceId =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
instanceId, DefaultSessionTimeout, DefaultRebalanceTimeout)
}
// advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)
// advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)
// Awaiting results
val joinGroupResults = futures.map(await(_, DefaultRebalanceTimeout + 1))
val errors = joinGroupResults.map(_.error)
assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))
// Members which were accepted can rejoin, others are rejected, while
// completing rebalance
val memberIds = joinGroupResults.map(_.memberId)
futures = instanceIds.zip(memberIds).map { case (instanceId, memberId) =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, memberId, protocolType, protocols,
instanceId, DefaultSessionTimeout, DefaultRebalanceTimeout)
}
// Awaiting results
val rejoinErrors = futures.map(await(_, 1).error)
assertEquals(errors, rejoinErrors)
}
@Test
def testDynamicMembersCanReJoinGroupWithMaxSizeWhileRebalancing(): Unit = {
val requiredKnownMemberId = true
val nbMembers = GroupMaxSize + 1
// First JoinRequests
var futures = 1.to(nbMembers).map { _ =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}
// Get back the assigned member ids
val memberIds = futures.map(await(_, 1).memberId)
// Second JoinRequests
memberIds.map { memberId =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, memberId, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}
// Members can rejoin while rebalancing
futures = memberIds.map { memberId =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, memberId, protocolType, protocols,
None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
}
// advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)
// advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
timer.advanceClock(GroupInitialRebalanceDelay + 1)
// Awaiting results
val errors = futures.map(await(_, DefaultRebalanceTimeout + 1).error)
assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))
}
@Test
def testLastJoiningMembersAreKickedOutWhenReJoiningGroupWithMaxSize(): Unit = {
val nbMembers = GroupMaxSize + 2
val group = new GroupMetadata(groupId, Stable, new MockTime())
val memberIds = 1.to(nbMembers).map(_ => group.generateMemberId(ClientId, None))
memberIds.foreach { memberId =>
group.add(new MemberMetadata(memberId, groupId, None, ClientId, ClientHost,
DefaultRebalanceTimeout, GroupMaxSessionTimeout, protocolType, protocols))
}
groupCoordinator.groupManager.addGroup(group)
groupCoordinator.prepareRebalance(group, "")
val futures = memberIds.map { memberId =>
EasyMock.reset(replicaManager)
sendJoinGroup(groupId, memberId, protocolType, protocols,
None, GroupMaxSessionTimeout, DefaultRebalanceTimeout)
}
// advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
timer.advanceClock(DefaultRebalanceTimeout + 1)
// Awaiting results
val errors = futures.map(await(_, DefaultRebalanceTimeout + 1).error)
assertEquals(Set(Errors.NONE), errors.take(GroupMaxSize).toSet)
assertEquals(Set(Errors.GROUP_MAX_SIZE_REACHED), errors.drop(GroupMaxSize).toSet)
memberIds.drop(GroupMaxSize).foreach { memberId =>
assertFalse(group.has(memberId))
}
}
@Test @Test
def testJoinGroupSessionTimeoutTooSmall(): Unit = { def testJoinGroupSessionTimeoutTooSmall(): Unit = {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID