KAFKA-19453: Ignore group not found in share group record replay. (#20076)
CI / build (push) Waiting to run Details

* When a `ShareGroup*` record is replayed in group
metadata manager, there is a call to check if the group exists. If the
group does not exist - we are throwing an exception which is
unnecessary.
* In this PR, we have added check to ignore this exception.
* New test to validate the logic has been added.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Dongnuo Lyu
<139248811+dongnuo123@users.noreply.github.com>
This commit is contained in:
Sushant Mahajan 2025-07-02 14:40:14 +05:30 committed by GitHub
parent 14ea11dc31
commit 28c53ba09a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 192 additions and 19 deletions

View File

@ -5356,16 +5356,26 @@ public class GroupMetadataManager {
String groupId = key.groupId();
String memberId = key.memberId();
ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, value != null);
ShareGroup shareGroup;
ShareGroupMember oldMember;
try {
shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, value != null);
oldMember = shareGroup.getOrMaybeCreateMember(memberId, value != null);
} catch (GroupIdNotFoundException ex) {
log.debug("ShareGroupMemberMetadata tombstone without group - {}", groupId, ex);
return;
} catch (UnknownMemberIdException ex) {
log.debug("ShareGroupMemberMetadata tombstone for groupId - {} without member - {}", groupId, memberId, ex);
return;
}
Set<String> oldSubscribedTopicNames = new HashSet<>(shareGroup.subscribedTopicNames().keySet());
if (value != null) {
ShareGroupMember oldMember = shareGroup.getOrMaybeCreateMember(memberId, true);
shareGroup.updateMember(new ShareGroupMember.Builder(oldMember)
.updateWith(value)
.build());
} else {
ShareGroupMember oldMember = shareGroup.getOrMaybeCreateMember(memberId, false);
if (oldMember.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH) {
throw new IllegalStateException("Received a tombstone record to delete member " + memberId
+ " with invalid leave group epoch.");
@ -5394,12 +5404,18 @@ public class GroupMetadataManager {
) {
String groupId = key.groupId();
ShareGroup shareGroup;
try {
shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, value != null);
} catch (GroupIdNotFoundException ex) {
log.debug("ShareGroupMetadata tombstone without group - {}", groupId, ex);
return;
}
if (value != null) {
ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, true);
shareGroup.setGroupEpoch(value.epoch());
shareGroup.setMetadataHash(value.metadataHash());
} else {
ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, false);
if (!shareGroup.members().isEmpty()) {
throw new IllegalStateException("Received a tombstone record to delete group " + groupId
+ " but the group still has " + shareGroup.members().size() + " members.");
@ -5591,7 +5607,14 @@ public class GroupMetadataManager {
) {
String groupId = key.groupId();
String memberId = key.memberId();
ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false);
ShareGroup group;
try {
group = getOrMaybeCreatePersistedShareGroup(groupId, value != null);
} catch (GroupIdNotFoundException ex) {
log.debug("ShareGroupTargetAssignmentMember tombstone without group - {}", groupId, ex);
return;
}
if (value != null) {
group.updateTargetAssignment(memberId, Assignment.fromRecord(value));
@ -5613,7 +5636,14 @@ public class GroupMetadataManager {
ShareGroupTargetAssignmentMetadataValue value
) {
String groupId = key.groupId();
ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false);
ShareGroup group;
try {
group = getOrMaybeCreatePersistedShareGroup(groupId, value != null);
} catch (GroupIdNotFoundException ex) {
log.debug("ShareGroupTargetAssignmentMetadata tombstone without group - {}", groupId, ex);
return;
}
if (value != null) {
group.setTargetAssignmentEpoch(value.assignmentEpoch());
@ -5640,20 +5670,31 @@ public class GroupMetadataManager {
String groupId = key.groupId();
String memberId = key.memberId();
ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false);
ShareGroupMember oldMember = group.getOrMaybeCreateMember(memberId, false);
ShareGroup group;
ShareGroupMember oldMember;
try {
group = getOrMaybeCreatePersistedShareGroup(groupId, value != null);
oldMember = group.getOrMaybeCreateMember(memberId, value != null);
} catch (GroupIdNotFoundException ex) {
log.debug("ShareGroupCurrentMemberAssignment tombstone without group - {}", groupId, ex);
return;
} catch (UnknownMemberIdException ex) {
log.debug("ShareGroupCurrentMemberAssignment tombstone for groupId - {} without member - {}", groupId, memberId, ex);
return;
}
if (value != null) {
ShareGroupMember newMember = new ShareGroupMember.Builder(oldMember)
.updateWith(value)
.build();
.updateWith(value)
.build();
group.updateMember(newMember);
} else {
ShareGroupMember newMember = new ShareGroupMember.Builder(oldMember)
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setAssignedPartitions(Map.of())
.build();
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setAssignedPartitions(Map.of())
.build();
group.updateMember(newMember);
}
}
@ -5671,12 +5712,16 @@ public class GroupMetadataManager {
) {
String groupId = key.groupId();
getOrMaybeCreatePersistedShareGroup(groupId, false);
// Update timeline structures with info about initialized/deleted topics.
try {
getOrMaybeCreatePersistedShareGroup(groupId, value != null);
} catch (GroupIdNotFoundException ex) {
// Ignore tombstone if group not found.
log.debug("ShareGroupStatePartitionMetadata tombstone for non-existent share group {}", groupId, ex);
}
if (value == null) {
// Tombstone!
shareGroupStatePartitionMetadata.remove(groupId);
shareGroupStatePartitionMetadata.remove(groupId); // Should not throw any exceptions.
} else {
long timestamp = time.milliseconds();
ShareGroupStatePartitionMetadataInfo info = new ShareGroupStatePartitionMetadataInfo(

View File

@ -95,10 +95,18 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataV
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.modern.Assignment;
@ -204,6 +212,7 @@ import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABL
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@ -215,6 +224,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -22895,6 +22905,124 @@ public class GroupMetadataManagerTest {
assertEquals(Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1)), GroupMetadataManager.combineInitMaps(m1, m2));
}
private static Stream<CoordinatorRecord> shareGroupRecords() {
String groupId = "groupId";
String memberId = Uuid.randomUuid().toString();
return Stream.of(
// Tombstones
CoordinatorRecord.tombstone(
new ShareGroupMemberMetadataKey()
.setGroupId(groupId)
.setMemberId(memberId)
),
CoordinatorRecord.tombstone(
new ShareGroupMetadataKey()
.setGroupId(groupId)
),
CoordinatorRecord.tombstone(
new ShareGroupTargetAssignmentMemberKey()
.setGroupId(groupId)
.setMemberId(memberId)
),
CoordinatorRecord.tombstone(
new ShareGroupTargetAssignmentMetadataKey()
.setGroupId(groupId)
),
CoordinatorRecord.tombstone(
new ShareGroupCurrentMemberAssignmentKey()
.setGroupId(groupId)
.setMemberId(memberId)
),
CoordinatorRecord.tombstone(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId)
),
// Data
CoordinatorRecord.record(
new ShareGroupMemberMetadataKey()
.setGroupId(groupId)
.setMemberId(memberId),
new ApiMessageAndVersion(
new ShareGroupMemberMetadataValue()
.setSubscribedTopicNames(List.of("tp1")),
(short) 10
)
),
CoordinatorRecord.record(
new ShareGroupMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
new ShareGroupMetadataValue()
.setEpoch(1)
.setMetadataHash(2L),
(short) 11
)
),
CoordinatorRecord.record(
new ShareGroupTargetAssignmentMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
new ShareGroupTargetAssignmentMetadataValue()
.setAssignmentEpoch(5),
(short) 12
)
),
CoordinatorRecord.record(
new ShareGroupTargetAssignmentMemberKey()
.setGroupId(groupId)
.setMemberId(memberId),
new ApiMessageAndVersion(new ShareGroupTargetAssignmentMemberValue()
.setTopicPartitions(List.of(
new ShareGroupTargetAssignmentMemberValue.TopicPartition()
.setTopicId(Uuid.randomUuid())
.setPartitions(List.of(0, 1, 2))
)),
(short) 13
)
),
CoordinatorRecord.record(
new ShareGroupCurrentMemberAssignmentKey()
.setGroupId(groupId)
.setMemberId(memberId),
new ApiMessageAndVersion(new ShareGroupCurrentMemberAssignmentValue()
.setAssignedPartitions(List.of(
new ShareGroupCurrentMemberAssignmentValue.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setPartitions(List.of(0, 1, 2))
)
)
.setMemberEpoch(5)
.setPreviousMemberEpoch(4)
.setState((byte) 0),
(short) 14
)
),
CoordinatorRecord.record(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(new ShareGroupStatePartitionMetadataValue()
.setInitializingTopics(List.of())
.setInitializedTopics(List.of())
.setDeletingTopics(List.of()),
(short) 15
)
)
);
}
@ParameterizedTest
@MethodSource("shareGroupRecords")
public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
MockPartitionAssignor assignor = new MockPartitionAssignor("simple");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = spy(new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build());
assertDoesNotThrow(() -> context.replay(record));
}
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,