diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 0879283931a..ab186575e79 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -5356,16 +5356,26 @@ public class GroupMetadataManager { String groupId = key.groupId(); String memberId = key.memberId(); - ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, value != null); + ShareGroup shareGroup; + ShareGroupMember oldMember; + try { + shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, value != null); + oldMember = shareGroup.getOrMaybeCreateMember(memberId, value != null); + } catch (GroupIdNotFoundException ex) { + log.debug("ShareGroupMemberMetadata tombstone without group - {}", groupId, ex); + return; + } catch (UnknownMemberIdException ex) { + log.debug("ShareGroupMemberMetadata tombstone for groupId - {} without member - {}", groupId, memberId, ex); + return; + } + Set oldSubscribedTopicNames = new HashSet<>(shareGroup.subscribedTopicNames().keySet()); if (value != null) { - ShareGroupMember oldMember = shareGroup.getOrMaybeCreateMember(memberId, true); shareGroup.updateMember(new ShareGroupMember.Builder(oldMember) .updateWith(value) .build()); } else { - ShareGroupMember oldMember = shareGroup.getOrMaybeCreateMember(memberId, false); if (oldMember.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH) { throw new IllegalStateException("Received a tombstone record to delete member " + memberId + " with invalid leave group epoch."); @@ -5394,12 +5404,18 @@ public class GroupMetadataManager { ) { String groupId = key.groupId(); + ShareGroup shareGroup; + try { + shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, value != null); + } catch (GroupIdNotFoundException ex) { + log.debug("ShareGroupMetadata tombstone without group - {}", groupId, ex); + return; + } + if (value != null) { - ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, true); shareGroup.setGroupEpoch(value.epoch()); shareGroup.setMetadataHash(value.metadataHash()); } else { - ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, false); if (!shareGroup.members().isEmpty()) { throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the group still has " + shareGroup.members().size() + " members."); @@ -5591,7 +5607,14 @@ public class GroupMetadataManager { ) { String groupId = key.groupId(); String memberId = key.memberId(); - ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false); + + ShareGroup group; + try { + group = getOrMaybeCreatePersistedShareGroup(groupId, value != null); + } catch (GroupIdNotFoundException ex) { + log.debug("ShareGroupTargetAssignmentMember tombstone without group - {}", groupId, ex); + return; + } if (value != null) { group.updateTargetAssignment(memberId, Assignment.fromRecord(value)); @@ -5613,7 +5636,14 @@ public class GroupMetadataManager { ShareGroupTargetAssignmentMetadataValue value ) { String groupId = key.groupId(); - ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false); + + ShareGroup group; + try { + group = getOrMaybeCreatePersistedShareGroup(groupId, value != null); + } catch (GroupIdNotFoundException ex) { + log.debug("ShareGroupTargetAssignmentMetadata tombstone without group - {}", groupId, ex); + return; + } if (value != null) { group.setTargetAssignmentEpoch(value.assignmentEpoch()); @@ -5640,20 +5670,31 @@ public class GroupMetadataManager { String groupId = key.groupId(); String memberId = key.memberId(); - ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false); - ShareGroupMember oldMember = group.getOrMaybeCreateMember(memberId, false); + ShareGroup group; + ShareGroupMember oldMember; + + try { + group = getOrMaybeCreatePersistedShareGroup(groupId, value != null); + oldMember = group.getOrMaybeCreateMember(memberId, value != null); + } catch (GroupIdNotFoundException ex) { + log.debug("ShareGroupCurrentMemberAssignment tombstone without group - {}", groupId, ex); + return; + } catch (UnknownMemberIdException ex) { + log.debug("ShareGroupCurrentMemberAssignment tombstone for groupId - {} without member - {}", groupId, memberId, ex); + return; + } if (value != null) { ShareGroupMember newMember = new ShareGroupMember.Builder(oldMember) - .updateWith(value) - .build(); + .updateWith(value) + .build(); group.updateMember(newMember); } else { ShareGroupMember newMember = new ShareGroupMember.Builder(oldMember) - .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH) - .setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH) - .setAssignedPartitions(Map.of()) - .build(); + .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH) + .setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH) + .setAssignedPartitions(Map.of()) + .build(); group.updateMember(newMember); } } @@ -5671,12 +5712,16 @@ public class GroupMetadataManager { ) { String groupId = key.groupId(); - getOrMaybeCreatePersistedShareGroup(groupId, false); - // Update timeline structures with info about initialized/deleted topics. + try { + getOrMaybeCreatePersistedShareGroup(groupId, value != null); + } catch (GroupIdNotFoundException ex) { + // Ignore tombstone if group not found. + log.debug("ShareGroupStatePartitionMetadata tombstone for non-existent share group {}", groupId, ex); + } + if (value == null) { - // Tombstone! - shareGroupStatePartitionMetadata.remove(groupId); + shareGroupStatePartitionMetadata.remove(groupId); // Should not throw any exceptions. } else { long timestamp = time.milliseconds(); ShareGroupStatePartitionMetadataInfo info = new ShareGroupStatePartitionMetadataInfo( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 1cf13c70490..bc5afd7704f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -95,10 +95,18 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataV import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.modern.Assignment; @@ -204,6 +212,7 @@ import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABL import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -215,6 +224,7 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -22895,6 +22905,124 @@ public class GroupMetadataManagerTest { assertEquals(Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1)), GroupMetadataManager.combineInitMaps(m1, m2)); } + private static Stream shareGroupRecords() { + String groupId = "groupId"; + String memberId = Uuid.randomUuid().toString(); + + return Stream.of( + // Tombstones + CoordinatorRecord.tombstone( + new ShareGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId) + ), + CoordinatorRecord.tombstone( + new ShareGroupMetadataKey() + .setGroupId(groupId) + ), + CoordinatorRecord.tombstone( + new ShareGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId) + ), + CoordinatorRecord.tombstone( + new ShareGroupTargetAssignmentMetadataKey() + .setGroupId(groupId) + ), + CoordinatorRecord.tombstone( + new ShareGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(memberId) + ), + CoordinatorRecord.tombstone( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId) + ), + // Data + CoordinatorRecord.record( + new ShareGroupMemberMetadataKey() + .setGroupId(groupId) + .setMemberId(memberId), + new ApiMessageAndVersion( + new ShareGroupMemberMetadataValue() + .setSubscribedTopicNames(List.of("tp1")), + (short) 10 + ) + ), + CoordinatorRecord.record( + new ShareGroupMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new ShareGroupMetadataValue() + .setEpoch(1) + .setMetadataHash(2L), + (short) 11 + ) + ), + CoordinatorRecord.record( + new ShareGroupTargetAssignmentMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion( + new ShareGroupTargetAssignmentMetadataValue() + .setAssignmentEpoch(5), + (short) 12 + ) + ), + CoordinatorRecord.record( + new ShareGroupTargetAssignmentMemberKey() + .setGroupId(groupId) + .setMemberId(memberId), + new ApiMessageAndVersion(new ShareGroupTargetAssignmentMemberValue() + .setTopicPartitions(List.of( + new ShareGroupTargetAssignmentMemberValue.TopicPartition() + .setTopicId(Uuid.randomUuid()) + .setPartitions(List.of(0, 1, 2)) + )), + (short) 13 + ) + ), + CoordinatorRecord.record( + new ShareGroupCurrentMemberAssignmentKey() + .setGroupId(groupId) + .setMemberId(memberId), + new ApiMessageAndVersion(new ShareGroupCurrentMemberAssignmentValue() + .setAssignedPartitions(List.of( + new ShareGroupCurrentMemberAssignmentValue.TopicPartitions() + .setTopicId(Uuid.randomUuid()) + .setPartitions(List.of(0, 1, 2)) + ) + ) + .setMemberEpoch(5) + .setPreviousMemberEpoch(4) + .setState((byte) 0), + (short) 14 + ) + ), + CoordinatorRecord.record( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId), + new ApiMessageAndVersion(new ShareGroupStatePartitionMetadataValue() + .setInitializingTopics(List.of()) + .setInitializedTopics(List.of()) + .setDeletingTopics(List.of()), + (short) 15 + ) + ) + ); + } + + @ParameterizedTest + @MethodSource("shareGroupRecords") + public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) { + MockPartitionAssignor assignor = new MockPartitionAssignor("simple"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = spy(new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .build()); + + assertDoesNotThrow(() -> context.replay(record)); + } + private static void checkJoinGroupResponse( JoinGroupResponseData expectedResponse, JoinGroupResponseData actualResponse,