MINOR: Add LEAVE_GROUP_EPOCH to GroupMetadataManager (#14463)

Replacing the use a hardcoded -1 with a constant (`LEAVE_GROUP_EPOCH`) that provides more clarity. Since static members also have a magic number (-2), this will motivate future commits to use constants instead of hardcoded values.

Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, David Jacot <djacot@confluent.io>
This commit is contained in:
Kirk True 2023-10-04 03:09:16 -07:00 committed by GitHub
parent a12f9f97c9
commit 59e59fc545
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 18 additions and 12 deletions

View File

@ -26,6 +26,11 @@ import java.nio.ByteBuffer;
public class ConsumerGroupHeartbeatRequest extends AbstractRequest { public class ConsumerGroupHeartbeatRequest extends AbstractRequest {
/**
* A member epoch of <code>-1</code> means that the member wants to leave the group.
*/
public static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
public static class Builder extends AbstractRequest.Builder<ConsumerGroupHeartbeatRequest> { public static class Builder extends AbstractRequest.Builder<ConsumerGroupHeartbeatRequest> {
private final ConsumerGroupHeartbeatRequestData data; private final ConsumerGroupHeartbeatRequestData data;

View File

@ -101,6 +101,7 @@ import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION; import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR; import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID; import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER; import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
import static org.apache.kafka.coordinator.group.Group.GroupType.GENERIC; import static org.apache.kafka.coordinator.group.Group.GroupType.GENERIC;
@ -579,7 +580,7 @@ public class GroupMetadataManager {
throwIfEmptyString(request.rackId(), "RackId can't be empty."); throwIfEmptyString(request.rackId(), "RackId can't be empty.");
throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex is not supported yet."); throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex is not supported yet.");
if (request.memberEpoch() > 0 || request.memberEpoch() == -1) { if (request.memberEpoch() > 0 || request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) {
throwIfEmptyString(request.memberId(), "MemberId can't be empty."); throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
} else if (request.memberEpoch() == 0) { } else if (request.memberEpoch() == 0) {
if (request.rebalanceTimeoutMs() == -1) { if (request.rebalanceTimeoutMs() == -1) {
@ -923,7 +924,7 @@ public class GroupMetadataManager {
List<Record> records = consumerGroupFenceMember(group, member); List<Record> records = consumerGroupFenceMember(group, member);
return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData() return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId) .setMemberId(memberId)
.setMemberEpoch(-1)); .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
} }
/** /**
@ -1083,8 +1084,7 @@ public class GroupMetadataManager {
) throws ApiException { ) throws ApiException {
throwIfConsumerGroupHeartbeatRequestIsInvalid(request); throwIfConsumerGroupHeartbeatRequestIsInvalid(request);
if (request.memberEpoch() == -1) { if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) {
// -1 means that the member wants to leave the group.
return consumerGroupLeave( return consumerGroupLeave(
request.groupId(), request.groupId(),
request.memberId() request.memberId()
@ -1133,7 +1133,7 @@ public class GroupMetadataManager {
.build()); .build());
} else { } else {
ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, false); ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, false);
if (oldMember.memberEpoch() != -1) { if (oldMember.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH) {
throw new IllegalStateException("Received a tombstone record to delete member " + memberId throw new IllegalStateException("Received a tombstone record to delete member " + memberId
+ " but did not receive ConsumerGroupCurrentMemberAssignmentValue tombstone."); + " but did not receive ConsumerGroupCurrentMemberAssignmentValue tombstone.");
} }
@ -1354,9 +1354,9 @@ public class GroupMetadataManager {
consumerGroup.updateMember(newMember); consumerGroup.updateMember(newMember);
} else { } else {
ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(oldMember) ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(oldMember)
.setMemberEpoch(-1) .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setPreviousMemberEpoch(-1) .setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setTargetMemberEpoch(-1) .setTargetMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setAssignedPartitions(Collections.emptyMap()) .setAssignedPartitions(Collections.emptyMap())
.setPartitionsPendingRevocation(Collections.emptyMap()) .setPartitionsPendingRevocation(Collections.emptyMap())
.setPartitionsPendingAssignment(Collections.emptyMap()) .setPartitionsPendingAssignment(Collections.emptyMap())

View File

@ -124,6 +124,7 @@ import java.util.stream.Stream;
import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol; import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection; import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID; import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
@ -1859,7 +1860,7 @@ public class GroupMetadataManagerTest {
new ConsumerGroupHeartbeatRequestData() new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId) .setGroupId(groupId)
.setMemberId(memberId2) .setMemberId(memberId2)
.setMemberEpoch(-1) .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setRebalanceTimeoutMs(5000) .setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Arrays.asList("foo", "bar")) .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setTopicPartitions(Collections.emptyList())); .setTopicPartitions(Collections.emptyList()));
@ -1867,7 +1868,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals( assertResponseEquals(
new ConsumerGroupHeartbeatResponseData() new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2) .setMemberId(memberId2)
.setMemberEpoch(-1), .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
result.response() result.response()
); );
@ -3225,8 +3226,8 @@ public class GroupMetadataManagerTest {
new ConsumerGroupHeartbeatRequestData() new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId) .setGroupId(groupId)
.setMemberId(memberId) .setMemberId(memberId)
.setMemberEpoch(-1)); .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
assertEquals(-1, result.response().memberEpoch()); assertEquals(LEAVE_GROUP_MEMBER_EPOCH, result.response().memberEpoch());
// Verify that there are no timers. // Verify that there are no timers.
context.assertNoSessionTimeout(groupId, memberId); context.assertNoSessionTimeout(groupId, memberId);