KAFKA-17747: [1/N] Add MetadataHash field to Consumer/Share/StreamGroupMetadataValue (#19504)

* Add MetadataHash field to ConsumerGroupMetadataValue,
ShareGroupMetadataValue, and StreamGroupMetadataValue.
* Add metadataHash field to
GroupCoordinatorRecordHelpers#newConsumerGroupEpochRecord,
GroupCoordinatorRecordHelpers#newShareGroupEpochRecord, and
StreamsCoordinatorRecordHelpers#newStreamsGroupEpochRecord.
* Add deprecated message to ConsumerGroupPartitionMetadataKey and
ConsumerGroupPartitionMetadataValue.
* ShareGroupPartitionMetadataKey / ShareGroupPartitionMetadataValue /
StreamGroupPartitionMetadataKey / StreamGroupPartitionMetadataValue will
be removed in next PR.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, David Jacot <djacot@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
This commit is contained in:
PoAn Yang 2025-04-25 16:21:08 +08:00 committed by GitHub
parent 732ed0696b
commit 369cc569b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 160 additions and 110 deletions

View File

@ -176,18 +176,21 @@ public class GroupCoordinatorRecordHelpers {
*
* @param groupId The consumer group id.
* @param newGroupEpoch The consumer group epoch.
* @param metadataHash The consumer group metadata hash.
* @return The record.
*/
public static CoordinatorRecord newConsumerGroupEpochRecord(
String groupId,
int newGroupEpoch
int newGroupEpoch,
long metadataHash
) {
return CoordinatorRecord.record(
new ConsumerGroupMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
new ConsumerGroupMetadataValue()
.setEpoch(newGroupEpoch),
.setEpoch(newGroupEpoch)
.setMetadataHash(metadataHash),
(short) 0
)
);
@ -647,18 +650,21 @@ public class GroupCoordinatorRecordHelpers {
*
* @param groupId The group id.
* @param newGroupEpoch The group epoch.
* @param metadataHash The group metadata hash.
* @return The record.
*/
public static CoordinatorRecord newShareGroupEpochRecord(
String groupId,
int newGroupEpoch
int newGroupEpoch,
long metadataHash
) {
return CoordinatorRecord.record(
new ShareGroupMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
new ShareGroupMetadataValue()
.setEpoch(newGroupEpoch),
.setEpoch(newGroupEpoch)
.setMetadataHash(metadataHash),
(short) 0
)
);

View File

@ -2069,7 +2069,7 @@ public class GroupMetadataManager {
int groupEpoch = group.groupEpoch();
if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newStreamsGroupEpochRecord(groupId, groupEpoch));
records.add(newStreamsGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {}.", groupId, memberId, groupEpoch);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
@ -2697,7 +2697,7 @@ public class GroupMetadataManager {
if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newShareGroupEpochRecord(groupId, groupEpoch));
records.add(newShareGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
}
@ -3404,7 +3404,7 @@ public class GroupMetadataManager {
if (bumpGroupEpoch) {
int groupEpoch = group.groupEpoch() + 1;
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch));
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(
@ -3720,7 +3720,7 @@ public class GroupMetadataManager {
if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch));
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
}
@ -4123,7 +4123,7 @@ public class GroupMetadataManager {
// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
records.add(newConsumerGroupEpochRecord(group.groupId(), groupEpoch));
records.add(newConsumerGroupEpochRecord(group.groupId(), groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", group.groupId(), groupEpoch);
for (ConsumerGroupMember member : members) {
@ -4167,7 +4167,7 @@ public class GroupMetadataManager {
// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
records.add(newShareGroupEpochRecord(group.groupId(), groupEpoch));
records.add(newShareGroupEpochRecord(group.groupId(), groupEpoch, 0));
cancelGroupSessionTimeout(group.groupId(), member.memberId());
@ -4230,7 +4230,7 @@ public class GroupMetadataManager {
// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch));
records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch, 0));
cancelTimers(group.groupId(), member.memberId());

View File

@ -1210,7 +1210,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId(), consumerGroupMember))
);
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch()));
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch(), 0));
members().forEach((consumerGroupMemberId, consumerGroupMember) ->
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(

View File

@ -152,7 +152,8 @@ public class StreamsCoordinatorRecordHelpers {
public static CoordinatorRecord newStreamsGroupEpochRecord(
String groupId,
int newGroupEpoch
int newGroupEpoch,
long metadataHash
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
@ -161,7 +162,8 @@ public class StreamsCoordinatorRecordHelpers {
.setGroupId(groupId),
new ApiMessageAndVersion(
new StreamsGroupMetadataValue()
.setEpoch(newGroupEpoch),
.setEpoch(newGroupEpoch)
.setMetadataHash(metadataHash),
(short) 0
)
);

View File

@ -21,6 +21,13 @@
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "versions": "0+", "type": "int32",
"about": "The group epoch." }
"about": "The group epoch." },
// The MetadataHash is added in 4.1 (KIP-1101). It's used to track
// subscribed topics in the group. When subscribed topics change,
// like partition count or rack change, the hash will be different.
// It indicates that the group should be rebalanced.
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
"default": 0, "taggedVersions": "0+", "tag": 0,
"about": "The hash of all topics in the group." }
]
}

