diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index f178dc73fe7..def9e815555 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -124,6 +124,35 @@ class GroupCoordinator(val brokerId: Int,
     info("Shutdown complete.")
   }
 
+  /**
+   * Verify if the group has space to accept the joining member. The various
+   * criteria are explained below.
+   */
+  private def acceptJoiningMember(group: GroupMetadata, member: String): Boolean = {
+    group.currentState match {
+      // Always accept the request when the group is empty or dead
+      case Empty | Dead =>
+        true
+
+      // An existing member is accepted if it is already awaiting. New members are accepted
+      // up to the max group size. Note that the number of awaiting members is used here
+      // for two reasons:
+      // 1) the group size is not reliable as it could already be above the max group size
+      //    if the max group size was reduced.
+      // 2) using the number of awaiting members allows to kick out the last rejoining
+      //    members of the group.
+      case PreparingRebalance =>
+        (group.has(member) && group.get(member).isAwaitingJoin) ||
+          group.numAwaiting < groupConfig.groupMaxSize
+
+      // An existing member is accepted. New members are accepted up to the max group size.
+      // Note that the group size is used here. When the group transitions to CompletingRebalance,
+      // members which haven't rejoined are removed.
+      case CompletingRebalance | Stable =>
+        group.has(member) || group.size < groupConfig.groupMaxSize
+    }
+  }
+
   def handleJoinGroup(groupId: String,
                       memberId: String,
                       groupInstanceId: Option[String],
@@ -152,9 +181,7 @@ class GroupCoordinator(val brokerId: Int,
         responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
       case Some(group) =>
         group.inLock {
-          if ((groupIsOverCapacity(group)
-            && group.has(memberId) && !group.get(memberId).isAwaitingJoin) // oversized group, need to shed members that haven't joined yet
-            || (isUnknownMember && group.size >= groupConfig.groupMaxSize)) {
+          if (!acceptJoiningMember(group, memberId)) {
             group.remove(memberId)
             group.removeStaticMember(groupInstanceId)
             responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED))
@@ -1051,7 +1078,8 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  private def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
+  // package private for testing
+  private[group] def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
     // if any members are awaiting sync, cancel their request and have them rejoin
     if (group.is(CompletingRebalance))
       resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index eacb432747a..7e3b470b158 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -363,6 +363,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
 
   def numPending = pendingMembers.size
 
+  def numAwaiting: Int = numMembersAwaitingJoin
+
   def allMemberMetadata = members.values.toList
 
   def rebalanceTimeoutMs = members.values.foldLeft(0) { (timeout, member) =>
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index be080da3fe7..3200f81c9ef 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -236,6 +236,206 @@ class GroupCoordinatorTest {
     assertEquals(Errors.GROUP_MAX_SIZE_REACHED, await(errorFuture, 1).error)
   }
 
+  @Test
+  def testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember(): Unit = {
+    val requiredKnownMemberId = true
+    val nbMembers = GroupMaxSize + 1
+
+    // First JoinRequests
+    var futures = 1.to(nbMembers).map { _ =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
+    }
+
+    // Get back the assigned member ids
+    val memberIds = futures.map(await(_, 1).memberId)
+
+    // Second JoinRequests
+    futures = memberIds.map { memberId =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, memberId, protocolType, protocols,
+        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
+    }
+
+    // advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+    // advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+
+    // Awaiting results
+    val errors = futures.map(await(_, DefaultRebalanceTimeout + 1).error)
+
+    assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
+    assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))
+
+    // Members which were accepted can rejoin, others are rejected, while
+    // completing rebalance
+    futures = memberIds.map { memberId =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, memberId, protocolType, protocols,
+        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
+    }
+
+    // Awaiting results
+    val rejoinErrors = futures.map(await(_, 1).error)
+
+    assertEquals(errors, rejoinErrors)
+  }
+
+  @Test
+  def testDynamicMembersJoinGroupWithMaxSize(): Unit = {
+    val requiredKnownMemberId = false
+    val nbMembers = GroupMaxSize + 1
+
+    // JoinRequests
+    var futures = 1.to(nbMembers).map { _ =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
+    }
+
+    // advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+    // advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+
+    // Awaiting results
+    val joinGroupResults = futures.map(await(_, DefaultRebalanceTimeout + 1))
+    val errors = joinGroupResults.map(_.error)
+
+    assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
+    assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))
+
+    // Members which were accepted can rejoin, others are rejected, while
+    // completing rebalance
+    val memberIds = joinGroupResults.map(_.memberId)
+    futures = memberIds.map { memberId =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, memberId, protocolType, protocols,
+        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
+    }
+
+    // Awaiting results
+    val rejoinErrors = futures.map(await(_, 1).error)
+
+    assertEquals(errors, rejoinErrors)
+  }
+
+  @Test
+  def testStaticMembersJoinGroupWithMaxSize(): Unit = {
+    val nbMembers = GroupMaxSize + 1
+    val instanceIds = 1.to(nbMembers).map(i => Some(s"instance-id-$i"))
+
+    // JoinRequests
+    var futures = instanceIds.map { instanceId =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+        instanceId, DefaultSessionTimeout, DefaultRebalanceTimeout)
+    }
+
+    // advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+    // advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+
+    // Awaiting results
+    val joinGroupResults = futures.map(await(_, DefaultRebalanceTimeout + 1))
+    val errors = joinGroupResults.map(_.error)
+
+    assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
+    assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))
+
+    // Members which were accepted can rejoin, others are rejected, while
+    // completing rebalance
+    val memberIds = joinGroupResults.map(_.memberId)
+    futures = instanceIds.zip(memberIds).map { case (instanceId, memberId) =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, memberId, protocolType, protocols,
+        instanceId, DefaultSessionTimeout, DefaultRebalanceTimeout)
+    }
+
+    // Awaiting results
+    val rejoinErrors = futures.map(await(_, 1).error)
+
+    assertEquals(errors, rejoinErrors)
+  }
+
+  @Test
+  def testDynamicMembersCanReJoinGroupWithMaxSizeWhileRebalancing(): Unit = {
+    val requiredKnownMemberId = true
+    val nbMembers = GroupMaxSize + 1
+
+    // First JoinRequests
+    var futures = 1.to(nbMembers).map { _ =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
+    }
+
+    // Get back the assigned member ids
+    val memberIds = futures.map(await(_, 1).memberId)
+
+    // Second JoinRequests; the resulting futures are deliberately not awaited
+    memberIds.foreach { memberId =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, memberId, protocolType, protocols,
+        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
+    }
+
+    // Members can rejoin while rebalancing
+    futures = memberIds.map { memberId =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, memberId, protocolType, protocols,
+        None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
+    }
+
+    // advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+    // advance clock by GroupInitialRebalanceDelay to complete second InitialDelayedJoin
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+
+    // Awaiting results
+    val errors = futures.map(await(_, DefaultRebalanceTimeout + 1).error)
+
+    assertEquals(GroupMaxSize, errors.count(_ == Errors.NONE))
+    assertEquals(nbMembers-GroupMaxSize, errors.count(_ == Errors.GROUP_MAX_SIZE_REACHED))
+  }
+
+  @Test
+  def testLastJoiningMembersAreKickedOutWhenReJoiningGroupWithMaxSize(): Unit = {
+    val nbMembers = GroupMaxSize + 2
+    val group = new GroupMetadata(groupId, Stable, new MockTime())
+    val memberIds = 1.to(nbMembers).map(_ => group.generateMemberId(ClientId, None))
+
+    memberIds.foreach { memberId =>
+      group.add(new MemberMetadata(memberId, groupId, None, ClientId, ClientHost,
+        DefaultRebalanceTimeout, GroupMaxSessionTimeout, protocolType, protocols))
+    }
+    groupCoordinator.groupManager.addGroup(group)
+
+    groupCoordinator.prepareRebalance(group, "")
+
+    val futures = memberIds.map { memberId =>
+      EasyMock.reset(replicaManager)
+      sendJoinGroup(groupId, memberId, protocolType, protocols,
+        None, GroupMaxSessionTimeout, DefaultRebalanceTimeout)
+    }
+
+    // advance clock past the rebalance timeout to complete the join phase and expire stragglers
+    timer.advanceClock(DefaultRebalanceTimeout + 1)
+
+    // Awaiting results
+    val errors = futures.map(await(_, DefaultRebalanceTimeout + 1).error)
+
+    assertEquals(Set(Errors.NONE), errors.take(GroupMaxSize).toSet)
+    assertEquals(Set(Errors.GROUP_MAX_SIZE_REACHED), errors.drop(GroupMaxSize).toSet)
+
+    memberIds.drop(GroupMaxSize).foreach { memberId =>
+      assertFalse(group.has(memberId))
+    }
+  }
+
   @Test
   def testJoinGroupSessionTimeoutTooSmall(): Unit = {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID