|
|
|
@ -489,7 +489,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member));
|
|
|
|
|
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100, 0));
|
|
|
|
|
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
|
|
|
|
|
mkTopicAssignment(fooTopicId, 1, 2, 3)
|
|
|
|
@ -623,7 +623,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
|
|
|
|
|
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
|
|
|
|
|
)),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
|
|
|
|
|
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
|
|
|
|
|
mkTopicAssignment(barTopicId, 0, 1, 2)
|
|
|
|
@ -721,7 +721,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
|
|
|
|
|
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
|
|
|
|
|
)),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
|
|
|
|
|
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
|
|
|
|
|
mkTopicAssignment(barTopicId, 0, 1, 2)
|
|
|
|
@ -839,7 +839,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
assertUnorderedRecordsEquals(
|
|
|
|
|
List.of(
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3)),
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
|
|
|
|
|
List.of(
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
|
|
|
|
|
mkTopicAssignment(fooTopicId, 0, 1),
|
|
|
|
@ -947,7 +947,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
|
|
|
|
|
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
|
|
|
|
|
)),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertRecordsEquals(expectedRecords, result.records());
|
|
|
|
@ -1064,7 +1064,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
assertUnorderedRecordsEquals(
|
|
|
|
|
List.of(
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3)),
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
|
|
|
|
|
List.of(
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
|
|
|
|
|
mkTopicAssignment(fooTopicId, 0, 1),
|
|
|
|
@ -1434,7 +1434,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
|
|
|
|
|
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
|
|
|
|
|
)),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, member2RejoinId, mkAssignment(
|
|
|
|
|
mkTopicAssignment(fooTopicId, 3, 4, 5),
|
|
|
|
|
mkTopicAssignment(barTopicId, 0, 1, 2)
|
|
|
|
@ -1627,7 +1627,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
|
|
|
|
|
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
|
|
|
|
|
)),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertRecordsEquals(expectedRecords, result.records());
|
|
|
|
@ -1805,7 +1805,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member));
|
|
|
|
|
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100, 0));
|
|
|
|
|
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
|
|
|
|
|
mkTopicAssignment(fooTopicId, 1, 2, 3)
|
|
|
|
@ -2592,7 +2592,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
.setState(MemberState.STABLE)
|
|
|
|
|
.setSubscribedTopicNames(List.of(fooTopicName))
|
|
|
|
|
.build()));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0));
|
|
|
|
|
|
|
|
|
|
assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, context.consumerGroupState(groupId));
|
|
|
|
|
|
|
|
|
@ -2749,7 +2749,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
|
|
|
|
|
Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6))
|
|
|
|
|
),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
|
|
|
|
|
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
|
|
|
|
|
)),
|
|
|
|
@ -2873,7 +2873,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
|
|
|
|
|
Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6))
|
|
|
|
|
),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
|
|
|
|
|
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
|
|
|
|
|
)),
|
|
|
|
@ -3219,7 +3219,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
)),
|
|
|
|
@ -3280,7 +3280,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
@ -3342,7 +3342,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId, memberId),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, Map.of()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 2)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 2, 0)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
)),
|
|
|
|
@ -3401,7 +3401,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId, memberId),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, Map.of()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
@ -3482,7 +3482,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
)),
|
|
|
|
@ -3758,7 +3758,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
)),
|
|
|
|
@ -8941,13 +8941,13 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
.setProtocol("range")
|
|
|
|
|
.setCurrentStateTimestamp(context.time.milliseconds())));
|
|
|
|
|
// Create one share group record.
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(shareGroupId, 6));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(shareGroupId, 6, 0));
|
|
|
|
|
context.commit();
|
|
|
|
|
ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false);
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1)
|
|
|
|
|
.setSubscribedTopicNames(List.of(fooTopicName))
|
|
|
|
|
.build()));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, 11));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, 11, 0));
|
|
|
|
|
|
|
|
|
|
// Test list group response without a group state or group type filter.
|
|
|
|
|
Map<String, ListGroupsResponseData.ListedGroup> actualAllGroupMap =
|
|
|
|
@ -9174,7 +9174,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
ConsumerGroupMember.Builder memberBuilder1 = new ConsumerGroupMember.Builder(memberId1)
|
|
|
|
|
.setSubscribedTopicNames(List.of(topicName));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(consumerGroupId, memberBuilder1.build()));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, epoch + 1));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, epoch + 1, 0));
|
|
|
|
|
|
|
|
|
|
Map<Uuid, Set<Integer>> assignmentMap = Map.of(topicId, Set.of());
|
|
|
|
|
|
|
|
|
@ -9182,7 +9182,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(consumerGroupId, memberBuilder2.build()));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(consumerGroupId, memberId2, assignmentMap));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(consumerGroupId, memberBuilder2.build()));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, epoch + 2));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, epoch + 2, 0));
|
|
|
|
|
|
|
|
|
|
List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.consumerGroupDescribe(List.of(consumerGroupId), context.lastCommittedOffset);
|
|
|
|
|
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup()
|
|
|
|
@ -9279,7 +9279,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
StreamsGroupMember.Builder memberBuilder1 = streamsGroupMemberBuilderWithDefaults(memberId1);
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build()));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build()));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1, 0));
|
|
|
|
|
|
|
|
|
|
TasksTuple assignment = new TasksTuple(
|
|
|
|
|
Map.of(subtopologyId, Set.of(0, 1)),
|
|
|
|
@ -9291,7 +9291,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder2.build()));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId, memberId2, assignment));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder2.build()));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 2));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 2, 0));
|
|
|
|
|
|
|
|
|
|
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(List.of(streamsGroupId), context.lastCommittedOffset);
|
|
|
|
|
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
|
|
|
|
@ -10169,7 +10169,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
List.of(
|
|
|
|
|
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(classicGroupId, expectedMember),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(classicGroupId, 1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(classicGroupId, 1, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(classicGroupId, memberId, Map.of()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(classicGroupId, 1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(classicGroupId, expectedMember)
|
|
|
|
@ -10333,7 +10333,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
|
|
|
|
|
// Create the new consumer group with member 1.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1),
|
|
|
|
@ -10348,7 +10348,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
)),
|
|
|
|
|
|
|
|
|
|
// Newly joining member 2 bumps the group epoch. A new target assignment is computed.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, assignor.targetPartitions(memberId2)),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
|
|
|
|
@ -10548,7 +10548,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
|
|
|
|
|
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()),
|
|
|
|
|
|
|
|
|
@ -10567,7 +10567,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
)),
|
|
|
|
|
|
|
|
|
|
// Newly joining member 3 bumps the group epoch. A new target assignment is computed.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId3, assignor.targetPartitions(memberId3)),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
|
|
|
|
@ -10824,7 +10824,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
|
|
|
|
|
// Create the new consumer group with the static member.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedClassicMember),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, expectedClassicMember.assignedPartitions()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedClassicMember),
|
|
|
|
@ -10846,7 +10846,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))),
|
|
|
|
|
|
|
|
|
|
// Newly joining static member bumps the group epoch. A new target assignment is computed.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
|
|
|
|
|
|
|
|
|
@ -10950,7 +10950,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
|
|
|
|
|
// Create the new consumer group with member 1.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1),
|
|
|
|
@ -10965,7 +10965,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
)),
|
|
|
|
|
|
|
|
|
|
// Newly joining member 2 bumps the group epoch. A new target assignment is computed.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Map.of()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 2),
|
|
|
|
|
|
|
|
|
@ -11335,7 +11335,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
|
|
|
|
|
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()),
|
|
|
|
|
|
|
|
|
@ -11354,7 +11354,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
)),
|
|
|
|
|
|
|
|
|
|
// Newly joining member 3 bumps the group epoch. A new target assignment is computed.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId3, assignor.targetPartitions(memberId3)),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 2),
|
|
|
|
@ -12432,7 +12432,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
|
|
|
|
|
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
|
|
|
|
|
))),
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
|
|
|
|
|
|
|
|
|
|
List.of(
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)),
|
|
|
|
@ -12593,7 +12593,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
|
|
|
|
|
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
|
|
|
|
|
)),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
|
|
|
|
|
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, Map.of()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11),
|
|
|
|
@ -12849,7 +12849,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
assertUnorderedRecordsEquals(
|
|
|
|
|
List.of(
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember)),
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
|
|
|
|
|
List.of(
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
|
|
|
|
|
mkTopicAssignment(fooTopicId, 0),
|
|
|
|
@ -13080,7 +13080,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1),
|
|
|
|
|
zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1)
|
|
|
|
|
))),
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
|
|
|
|
|
|
|
|
|
|
List.of(
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
|
|
|
|
@ -13315,7 +13315,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1),
|
|
|
|
|
zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1)
|
|
|
|
|
))),
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
|
|
|
|
|
|
|
|
|
|
List.of(
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
|
|
|
|
@ -14125,7 +14125,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
|
|
|
|
|
|
|
|
|
|
// The group epoch is bumped.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
|
|
|
|
|
),
|
|
|
|
|
timeout.result.records()
|
|
|
|
|
);
|
|
|
|
@ -14190,7 +14190,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
|
|
|
|
|
|
|
|
|
|
// The group epoch is bumped.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
|
|
|
|
|
),
|
|
|
|
|
timeout.result.records()
|
|
|
|
|
);
|
|
|
|
@ -14400,7 +14400,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
// Update subscription metadata.
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of())),
|
|
|
|
|
// Bump the group epoch.
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11))
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0))
|
|
|
|
|
);
|
|
|
|
|
assertUnorderedRecordsEquals(expectedRecords, leaveResult.records());
|
|
|
|
|
|
|
|
|
@ -14510,7 +14510,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2))
|
|
|
|
|
),
|
|
|
|
|
// Bump the group epoch.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
|
|
|
|
|
);
|
|
|
|
|
assertEquals(expectedRecords, leaveResult.records());
|
|
|
|
|
}
|
|
|
|
@ -14916,8 +14916,8 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
List<String> groupIds = List.of("group-id-1", "group-id-2");
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupIds.get(0), 100));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupIds.get(1), 15));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupIds.get(0), 100, 0));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupIds.get(1), 15, 0));
|
|
|
|
|
|
|
|
|
|
Uuid topicId = Uuid.randomUuid();
|
|
|
|
|
String topicName = "foo";
|
|
|
|
@ -15224,7 +15224,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
|
|
|
|
|
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
|
|
|
|
|
)),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 1, 0),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
|
|
|
|
|
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
|
|
|
|
|
mkTopicAssignment(barTopicId, 0, 1, 2)
|
|
|
|
@ -15346,7 +15346,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
|
|
|
|
|
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
|
|
|
|
|
)),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0)
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertRecordsEquals(expectedRecords, result.records());
|
|
|
|
@ -15371,7 +15371,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
Map.of()
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 100));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 100, 0));
|
|
|
|
|
|
|
|
|
|
Uuid fooTopicId = Uuid.randomUuid();
|
|
|
|
|
String fooTopicName = "foo";
|
|
|
|
@ -15451,7 +15451,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
.withShareGroup(new ShareGroupBuilder(groupId, 10))
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 10));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 10, 0));
|
|
|
|
|
|
|
|
|
|
assertEquals(ShareGroup.ShareGroupState.EMPTY, context.shareGroupState(groupId));
|
|
|
|
|
|
|
|
|
@ -15459,7 +15459,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
.setState(MemberState.STABLE)
|
|
|
|
|
.setSubscribedTopicNames(List.of(fooTopicName))
|
|
|
|
|
.build()));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0));
|
|
|
|
|
|
|
|
|
|
assertEquals(ShareGroup.ShareGroupState.STABLE, context.shareGroupState(groupId));
|
|
|
|
|
|
|
|
|
@ -15740,7 +15740,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, member));
|
|
|
|
|
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 100));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 100, 0));
|
|
|
|
|
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology));
|
|
|
|
|
|
|
|
|
@ -15927,7 +15927,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
|
|
|
|
|
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
|
|
|
|
|
)),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
|
|
|
|
|
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
|
|
|
|
|
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
|
|
|
|
@ -16010,7 +16010,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
|
|
|
|
|
)
|
|
|
|
|
),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
|
|
|
|
@ -16094,7 +16094,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
|
|
|
|
|
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
|
|
|
|
|
)),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
|
|
|
|
@ -16176,7 +16176,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
|
|
|
|
|
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
|
|
|
|
|
)),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
|
|
|
|
@ -16270,7 +16270,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
|
|
|
|
|
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
|
|
|
|
|
)),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
|
|
|
|
@ -16548,7 +16548,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
|
|
|
|
|
List<CoordinatorRecord> expectedRecords = List.of(
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
|
|
|
|
|
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
|
|
|
|
|
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
|
|
|
|
@ -16650,7 +16650,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
|
|
|
|
|
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, changedPartitionCount)
|
|
|
|
|
)),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
|
|
|
|
|
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
|
|
|
|
|
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
|
|
|
|
@ -16824,7 +16824,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId2),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId2),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId2),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11)
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0)
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertRecordsEquals(expectedRecords, result.records());
|
|
|
|
@ -17369,7 +17369,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId1)
|
|
|
|
|
.build()));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0));
|
|
|
|
|
|
|
|
|
|
assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId));
|
|
|
|
|
|
|
|
|
@ -17532,7 +17532,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
|
|
|
|
|
Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6))
|
|
|
|
|
),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
|
|
|
|
|
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
|
|
|
|
|
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
|
|
|
|
@ -17648,7 +17648,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
|
|
|
|
|
Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6))
|
|
|
|
|
),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
|
|
|
|
|
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
|
|
|
|
|
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
|
|
|
|
@ -17793,7 +17793,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 2)
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 2, 0)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
)),
|
|
|
|
@ -18088,7 +18088,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId1),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId1),
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 3)
|
|
|
|
|
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 3, 0)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
)),
|
|
|
|
@ -18539,7 +18539,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
|
|
|
|
|
// The group still exists but the member is already gone. Replaying the
|
|
|
|
|
// ConsumerGroupMemberMetadata tombstone should be a no-op.
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10, 0));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo", "m1"));
|
|
|
|
|
assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("m1", false));
|
|
|
|
|
|
|
|
|
@ -18555,7 +18555,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
// The group is created if it does not exist.
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10, 0));
|
|
|
|
|
assertEquals(10, context.groupMetadataManager.consumerGroup("foo").groupEpoch());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -18667,7 +18667,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
|
|
|
|
|
// The group still exists but the member is already gone. Replaying the
|
|
|
|
|
// ConsumerGroupCurrentMemberAssignment tombstone should be a no-op.
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10, 0));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo", "m1"));
|
|
|
|
|
assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("m1", false));
|
|
|
|
|
|
|
|
|
@ -18706,7 +18706,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
|
|
|
|
|
// The group still exists but the member is already gone. Replaying the
|
|
|
|
|
// StreamsGroupMemberMetadata tombstone should be a no-op.
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1"));
|
|
|
|
|
assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
|
|
|
|
|
|
|
|
|
@ -18762,7 +18762,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
// The group is created if it does not exist.
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
|
|
|
|
|
assertEquals(10, context.groupMetadataManager.streamsGroup("foo").groupEpoch());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -18993,7 +18993,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
|
|
|
|
|
// The group still exists, but the member is already gone. Replaying the
|
|
|
|
|
// StreamsGroupCurrentMemberAssignment tombstone should be a no-op.
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo", "m1"));
|
|
|
|
|
assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
|
|
|
|
|
|
|
|
|
@ -19069,7 +19069,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
|
|
|
|
|
// The group still exists, but the member is already gone. Replaying the
|
|
|
|
|
// StreamsGroupTopology tombstone should be a no-op.
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
|
|
|
|
|
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone("foo"));
|
|
|
|
|
assertTrue(context.groupMetadataManager.streamsGroup("foo").topology().isEmpty());
|
|
|
|
|
|
|
|
|
@ -19912,7 +19912,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
)
|
|
|
|
|
),
|
|
|
|
|
// The group epoch is bumped.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
|
|
|
|
|
))
|
|
|
|
|
)
|
|
|
|
|
),
|
|
|
|
@ -20028,7 +20028,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
)
|
|
|
|
|
),
|
|
|
|
|
// The group epoch is bumped.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
|
|
|
|
|
),
|
|
|
|
|
task.result.records()
|
|
|
|
|
);
|
|
|
|
@ -20090,7 +20090,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
// The member subscription is created.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
|
|
|
|
|
// The group epoch is bumped.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
|
|
|
|
|
// The target assignment is created.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Map.of()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
|
|
|
|
@ -20198,7 +20198,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
)
|
|
|
|
|
),
|
|
|
|
|
// The group epoch is bumped.
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
|
|
|
|
|
),
|
|
|
|
|
task.result.records()
|
|
|
|
|
);
|
|
|
|
@ -20340,7 +20340,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
foooTopicName, new TopicMetadata(foooTopicId, foooTopicName, 1)
|
|
|
|
|
)
|
|
|
|
|
)),
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11))
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0))
|
|
|
|
|
),
|
|
|
|
|
task.result.records()
|
|
|
|
|
);
|
|
|
|
@ -20477,7 +20477,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
context.time.milliseconds()
|
|
|
|
|
)
|
|
|
|
|
),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
|
|
|
|
|
))
|
|
|
|
|
)
|
|
|
|
|
),
|
|
|
|
@ -20567,7 +20567,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
|
|
|
|
|
)
|
|
|
|
|
),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, 0)
|
|
|
|
|
),
|
|
|
|
|
context.processTasks().get(0).result.records()
|
|
|
|
|
);
|
|
|
|
@ -20657,7 +20657,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
|
|
|
|
|
Map.of(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3))
|
|
|
|
|
),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assertRecordsEquals(expectedRecords, result.records());
|
|
|
|
@ -20677,7 +20677,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId2),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "bar*"),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()),
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12)
|
|
|
|
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, 0)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
)),
|
|
|
|
@ -20831,7 +20831,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
|
|
|
|
|
))),
|
|
|
|
|
// Bumped epoch.
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11))
|
|
|
|
|
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0))
|
|
|
|
|
),
|
|
|
|
|
result.records()
|
|
|
|
|
);
|
|
|
|
@ -20863,7 +20863,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
MetadataDelta delta = new MetadataDelta(image);
|
|
|
|
|
context.groupMetadataManager.onNewMetadataImage(image, delta);
|
|
|
|
|
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
|
|
|
|
|
|
|
|
|
|
context.replay(
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
|
|
|
|
@ -20930,7 +20930,7 @@ public class GroupMetadataManagerTest {
|
|
|
|
|
MetadataDelta delta = new MetadataDelta(image);
|
|
|
|
|
context.groupMetadataManager.onNewMetadataImage(image, delta);
|
|
|
|
|
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0));
|
|
|
|
|
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
|
|
|
|
|
|
|
|
|
|
context.replay(
|
|
|
|
|
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
|
|
|
|
|