View File

@ -16,6 +16,8 @@
{
"apiKey": 4,
"type": "coordinator-key",
// This message is replaced by ConsumerGroupMetadataValue#MetadataHash
// in 4.1 (KIP-1101).
"name": "ConsumerGroupPartitionMetadataKey",
"validVersions": "0",
"flexibleVersions": "none",

View File

@ -16,6 +16,8 @@
{
"apiKey": 4,
"type": "coordinator-value",
// This message is replaced by ConsumerGroupMetadataValue#MetadataHash
// in 4.1 (KIP-1101).
"name": "ConsumerGroupPartitionMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -22,6 +22,8 @@
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "type": "int32", "versions": "0+",
"about": "The group epoch." }
"about": "The group epoch." },
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
"about": "The hash of all topics in the group." }
]
}

View File

@ -22,6 +22,8 @@
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "versions": "0+", "type": "int32",
"about": "The group epoch." }
"about": "The group epoch." },
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
"about": "The hash of all topics in the group." }
]
}

View File

@ -45,6 +45,7 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.modern.MemberState;
@ -84,6 +85,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -251,13 +253,15 @@ public class GroupCoordinatorRecordHelpersTest {
.setGroupId("group-id"),
new ApiMessageAndVersion(
new ConsumerGroupMetadataValue()
.setEpoch(10),
.setEpoch(10)
.setMetadataHash(10),
(short) 0
)
);
assertEquals(expectedRecord, newConsumerGroupEpochRecord(
"group-id",
10,
10
));
}
@ -855,6 +859,26 @@ public class GroupCoordinatorRecordHelpersTest {
assertEquals(expectedRecord, record);
}
@Test
public void testNewShareGroupEpochRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new ShareGroupMetadataKey()
.setGroupId("group-id"),
new ApiMessageAndVersion(
new ShareGroupMetadataValue()
.setEpoch(10)
.setMetadataHash(10),
(short) 0
)
);
assertEquals(expectedRecord, newShareGroupEpochRecord(
"group-id",
10,
10
));
}
/**
* Creates a map of partitions to racks for testing.
*

View File

@ -489,7 +489,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100, 0));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3)
@ -623,7 +623,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
@ -721,7 +721,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
@ -839,7 +839,7 @@ public class GroupMetadataManagerTest {
assertUnorderedRecordsEquals(
List.of(
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1),
@ -947,7 +947,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
);
assertRecordsEquals(expectedRecords, result.records());
@ -1064,7 +1064,7 @@ public class GroupMetadataManagerTest {
assertUnorderedRecordsEquals(
List.of(
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1),
@ -1434,7 +1434,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, member2RejoinId, mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
@ -1627,7 +1627,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
);
assertRecordsEquals(expectedRecords, result.records());
@ -1805,7 +1805,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100, 0));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3)
@ -2592,7 +2592,7 @@ public class GroupMetadataManagerTest {
.setState(MemberState.STABLE)
.setSubscribedTopicNames(List.of(fooTopicName))
.build()));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0));
assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, context.consumerGroupState(groupId));
@ -2749,7 +2749,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6))
),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)),
@ -2873,7 +2873,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6))
),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)),
@ -3219,7 +3219,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
)
)
)),
@ -3280,7 +3280,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
)
)
)
@ -3342,7 +3342,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, Map.of()),
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 2)
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 2, 0)
)
)
)),
@ -3401,7 +3401,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, Map.of()),
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11)
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0)
)
)
)
@ -3482,7 +3482,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
)
)
)),
@ -3758,7 +3758,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0)
)
)
)),
@ -8941,13 +8941,13 @@ public class GroupMetadataManagerTest {
.setProtocol("range")
.setCurrentStateTimestamp(context.time.milliseconds())));
// Create one share group record.
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(shareGroupId, 6));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(shareGroupId, 6, 0));
context.commit();
ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false);
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1)
.setSubscribedTopicNames(List.of(fooTopicName))
.build()));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, 11));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, 11, 0));
// Test list group response without a group state or group type filter.
Map<String, ListGroupsResponseData.ListedGroup> actualAllGroupMap =
@ -9174,7 +9174,7 @@ public class GroupMetadataManagerTest {
ConsumerGroupMember.Builder memberBuilder1 = new ConsumerGroupMember.Builder(memberId1)
.setSubscribedTopicNames(List.of(topicName));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(consumerGroupId, memberBuilder1.build()));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, epoch + 1));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, epoch + 1, 0));
Map<Uuid, Set<Integer>> assignmentMap = Map.of(topicId, Set.of());
@ -9182,7 +9182,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(consumerGroupId, memberBuilder2.build()));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(consumerGroupId, memberId2, assignmentMap));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(consumerGroupId, memberBuilder2.build()));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, epoch + 2));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, epoch + 2, 0));
List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.consumerGroupDescribe(List.of(consumerGroupId), context.lastCommittedOffset);
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup()
@ -9279,7 +9279,7 @@ public class GroupMetadataManagerTest {
StreamsGroupMember.Builder memberBuilder1 = streamsGroupMemberBuilderWithDefaults(memberId1);
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1, 0));
TasksTuple assignment = new TasksTuple(
Map.of(subtopologyId, Set.of(0, 1)),
@ -9291,7 +9291,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder2.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId, memberId2, assignment));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder2.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 2));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 2, 0));
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(List.of(streamsGroupId), context.lastCommittedOffset);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
@ -10169,7 +10169,7 @@ public class GroupMetadataManagerTest {
List.of(
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(classicGroupId, expectedMember),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(classicGroupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(classicGroupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(classicGroupId, memberId, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(classicGroupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(classicGroupId, expectedMember)
@ -10333,7 +10333,7 @@ public class GroupMetadataManagerTest {
// Create the new consumer group with member 1.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1),
@ -10348,7 +10348,7 @@ public class GroupMetadataManagerTest {
)),
// Newly joining member 2 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, assignor.targetPartitions(memberId2)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
@ -10548,7 +10548,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()),
@ -10567,7 +10567,7 @@ public class GroupMetadataManagerTest {
)),
// Newly joining member 3 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId3, assignor.targetPartitions(memberId3)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
@ -10824,7 +10824,7 @@ public class GroupMetadataManagerTest {
// Create the new consumer group with the static member.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedClassicMember),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, expectedClassicMember.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedClassicMember),
@ -10846,7 +10846,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))),
// Newly joining static member bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
@ -10950,7 +10950,7 @@ public class GroupMetadataManagerTest {
// Create the new consumer group with member 1.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1),
@ -10965,7 +10965,7 @@ public class GroupMetadataManagerTest {
)),
// Newly joining member 2 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 2),
@ -11335,7 +11335,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()),
@ -11354,7 +11354,7 @@ public class GroupMetadataManagerTest {
)),
// Newly joining member 3 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId3, assignor.targetPartitions(memberId3)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 2),
@ -12432,7 +12432,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
))),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)),
@ -12593,7 +12593,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11),
@ -12849,7 +12849,7 @@ public class GroupMetadataManagerTest {
assertUnorderedRecordsEquals(
List.of(
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0),
@ -13080,7 +13080,7 @@ public class GroupMetadataManagerTest {
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1),
zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1)
))),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
@ -13315,7 +13315,7 @@ public class GroupMetadataManagerTest {
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1),
zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1)
))),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
@ -14125,7 +14125,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
// The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
),
timeout.result.records()
);
@ -14190,7 +14190,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
// The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
),
timeout.result.records()
);
@ -14400,7 +14400,7 @@ public class GroupMetadataManagerTest {
// Update subscription metadata.
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of())),
// Bump the group epoch.
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11))
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0))
);
assertUnorderedRecordsEquals(expectedRecords, leaveResult.records());
@ -14510,7 +14510,7 @@ public class GroupMetadataManagerTest {
Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2))
),
// Bump the group epoch.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
);
assertEquals(expectedRecords, leaveResult.records());
}
@ -14916,8 +14916,8 @@ public class GroupMetadataManagerTest {
));
List<String> groupIds = List.of("group-id-1", "group-id-2");
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupIds.get(0), 100));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupIds.get(1), 15));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupIds.get(0), 100, 0));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupIds.get(1), 15, 0));
Uuid topicId = Uuid.randomUuid();
String topicName = "foo";
@ -15224,7 +15224,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)),
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
@ -15346,7 +15346,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)),
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11)
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0)
);
assertRecordsEquals(expectedRecords, result.records());
@ -15371,7 +15371,7 @@ public class GroupMetadataManagerTest {
Map.of()
));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 100));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 100, 0));
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
@ -15451,7 +15451,7 @@ public class GroupMetadataManagerTest {
.withShareGroup(new ShareGroupBuilder(groupId, 10))
.build();
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 10));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 10, 0));
assertEquals(ShareGroup.ShareGroupState.EMPTY, context.shareGroupState(groupId));
@ -15459,7 +15459,7 @@ public class GroupMetadataManagerTest {
.setState(MemberState.STABLE)
.setSubscribedTopicNames(List.of(fooTopicName))
.build()));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0));
assertEquals(ShareGroup.ShareGroupState.STABLE, context.shareGroupState(groupId));
@ -15740,7 +15740,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, member));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 100));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 100, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology));
@ -15927,7 +15927,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
)),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
@ -16010,7 +16010,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
)
),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
@ -16094,7 +16094,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
)),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
@ -16176,7 +16176,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
)),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
@ -16270,7 +16270,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
)),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
@ -16548,7 +16548,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
@ -16650,7 +16650,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, changedPartitionCount)
)),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
@ -16824,7 +16824,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId2),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId2),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId2),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11)
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0)
);
assertRecordsEquals(expectedRecords, result.records());
@ -17369,7 +17369,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId1)
.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0));
assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId));
@ -17532,7 +17532,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6))
),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
@ -17648,7 +17648,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6))
),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
@ -17793,7 +17793,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 2)
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 2, 0)
)
)
)),
@ -18088,7 +18088,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 3)
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 3, 0)
)
)
)),
@ -18539,7 +18539,7 @@ public class GroupMetadataManagerTest {
// The group still exists but the member is already gone. Replaying the
// ConsumerGroupMemberMetadata tombstone should be a no-op.
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10, 0));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo", "m1"));
assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("m1", false));
@ -18555,7 +18555,7 @@ public class GroupMetadataManagerTest {
.build();
// The group is created if it does not exist.
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10, 0));
assertEquals(10, context.groupMetadataManager.consumerGroup("foo").groupEpoch());
}
@ -18667,7 +18667,7 @@ public class GroupMetadataManagerTest {
// The group still exists but the member is already gone. Replaying the
// ConsumerGroupCurrentMemberAssignment tombstone should be a no-op.
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10, 0));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo", "m1"));
assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("m1", false));
@ -18706,7 +18706,7 @@ public class GroupMetadataManagerTest {
// The group still exists but the member is already gone. Replaying the
// StreamsGroupMemberMetadata tombstone should be a no-op.
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1"));
assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
@ -18762,7 +18762,7 @@ public class GroupMetadataManagerTest {
.build();
// The group is created if it does not exist.
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
assertEquals(10, context.groupMetadataManager.streamsGroup("foo").groupEpoch());
}
@ -18993,7 +18993,7 @@ public class GroupMetadataManagerTest {
// The group still exists, but the member is already gone. Replaying the
// StreamsGroupCurrentMemberAssignment tombstone should be a no-op.
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo", "m1"));
assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
@ -19069,7 +19069,7 @@ public class GroupMetadataManagerTest {
// The group still exists, but the member is already gone. Replaying the
// StreamsGroupTopology tombstone should be a no-op.
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone("foo"));
assertTrue(context.groupMetadataManager.streamsGroup("foo").topology().isEmpty());
@ -19912,7 +19912,7 @@ public class GroupMetadataManagerTest {
)
),
// The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
))
)
),
@ -20028,7 +20028,7 @@ public class GroupMetadataManagerTest {
)
),
// The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
),
task.result.records()
);
@ -20090,7 +20090,7 @@ public class GroupMetadataManagerTest {
// The member subscription is created.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
// The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
// The target assignment is created.
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
@ -20198,7 +20198,7 @@ public class GroupMetadataManagerTest {
)
),
// The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
),
task.result.records()
);
@ -20340,7 +20340,7 @@ public class GroupMetadataManagerTest {
foooTopicName, new TopicMetadata(foooTopicId, foooTopicName, 1)
)
)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11))
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0))
),
task.result.records()
);
@ -20477,7 +20477,7 @@ public class GroupMetadataManagerTest {
context.time.milliseconds()
)
),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
))
)
),
@ -20567,7 +20567,7 @@ public class GroupMetadataManagerTest {
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)
),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, 0)
),
context.processTasks().get(0).result.records()
);
@ -20657,7 +20657,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3))
),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
);
assertRecordsEquals(expectedRecords, result.records());
@ -20677,7 +20677,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId2),
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "bar*"),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12)
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, 0)
)
)
)),
@ -20831,7 +20831,7 @@ public class GroupMetadataManagerTest {
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
))),
// Bumped epoch.
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11))
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0))
),
result.records()
);
@ -20863,7 +20863,7 @@ public class GroupMetadataManagerTest {
MetadataDelta delta = new MetadataDelta(image);
context.groupMetadataManager.onNewMetadataImage(image, delta);
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
context.replay(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
@ -20930,7 +20930,7 @@ public class GroupMetadataManagerTest {
MetadataDelta delta = new MetadataDelta(image);
context.groupMetadataManager.onNewMetadataImage(image, delta);
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
context.replay(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(

View File

@ -108,7 +108,7 @@ public class ConsumerGroupBuilder {
}
// Add group epoch record.
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, groupEpoch));
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
// Add target assignment records.
assignments.forEach((memberId, assignment) ->

View File

@ -94,7 +94,7 @@ public class ShareGroupBuilder {
}
// Add group epoch record.
records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, groupEpoch));
records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, groupEpoch, 0));
// Add target assignment records.
assignments.forEach((memberId, assignment) ->

View File

@ -300,12 +300,13 @@ class StreamsCoordinatorRecordHelpersTest {
.setGroupId(GROUP_ID),
new ApiMessageAndVersion(
new StreamsGroupMetadataValue()
.setEpoch(42),
.setEpoch(42)
.setMetadataHash(42),
(short) 0
)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(GROUP_ID, 42));
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(GROUP_ID, 42, 42));
}
@Test
@ -740,7 +741,7 @@ class StreamsCoordinatorRecordHelpersTest {
@Test
public void testNewStreamsGroupEpochRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(null, 1));
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(null, 1, 1));
assertEquals("groupId should not be null here", exception.getMessage());
}

View File

@ -85,7 +85,7 @@ public class StreamsGroupBuilder {
// Add group epoch record.
records.add(
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch));
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch, 0));
// Add target assignment records.
targetAssignments.forEach((memberId, assignment) ->

View File

@ -60,7 +60,8 @@ public class ShareGroupMessageFormatterTest extends CoordinatorRecordMessageForm
private static final ShareGroupMetadataKey SHARE_GROUP_METADATA_KEY = new ShareGroupMetadataKey()
.setGroupId("group-id");
private static final ShareGroupMetadataValue SHARE_GROUP_METADATA_VALUE = new ShareGroupMetadataValue()
.setEpoch(1);
.setEpoch(1)
.setMetadataHash(1);
private static final ShareGroupTargetAssignmentMetadataKey SHARE_GROUP_TARGET_ASSIGNMENT_METADATA_KEY = new ShareGroupTargetAssignmentMetadataKey()
.setGroupId("group-id");
private static final ShareGroupTargetAssignmentMetadataValue SHARE_GROUP_TARGET_ASSIGNMENT_METADATA_VALUE = new ShareGroupTargetAssignmentMetadataValue()
@ -154,7 +155,8 @@ public class ShareGroupMessageFormatterTest extends CoordinatorRecordMessageForm
"""
{"key":{"type":11,"data":{"groupId":"group-id"}},
"value":{"version":0,
"data":{"epoch":1}}}
"data":{"epoch":1,
"metadataHash":1}}}
"""
),
Arguments.of(