KAFKA-16148: Implement GroupMetadataManager#onUnloaded (#15446)

This patch completes all awaiting futures when a group is unloaded.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Jeff Kim 2024-04-02 06:16:02 -04:00 committed by GitHub
parent 7a10f4a17e
commit b3116f4f76
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 172 additions and 13 deletions

View File

@ -639,6 +639,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
public void onUnloaded() {
timer.cancel(GROUP_EXPIRATION_KEY);
coordinatorMetrics.deactivateMetricsShard(metricsShard);
groupMetadataManager.onUnloaded();
}
/**

View File

@ -275,6 +275,7 @@ public class GroupMetadataManager {
);
}
}
/**
* The log context.
*/
@ -1920,6 +1921,47 @@ public class GroupMetadataManager {
});
}
/**
* Called when the partition is unloaded.
* ClassicGroup: Complete all awaiting join and sync futures. Transition group to Dead.
*/
public void onUnloaded() {
groups.values().forEach(group -> {
switch (group.type()) {
case CONSUMER:
ConsumerGroup consumerGroup = (ConsumerGroup) group;
log.info("[GroupId={}] Unloaded group metadata for group epoch {}.",
consumerGroup.groupId(), consumerGroup.groupEpoch());
break;
case CLASSIC:
ClassicGroup classicGroup = (ClassicGroup) group;
log.info("[GroupId={}] Unloading group metadata for generation {}.",
classicGroup.groupId(), classicGroup.generationId());
classicGroup.transitionTo(DEAD);
switch (classicGroup.previousState()) {
case EMPTY:
case DEAD:
break;
case PREPARING_REBALANCE:
classicGroup.allMembers().forEach(member -> {
classicGroup.completeJoinFuture(member, new JoinGroupResponseData()
.setMemberId(member.memberId())
.setErrorCode(NOT_COORDINATOR.code()));
});
break;
case COMPLETING_REBALANCE:
case STABLE:
classicGroup.allMembers().forEach(member -> {
classicGroup.completeSyncFuture(member, new SyncGroupResponseData()
.setErrorCode(NOT_COORDINATOR.code()));
});
}
}
});
}
public static String consumerGroupSessionTimeoutKey(String groupId, String memberId) {
return "session-timeout-" + groupId + "-" + memberId;
}
@ -3088,7 +3130,6 @@ public class GroupMetadataManager {
responseFuture.complete(
new JoinGroupResponseData()
.setMembers(Collections.emptyList())
.setMemberId(UNKNOWN_MEMBER_ID)
.setGenerationId(group.generationId())
.setProtocolName(group.protocolName().orElse(null))
@ -3111,7 +3152,6 @@ public class GroupMetadataManager {
);
} else {
group.completeJoinFuture(newMember, new JoinGroupResponseData()
.setMembers(Collections.emptyList())
.setMemberId(newMemberId)
.setGenerationId(group.generationId())
.setProtocolName(group.protocolName().orElse(null))

View File

@ -71,16 +71,6 @@ import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABL
*/
public class ClassicGroup implements Group {
/**
* Empty generation.
*/
public static final int NO_GENERATION = -1;
/**
* Protocol with empty name.
*/
public static final String NO_PROTOCOL_NAME = "";
/**
* No leader.
*/
@ -545,7 +535,6 @@ public class ClassicGroup implements Group {
JoinGroupResponseData joinGroupResponse = new JoinGroupResponseData()
.setMembers(Collections.emptyList())
.setMemberId(oldMemberId)
.setGenerationId(NO_GENERATION)
.setProtocolName(null)
.setProtocolType(null)
.setLeader(NO_LEADER)

View File

@ -1056,4 +1056,26 @@ public class GroupCoordinatorShardTest {
assertEquals(records, result.records());
assertNull(result.response());
}
@Test
public void testOnUnloaded() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
Time mockTime = new MockTime();
MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer<>(mockTime);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
mockTime,
timer,
GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 1000L, 24 * 60),
mock(CoordinatorMetrics.class),
mock(CoordinatorMetricsShard.class)
);
coordinator.onUnloaded();
assertEquals(0, timer.size());
verify(groupMetadataManager, times(1)).onUnloaded();
}
}

View File

@ -94,6 +94,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
@ -113,6 +114,7 @@ import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESU
import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupSyncKey;
import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY;
@ -9520,6 +9522,107 @@ public class GroupMetadataManagerTest {
);
}
@Test
public void testClassicGroupOnUnloadedEmptyAndPreparingRebalance() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
ClassicGroup emptyGroup = context.createClassicGroup("empty-group");
assertTrue(emptyGroup.isInState(EMPTY));
ClassicGroup preparingGroup = context.createClassicGroup("preparing-group");
JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
.withGroupId("preparing-group")
.withMemberId(UNKNOWN_MEMBER_ID)
.withDefaultProtocolTypeAndProtocols()
.build();
// preparing-group should have 2 members.
GroupMetadataManagerTestContext.JoinResult joinResult1 = context.sendClassicGroupJoin(request);
GroupMetadataManagerTestContext.JoinResult joinResult2 = context.sendClassicGroupJoin(request);
assertFalse(joinResult1.joinFuture.isDone());
assertFalse(joinResult2.joinFuture.isDone());
assertTrue(preparingGroup.isInState(PREPARING_REBALANCE));
assertEquals(2, preparingGroup.size());
context.onUnloaded();
assertTrue(emptyGroup.isInState(DEAD));
assertTrue(preparingGroup.isInState(DEAD));
assertTrue(joinResult1.joinFuture.isDone());
assertTrue(joinResult2.joinFuture.isDone());
assertEquals(new JoinGroupResponseData()
.setMemberId(joinResult1.joinFuture.get().memberId())
.setMembers(Collections.emptyList())
.setErrorCode(NOT_COORDINATOR.code()), joinResult1.joinFuture.get());
assertEquals(new JoinGroupResponseData()
.setMemberId(joinResult2.joinFuture.get().memberId())
.setMembers(Collections.emptyList())
.setErrorCode(NOT_COORDINATOR.code()), joinResult2.joinFuture.get());
}
@Test
public void testClassicGroupOnUnloadedCompletingRebalance() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
ClassicGroup group = context.createClassicGroup("group-id");
// Set up a group in with a leader, follower, and a pending member.
// Have the pending member join the group and both the pending member
// and the follower sync. We should have 2 members awaiting sync.
GroupMetadataManagerTestContext.PendingMemberGroupResult pendingGroupResult = context.setupGroupWithPendingMember(group);
String pendingMemberId = pendingGroupResult.pendingMemberResponse.memberId();
// Compete join group for the pending member
JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
.withGroupId("group-id")
.withMemberId(pendingMemberId)
.withDefaultProtocolTypeAndProtocols()
.build();
GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request);
assertTrue(joinResult.records.isEmpty());
assertTrue(joinResult.joinFuture.isDone());
assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode());
assertEquals(3, group.allMembers().size());
assertEquals(0, group.numPendingJoinMembers());
// Follower and pending send SyncGroup request.
// Follower and pending member should be awaiting sync while the leader is pending sync.
GroupMetadataManagerTestContext.SyncResult followerSyncResult = context.sendClassicGroupSync(
new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
.withGroupId("group-id")
.withMemberId(pendingGroupResult.followerId)
.withGenerationId(joinResult.joinFuture.get().generationId())
.build());
GroupMetadataManagerTestContext.SyncResult pendingMemberSyncResult = context.sendClassicGroupSync(
new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
.withGroupId("group-id")
.withMemberId(pendingMemberId)
.withGenerationId(joinResult.joinFuture.get().generationId())
.build());
assertFalse(followerSyncResult.syncFuture.isDone());
assertFalse(pendingMemberSyncResult.syncFuture.isDone());
assertTrue(group.isInState(COMPLETING_REBALANCE));
context.onUnloaded();
assertTrue(group.isInState(DEAD));
assertTrue(followerSyncResult.syncFuture.isDone());
assertTrue(pendingMemberSyncResult.syncFuture.isDone());
assertEquals(new SyncGroupResponseData()
.setAssignment(EMPTY_ASSIGNMENT)
.setErrorCode(NOT_COORDINATOR.code()), followerSyncResult.syncFuture.get());
assertEquals(new SyncGroupResponseData()
.setAssignment(EMPTY_ASSIGNMENT)
.setErrorCode(NOT_COORDINATOR.code()), pendingMemberSyncResult.syncFuture.get());
}
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,

View File

@ -1274,4 +1274,8 @@ public class GroupMetadataManagerTestContext {
lastWrittenOffset++;
snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
}
void onUnloaded() {
groupMetadataManager.onUnloaded();
}
}