mirror of https://github.com/apache/kafka.git
KAFKA-19453: Ignore group not found in share group record replay (#20100)
* When a `ShareGroup*` record is replayed in group metadata manager, there is a call to check if the group exists. If the group does not exist - we are throwing an exception which is unnecessary. * In this PR, we have added check to ignore this exception. * New test to validate the logic has been added. Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com> Note: cherry pick from PR https://github.com/apache/kafka/pull/20076 in trunk.
This commit is contained in:
parent
d02028d773
commit
71f5600283
|
@ -5358,16 +5358,26 @@ public class GroupMetadataManager {
|
|||
String groupId = key.groupId();
|
||||
String memberId = key.memberId();
|
||||
|
||||
ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, value != null);
|
||||
ShareGroup shareGroup;
|
||||
ShareGroupMember oldMember;
|
||||
try {
|
||||
shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, value != null);
|
||||
oldMember = shareGroup.getOrMaybeCreateMember(memberId, value != null);
|
||||
} catch (GroupIdNotFoundException ex) {
|
||||
log.debug("ShareGroupMemberMetadata tombstone without group - {}", groupId, ex);
|
||||
return;
|
||||
} catch (UnknownMemberIdException ex) {
|
||||
log.debug("ShareGroupMemberMetadata tombstone for groupId - {} without member - {}", groupId, memberId, ex);
|
||||
return;
|
||||
}
|
||||
|
||||
Set<String> oldSubscribedTopicNames = new HashSet<>(shareGroup.subscribedTopicNames().keySet());
|
||||
|
||||
if (value != null) {
|
||||
ShareGroupMember oldMember = shareGroup.getOrMaybeCreateMember(memberId, true);
|
||||
shareGroup.updateMember(new ShareGroupMember.Builder(oldMember)
|
||||
.updateWith(value)
|
||||
.build());
|
||||
} else {
|
||||
ShareGroupMember oldMember = shareGroup.getOrMaybeCreateMember(memberId, false);
|
||||
if (oldMember.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH) {
|
||||
throw new IllegalStateException("Received a tombstone record to delete member " + memberId
|
||||
+ " with invalid leave group epoch.");
|
||||
|
@ -5396,12 +5406,18 @@ public class GroupMetadataManager {
|
|||
) {
|
||||
String groupId = key.groupId();
|
||||
|
||||
ShareGroup shareGroup;
|
||||
try {
|
||||
shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, value != null);
|
||||
} catch (GroupIdNotFoundException ex) {
|
||||
log.debug("ShareGroupMetadata tombstone without group - {}", groupId, ex);
|
||||
return;
|
||||
}
|
||||
|
||||
if (value != null) {
|
||||
ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, true);
|
||||
shareGroup.setGroupEpoch(value.epoch());
|
||||
shareGroup.setMetadataHash(value.metadataHash());
|
||||
} else {
|
||||
ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, false);
|
||||
if (!shareGroup.members().isEmpty()) {
|
||||
throw new IllegalStateException("Received a tombstone record to delete group " + groupId
|
||||
+ " but the group still has " + shareGroup.members().size() + " members.");
|
||||
|
@ -5593,7 +5609,14 @@ public class GroupMetadataManager {
|
|||
) {
|
||||
String groupId = key.groupId();
|
||||
String memberId = key.memberId();
|
||||
ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false);
|
||||
|
||||
ShareGroup group;
|
||||
try {
|
||||
group = getOrMaybeCreatePersistedShareGroup(groupId, value != null);
|
||||
} catch (GroupIdNotFoundException ex) {
|
||||
log.debug("ShareGroupTargetAssignmentMember tombstone without group - {}", groupId, ex);
|
||||
return;
|
||||
}
|
||||
|
||||
if (value != null) {
|
||||
group.updateTargetAssignment(memberId, Assignment.fromRecord(value));
|
||||
|
@ -5615,7 +5638,14 @@ public class GroupMetadataManager {
|
|||
ShareGroupTargetAssignmentMetadataValue value
|
||||
) {
|
||||
String groupId = key.groupId();
|
||||
ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false);
|
||||
|
||||
ShareGroup group;
|
||||
try {
|
||||
group = getOrMaybeCreatePersistedShareGroup(groupId, value != null);
|
||||
} catch (GroupIdNotFoundException ex) {
|
||||
log.debug("ShareGroupTargetAssignmentMetadata tombstone without group - {}", groupId, ex);
|
||||
return;
|
||||
}
|
||||
|
||||
if (value != null) {
|
||||
group.setTargetAssignmentEpoch(value.assignmentEpoch());
|
||||
|
@ -5642,8 +5672,19 @@ public class GroupMetadataManager {
|
|||
String groupId = key.groupId();
|
||||
String memberId = key.memberId();
|
||||
|
||||
ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false);
|
||||
ShareGroupMember oldMember = group.getOrMaybeCreateMember(memberId, false);
|
||||
ShareGroup group;
|
||||
ShareGroupMember oldMember;
|
||||
|
||||
try {
|
||||
group = getOrMaybeCreatePersistedShareGroup(groupId, value != null);
|
||||
oldMember = group.getOrMaybeCreateMember(memberId, value != null);
|
||||
} catch (GroupIdNotFoundException ex) {
|
||||
log.debug("ShareGroupCurrentMemberAssignment tombstone without group - {}", groupId, ex);
|
||||
return;
|
||||
} catch (UnknownMemberIdException ex) {
|
||||
log.debug("ShareGroupCurrentMemberAssignment tombstone for groupId - {} without member - {}", groupId, memberId, ex);
|
||||
return;
|
||||
}
|
||||
|
||||
if (value != null) {
|
||||
ShareGroupMember newMember = new ShareGroupMember.Builder(oldMember)
|
||||
|
@ -5673,12 +5714,16 @@ public class GroupMetadataManager {
|
|||
) {
|
||||
String groupId = key.groupId();
|
||||
|
||||
getOrMaybeCreatePersistedShareGroup(groupId, false);
|
||||
|
||||
// Update timeline structures with info about initialized/deleted topics.
|
||||
try {
|
||||
getOrMaybeCreatePersistedShareGroup(groupId, value != null);
|
||||
} catch (GroupIdNotFoundException ex) {
|
||||
// Ignore tombstone if group not found.
|
||||
log.debug("ShareGroupStatePartitionMetadata tombstone for non-existent share group {}", groupId, ex);
|
||||
}
|
||||
|
||||
if (value == null) {
|
||||
// Tombstone!
|
||||
shareGroupStatePartitionMetadata.remove(groupId);
|
||||
shareGroupStatePartitionMetadata.remove(groupId); // Should not throw any exceptions.
|
||||
} else {
|
||||
long timestamp = time.milliseconds();
|
||||
ShareGroupStatePartitionMetadataInfo info = new ShareGroupStatePartitionMetadataInfo(
|
||||
|
|
|
@ -95,10 +95,18 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataV
|
|||
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
|
||||
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
|
||||
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
|
||||
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey;
|
||||
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
|
||||
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
|
||||
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
|
||||
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
|
||||
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
|
||||
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
|
||||
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
|
||||
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
|
||||
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
|
||||
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
|
||||
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
|
||||
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
|
||||
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
|
||||
import org.apache.kafka.coordinator.group.modern.Assignment;
|
||||
|
@ -204,6 +212,7 @@ import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABL
|
|||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
|
||||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
|
||||
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
|
@ -215,6 +224,7 @@ import static org.junit.jupiter.api.Assertions.fail;
|
|||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -22895,6 +22905,124 @@ public class GroupMetadataManagerTest {
|
|||
assertEquals(Map.of(t1Id, new InitMapValue(t1Name, Set.of(0), 1)), GroupMetadataManager.combineInitMaps(m1, m2));
|
||||
}
|
||||
|
||||
private static Stream<CoordinatorRecord> shareGroupRecords() {
|
||||
String groupId = "groupId";
|
||||
String memberId = Uuid.randomUuid().toString();
|
||||
|
||||
return Stream.of(
|
||||
// Tombstones
|
||||
CoordinatorRecord.tombstone(
|
||||
new ShareGroupMemberMetadataKey()
|
||||
.setGroupId(groupId)
|
||||
.setMemberId(memberId)
|
||||
),
|
||||
CoordinatorRecord.tombstone(
|
||||
new ShareGroupMetadataKey()
|
||||
.setGroupId(groupId)
|
||||
),
|
||||
CoordinatorRecord.tombstone(
|
||||
new ShareGroupTargetAssignmentMemberKey()
|
||||
.setGroupId(groupId)
|
||||
.setMemberId(memberId)
|
||||
),
|
||||
CoordinatorRecord.tombstone(
|
||||
new ShareGroupTargetAssignmentMetadataKey()
|
||||
.setGroupId(groupId)
|
||||
),
|
||||
CoordinatorRecord.tombstone(
|
||||
new ShareGroupCurrentMemberAssignmentKey()
|
||||
.setGroupId(groupId)
|
||||
.setMemberId(memberId)
|
||||
),
|
||||
CoordinatorRecord.tombstone(
|
||||
new ShareGroupStatePartitionMetadataKey()
|
||||
.setGroupId(groupId)
|
||||
),
|
||||
// Data
|
||||
CoordinatorRecord.record(
|
||||
new ShareGroupMemberMetadataKey()
|
||||
.setGroupId(groupId)
|
||||
.setMemberId(memberId),
|
||||
new ApiMessageAndVersion(
|
||||
new ShareGroupMemberMetadataValue()
|
||||
.setSubscribedTopicNames(List.of("tp1")),
|
||||
(short) 10
|
||||
)
|
||||
),
|
||||
CoordinatorRecord.record(
|
||||
new ShareGroupMetadataKey()
|
||||
.setGroupId(groupId),
|
||||
new ApiMessageAndVersion(
|
||||
new ShareGroupMetadataValue()
|
||||
.setEpoch(1)
|
||||
.setMetadataHash(2L),
|
||||
(short) 11
|
||||
)
|
||||
),
|
||||
CoordinatorRecord.record(
|
||||
new ShareGroupTargetAssignmentMetadataKey()
|
||||
.setGroupId(groupId),
|
||||
new ApiMessageAndVersion(
|
||||
new ShareGroupTargetAssignmentMetadataValue()
|
||||
.setAssignmentEpoch(5),
|
||||
(short) 12
|
||||
)
|
||||
),
|
||||
CoordinatorRecord.record(
|
||||
new ShareGroupTargetAssignmentMemberKey()
|
||||
.setGroupId(groupId)
|
||||
.setMemberId(memberId),
|
||||
new ApiMessageAndVersion(new ShareGroupTargetAssignmentMemberValue()
|
||||
.setTopicPartitions(List.of(
|
||||
new ShareGroupTargetAssignmentMemberValue.TopicPartition()
|
||||
.setTopicId(Uuid.randomUuid())
|
||||
.setPartitions(List.of(0, 1, 2))
|
||||
)),
|
||||
(short) 13
|
||||
)
|
||||
),
|
||||
CoordinatorRecord.record(
|
||||
new ShareGroupCurrentMemberAssignmentKey()
|
||||
.setGroupId(groupId)
|
||||
.setMemberId(memberId),
|
||||
new ApiMessageAndVersion(new ShareGroupCurrentMemberAssignmentValue()
|
||||
.setAssignedPartitions(List.of(
|
||||
new ShareGroupCurrentMemberAssignmentValue.TopicPartitions()
|
||||
.setTopicId(Uuid.randomUuid())
|
||||
.setPartitions(List.of(0, 1, 2))
|
||||
)
|
||||
)
|
||||
.setMemberEpoch(5)
|
||||
.setPreviousMemberEpoch(4)
|
||||
.setState((byte) 0),
|
||||
(short) 14
|
||||
)
|
||||
),
|
||||
CoordinatorRecord.record(
|
||||
new ShareGroupStatePartitionMetadataKey()
|
||||
.setGroupId(groupId),
|
||||
new ApiMessageAndVersion(new ShareGroupStatePartitionMetadataValue()
|
||||
.setInitializingTopics(List.of())
|
||||
.setInitializedTopics(List.of())
|
||||
.setDeletingTopics(List.of()),
|
||||
(short) 15
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("shareGroupRecords")
|
||||
public void testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
|
||||
MockPartitionAssignor assignor = new MockPartitionAssignor("simple");
|
||||
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
|
||||
GroupMetadataManagerTestContext context = spy(new GroupMetadataManagerTestContext.Builder()
|
||||
.withShareGroupAssignor(assignor)
|
||||
.build());
|
||||
|
||||
assertDoesNotThrow(() -> context.replay(record));
|
||||
}
|
||||
|
||||
private static void checkJoinGroupResponse(
|
||||
JoinGroupResponseData expectedResponse,
|
||||
JoinGroupResponseData actualResponse,
|
||||
|
|
Loading…
Reference in New Issue