KAFKA-17747: [1/N] Add MetadataHash field to Consumer/Share/StreamGroupMetadataValue (#19504)

* Add MetadataHash field to ConsumerGroupMetadataValue,
ShareGroupMetadataValue, and StreamGroupMetadataValue.
* Add metadataHash field to
GroupCoordinatorRecordHelpers#newConsumerGroupEpochRecord,
GroupCoordinatorRecordHelpers#newShareGroupEpochRecord, and
StreamsCoordinatorRecordHelpers#newStreamsGroupEpochRecord.
* Add deprecated message to ConsumerGroupPartitionMetadataKey and
ConsumerGroupPartitionMetadataValue.
* ShareGroupPartitionMetadataKey / ShareGroupPartitionMetadataValue /
StreamGroupPartitionMetadataKey / StreamGroupPartitionMetadataValue will
be removed in next PR.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, David Jacot <djacot@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
This commit is contained in:
PoAn Yang 2025-04-25 16:21:08 +08:00 committed by GitHub
parent 732ed0696b
commit 369cc569b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 160 additions and 110 deletions

View File

@ -176,18 +176,21 @@ public class GroupCoordinatorRecordHelpers {
* *
* @param groupId The consumer group id. * @param groupId The consumer group id.
* @param newGroupEpoch The consumer group epoch. * @param newGroupEpoch The consumer group epoch.
* @param metadataHash The consumer group metadata hash.
* @return The record. * @return The record.
*/ */
public static CoordinatorRecord newConsumerGroupEpochRecord( public static CoordinatorRecord newConsumerGroupEpochRecord(
String groupId, String groupId,
int newGroupEpoch int newGroupEpoch,
long metadataHash
) { ) {
return CoordinatorRecord.record( return CoordinatorRecord.record(
new ConsumerGroupMetadataKey() new ConsumerGroupMetadataKey()
.setGroupId(groupId), .setGroupId(groupId),
new ApiMessageAndVersion( new ApiMessageAndVersion(
new ConsumerGroupMetadataValue() new ConsumerGroupMetadataValue()
.setEpoch(newGroupEpoch), .setEpoch(newGroupEpoch)
.setMetadataHash(metadataHash),
(short) 0 (short) 0
) )
); );
@ -647,18 +650,21 @@ public class GroupCoordinatorRecordHelpers {
* *
* @param groupId The group id. * @param groupId The group id.
* @param newGroupEpoch The group epoch. * @param newGroupEpoch The group epoch.
* @param metadataHash The group metadata hash.
* @return The record. * @return The record.
*/ */
public static CoordinatorRecord newShareGroupEpochRecord( public static CoordinatorRecord newShareGroupEpochRecord(
String groupId, String groupId,
int newGroupEpoch int newGroupEpoch,
long metadataHash
) { ) {
return CoordinatorRecord.record( return CoordinatorRecord.record(
new ShareGroupMetadataKey() new ShareGroupMetadataKey()
.setGroupId(groupId), .setGroupId(groupId),
new ApiMessageAndVersion( new ApiMessageAndVersion(
new ShareGroupMetadataValue() new ShareGroupMetadataValue()
.setEpoch(newGroupEpoch), .setEpoch(newGroupEpoch)
.setMetadataHash(metadataHash),
(short) 0 (short) 0
) )
); );

View File

@ -2069,7 +2069,7 @@ public class GroupMetadataManager {
int groupEpoch = group.groupEpoch(); int groupEpoch = group.groupEpoch();
if (bumpGroupEpoch) { if (bumpGroupEpoch) {
groupEpoch += 1; groupEpoch += 1;
records.add(newStreamsGroupEpochRecord(groupId, groupEpoch)); records.add(newStreamsGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {}.", groupId, memberId, groupEpoch); log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {}.", groupId, memberId, groupEpoch);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME); metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch); group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
@ -2697,7 +2697,7 @@ public class GroupMetadataManager {
if (bumpGroupEpoch) { if (bumpGroupEpoch) {
groupEpoch += 1; groupEpoch += 1;
records.add(newShareGroupEpochRecord(groupId, groupEpoch)); records.add(newShareGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
} }
@ -3404,7 +3404,7 @@ public class GroupMetadataManager {
if (bumpGroupEpoch) { if (bumpGroupEpoch) {
int groupEpoch = group.groupEpoch() + 1; int groupEpoch = group.groupEpoch() + 1;
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch)); records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline( group.setMetadataRefreshDeadline(
@ -3720,7 +3720,7 @@ public class GroupMetadataManager {
if (bumpGroupEpoch) { if (bumpGroupEpoch) {
groupEpoch += 1; groupEpoch += 1;
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch)); records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
} }
@ -4123,7 +4123,7 @@ public class GroupMetadataManager {
// We bump the group epoch. // We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1; int groupEpoch = group.groupEpoch() + 1;
records.add(newConsumerGroupEpochRecord(group.groupId(), groupEpoch)); records.add(newConsumerGroupEpochRecord(group.groupId(), groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", group.groupId(), groupEpoch); log.info("[GroupId {}] Bumped group epoch to {}.", group.groupId(), groupEpoch);
for (ConsumerGroupMember member : members) { for (ConsumerGroupMember member : members) {
@ -4167,7 +4167,7 @@ public class GroupMetadataManager {
// We bump the group epoch. // We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1; int groupEpoch = group.groupEpoch() + 1;
records.add(newShareGroupEpochRecord(group.groupId(), groupEpoch)); records.add(newShareGroupEpochRecord(group.groupId(), groupEpoch, 0));
cancelGroupSessionTimeout(group.groupId(), member.memberId()); cancelGroupSessionTimeout(group.groupId(), member.memberId());
@ -4230,7 +4230,7 @@ public class GroupMetadataManager {
// We bump the group epoch. // We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1; int groupEpoch = group.groupEpoch() + 1;
records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch)); records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch, 0));
cancelTimers(group.groupId(), member.memberId()); cancelTimers(group.groupId(), member.memberId());

View File

@ -1210,7 +1210,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId(), consumerGroupMember)) records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId(), consumerGroupMember))
); );
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch())); records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch(), 0));
members().forEach((consumerGroupMemberId, consumerGroupMember) -> members().forEach((consumerGroupMemberId, consumerGroupMember) ->
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord( records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(

View File

@ -152,7 +152,8 @@ public class StreamsCoordinatorRecordHelpers {
public static CoordinatorRecord newStreamsGroupEpochRecord( public static CoordinatorRecord newStreamsGroupEpochRecord(
String groupId, String groupId,
int newGroupEpoch int newGroupEpoch,
long metadataHash
) { ) {
Objects.requireNonNull(groupId, "groupId should not be null here"); Objects.requireNonNull(groupId, "groupId should not be null here");
@ -161,7 +162,8 @@ public class StreamsCoordinatorRecordHelpers {
.setGroupId(groupId), .setGroupId(groupId),
new ApiMessageAndVersion( new ApiMessageAndVersion(
new StreamsGroupMetadataValue() new StreamsGroupMetadataValue()
.setEpoch(newGroupEpoch), .setEpoch(newGroupEpoch)
.setMetadataHash(metadataHash),
(short) 0 (short) 0
) )
); );

View File

@ -21,6 +21,13 @@
"flexibleVersions": "0+", "flexibleVersions": "0+",
"fields": [ "fields": [
{ "name": "Epoch", "versions": "0+", "type": "int32", { "name": "Epoch", "versions": "0+", "type": "int32",
"about": "The group epoch." } "about": "The group epoch." },
// The MetadataHash is added in 4.1 (KIP-1101). It's used to track
// subscribed topics in the group. When subscribed topics change,
// like partition count or rack change, the hash will be different.
// It indicates that the group should be rebalanced.
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
"default": 0, "taggedVersions": "0+", "tag": 0,
"about": "The hash of all topics in the group." }
] ]
} }

View File

@ -16,6 +16,8 @@
{ {
"apiKey": 4, "apiKey": 4,
"type": "coordinator-key", "type": "coordinator-key",
// This message is replaced by ConsumerGroupMetadataValue#MetadataHash
// in 4.1 (KIP-1101).
"name": "ConsumerGroupPartitionMetadataKey", "name": "ConsumerGroupPartitionMetadataKey",
"validVersions": "0", "validVersions": "0",
"flexibleVersions": "none", "flexibleVersions": "none",

View File

@ -16,6 +16,8 @@
{ {
"apiKey": 4, "apiKey": 4,
"type": "coordinator-value", "type": "coordinator-value",
// This message is replaced by ConsumerGroupMetadataValue#MetadataHash
// in 4.1 (KIP-1101).
"name": "ConsumerGroupPartitionMetadataValue", "name": "ConsumerGroupPartitionMetadataValue",
"validVersions": "0", "validVersions": "0",
"flexibleVersions": "0+", "flexibleVersions": "0+",

View File

@ -22,6 +22,8 @@
"flexibleVersions": "0+", "flexibleVersions": "0+",
"fields": [ "fields": [
{ "name": "Epoch", "type": "int32", "versions": "0+", { "name": "Epoch", "type": "int32", "versions": "0+",
"about": "The group epoch." } "about": "The group epoch." },
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
"about": "The hash of all topics in the group." }
] ]
} }

View File

@ -22,6 +22,8 @@
"flexibleVersions": "0+", "flexibleVersions": "0+",
"fields": [ "fields": [
{ "name": "Epoch", "versions": "0+", "type": "int32", { "name": "Epoch", "versions": "0+", "type": "int32",
"about": "The group epoch." } "about": "The group epoch." },
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
"about": "The hash of all topics in the group." }
] ]
} }

View File

@ -45,6 +45,7 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.MemberState;
@ -84,6 +85,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
@ -251,13 +253,15 @@ public class GroupCoordinatorRecordHelpersTest {
.setGroupId("group-id"), .setGroupId("group-id"),
new ApiMessageAndVersion( new ApiMessageAndVersion(
new ConsumerGroupMetadataValue() new ConsumerGroupMetadataValue()
.setEpoch(10), .setEpoch(10)
.setMetadataHash(10),
(short) 0 (short) 0
) )
); );
assertEquals(expectedRecord, newConsumerGroupEpochRecord( assertEquals(expectedRecord, newConsumerGroupEpochRecord(
"group-id", "group-id",
10,
10 10
)); ));
} }
@ -855,6 +859,26 @@ public class GroupCoordinatorRecordHelpersTest {
assertEquals(expectedRecord, record); assertEquals(expectedRecord, record);
} }
@Test
public void testNewShareGroupEpochRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new ShareGroupMetadataKey()
.setGroupId("group-id"),
new ApiMessageAndVersion(
new ShareGroupMetadataValue()
.setEpoch(10)
.setMetadataHash(10),
(short) 0
)
);
assertEquals(expectedRecord, newShareGroupEpochRecord(
"group-id",
10,
10
));
}
/** /**
* Creates a map of partitions to racks for testing. * Creates a map of partitions to racks for testing.
* *

View File

@ -489,7 +489,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member)); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100)); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100, 0));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3) mkTopicAssignment(fooTopicId, 1, 2, 3)
@ -623,7 +623,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)), )),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2) mkTopicAssignment(barTopicId, 0, 1, 2)
@ -721,7 +721,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)), )),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2) mkTopicAssignment(barTopicId, 0, 1, 2)
@ -839,7 +839,7 @@ public class GroupMetadataManagerTest {
assertUnorderedRecordsEquals( assertUnorderedRecordsEquals(
List.of( List.of(
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
List.of( List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1), mkTopicAssignment(fooTopicId, 0, 1),
@ -947,7 +947,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)), )),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
); );
assertRecordsEquals(expectedRecords, result.records()); assertRecordsEquals(expectedRecords, result.records());
@ -1064,7 +1064,7 @@ public class GroupMetadataManagerTest {
assertUnorderedRecordsEquals( assertUnorderedRecordsEquals(
List.of( List.of(
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
List.of( List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1), mkTopicAssignment(fooTopicId, 0, 1),
@ -1434,7 +1434,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)), )),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, member2RejoinId, mkAssignment( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, member2RejoinId, mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2) mkTopicAssignment(barTopicId, 0, 1, 2)
@ -1627,7 +1627,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)), )),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
); );
assertRecordsEquals(expectedRecords, result.records()); assertRecordsEquals(expectedRecords, result.records());
@ -1805,7 +1805,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member)); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100)); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100, 0));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3) mkTopicAssignment(fooTopicId, 1, 2, 3)
@ -2592,7 +2592,7 @@ public class GroupMetadataManagerTest {
.setState(MemberState.STABLE) .setState(MemberState.STABLE)
.setSubscribedTopicNames(List.of(fooTopicName)) .setSubscribedTopicNames(List.of(fooTopicName))
.build())); .build()));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0));
assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, context.consumerGroupState(groupId)); assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, context.consumerGroupState(groupId));
@ -2749,7 +2749,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)) Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6))
), ),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)), )),
@ -2873,7 +2873,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)) Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6))
), ),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)), )),
@ -3219,7 +3219,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()), GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
) )
) )
)), )),
@ -3280,7 +3280,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()), GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
) )
) )
) )
@ -3342,7 +3342,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, Map.of()), GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, Map.of()),
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 2) GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 2, 0)
) )
) )
)), )),
@ -3401,7 +3401,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, Map.of()), GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, Map.of()),
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11) GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0)
) )
) )
) )
@ -3482,7 +3482,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()), GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
) )
) )
)), )),
@ -3758,7 +3758,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0)
) )
) )
)), )),
@ -8941,13 +8941,13 @@ public class GroupMetadataManagerTest {
.setProtocol("range") .setProtocol("range")
.setCurrentStateTimestamp(context.time.milliseconds()))); .setCurrentStateTimestamp(context.time.milliseconds())));
// Create one share group record. // Create one share group record.
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(shareGroupId, 6)); context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(shareGroupId, 6, 0));
context.commit(); context.commit();
ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false); ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false);
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1) context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1)
.setSubscribedTopicNames(List.of(fooTopicName)) .setSubscribedTopicNames(List.of(fooTopicName))
.build())); .build()));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, 11)); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, 11, 0));
// Test list group response without a group state or group type filter. // Test list group response without a group state or group type filter.
Map<String, ListGroupsResponseData.ListedGroup> actualAllGroupMap = Map<String, ListGroupsResponseData.ListedGroup> actualAllGroupMap =
@ -9174,7 +9174,7 @@ public class GroupMetadataManagerTest {
ConsumerGroupMember.Builder memberBuilder1 = new ConsumerGroupMember.Builder(memberId1) ConsumerGroupMember.Builder memberBuilder1 = new ConsumerGroupMember.Builder(memberId1)
.setSubscribedTopicNames(List.of(topicName)); .setSubscribedTopicNames(List.of(topicName));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(consumerGroupId, memberBuilder1.build())); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(consumerGroupId, memberBuilder1.build()));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, epoch + 1)); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, epoch + 1, 0));
Map<Uuid, Set<Integer>> assignmentMap = Map.of(topicId, Set.of()); Map<Uuid, Set<Integer>> assignmentMap = Map.of(topicId, Set.of());
@ -9182,7 +9182,7 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(consumerGroupId, memberBuilder2.build())); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(consumerGroupId, memberBuilder2.build()));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(consumerGroupId, memberId2, assignmentMap)); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(consumerGroupId, memberId2, assignmentMap));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(consumerGroupId, memberBuilder2.build())); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(consumerGroupId, memberBuilder2.build()));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, epoch + 2)); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId, epoch + 2, 0));
List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.consumerGroupDescribe(List.of(consumerGroupId), context.lastCommittedOffset); List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.consumerGroupDescribe(List.of(consumerGroupId), context.lastCommittedOffset);
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup() ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup()
@ -9279,7 +9279,7 @@ public class GroupMetadataManagerTest {
StreamsGroupMember.Builder memberBuilder1 = streamsGroupMemberBuilderWithDefaults(memberId1); StreamsGroupMember.Builder memberBuilder1 = streamsGroupMemberBuilderWithDefaults(memberId1);
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build())); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build())); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1, 0));
TasksTuple assignment = new TasksTuple( TasksTuple assignment = new TasksTuple(
Map.of(subtopologyId, Set.of(0, 1)), Map.of(subtopologyId, Set.of(0, 1)),
@ -9291,7 +9291,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder2.build())); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder2.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId, memberId2, assignment)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId, memberId2, assignment));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder2.build())); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder2.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 2)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 2, 0));
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(List.of(streamsGroupId), context.lastCommittedOffset); List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(List.of(streamsGroupId), context.lastCommittedOffset);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup() StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
@ -10169,7 +10169,7 @@ public class GroupMetadataManagerTest {
List.of( List.of(
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId), GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(classicGroupId, expectedMember), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(classicGroupId, expectedMember),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(classicGroupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(classicGroupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(classicGroupId, memberId, Map.of()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(classicGroupId, memberId, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(classicGroupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(classicGroupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(classicGroupId, expectedMember) GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(classicGroupId, expectedMember)
@ -10333,7 +10333,7 @@ public class GroupMetadataManagerTest {
// Create the new consumer group with member 1. // Create the new consumer group with member 1.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1),
@ -10348,7 +10348,7 @@ public class GroupMetadataManagerTest {
)), )),
// Newly joining member 2 bumps the group epoch. A new target assignment is computed. // Newly joining member 2 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, assignor.targetPartitions(memberId2)), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, assignor.targetPartitions(memberId2)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
@ -10548,7 +10548,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()),
@ -10567,7 +10567,7 @@ public class GroupMetadataManagerTest {
)), )),
// Newly joining member 3 bumps the group epoch. A new target assignment is computed. // Newly joining member 3 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId3, assignor.targetPartitions(memberId3)), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId3, assignor.targetPartitions(memberId3)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
@ -10824,7 +10824,7 @@ public class GroupMetadataManagerTest {
// Create the new consumer group with the static member. // Create the new consumer group with the static member.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedClassicMember), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedClassicMember),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, expectedClassicMember.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, expectedClassicMember.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedClassicMember), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedClassicMember),
@ -10846,7 +10846,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))), GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))),
// Newly joining static member bumps the group epoch. A new target assignment is computed. // Newly joining static member bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
@ -10950,7 +10950,7 @@ public class GroupMetadataManagerTest {
// Create the new consumer group with member 1. // Create the new consumer group with member 1.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1),
@ -10965,7 +10965,7 @@ public class GroupMetadataManagerTest {
)), )),
// Newly joining member 2 bumps the group epoch. A new target assignment is computed. // Newly joining member 2 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Map.of()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 2), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 2),
@ -11335,7 +11335,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()),
@ -11354,7 +11354,7 @@ public class GroupMetadataManagerTest {
)), )),
// Newly joining member 3 bumps the group epoch. A new target assignment is computed. // Newly joining member 3 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId3, assignor.targetPartitions(memberId3)), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId3, assignor.targetPartitions(memberId3)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 2), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 2),
@ -12432,7 +12432,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
))), ))),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
List.of( List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)),
@ -12593,7 +12593,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)), )),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, Map.of()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11),
@ -12849,7 +12849,7 @@ public class GroupMetadataManagerTest {
assertUnorderedRecordsEquals( assertUnorderedRecordsEquals(
List.of( List.of(
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember)),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
List.of( List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0), mkTopicAssignment(fooTopicId, 0),
@ -13080,7 +13080,7 @@ public class GroupMetadataManagerTest {
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1), barTopicName, new TopicMetadata(barTopicId, barTopicName, 1),
zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1) zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1)
))), ))),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
List.of( List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
@ -13315,7 +13315,7 @@ public class GroupMetadataManagerTest {
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1), barTopicName, new TopicMetadata(barTopicId, barTopicName, 1),
zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1) zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1)
))), ))),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)),
List.of( List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
@ -14125,7 +14125,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
// The group epoch is bumped. // The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
), ),
timeout.result.records() timeout.result.records()
); );
@ -14190,7 +14190,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
// The group epoch is bumped. // The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
), ),
timeout.result.records() timeout.result.records()
); );
@ -14400,7 +14400,7 @@ public class GroupMetadataManagerTest {
// Update subscription metadata. // Update subscription metadata.
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of())), List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of())),
// Bump the group epoch. // Bump the group epoch.
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)) List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0))
); );
assertUnorderedRecordsEquals(expectedRecords, leaveResult.records()); assertUnorderedRecordsEquals(expectedRecords, leaveResult.records());
@ -14510,7 +14510,7 @@ public class GroupMetadataManagerTest {
Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)) Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2))
), ),
// Bump the group epoch. // Bump the group epoch.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
); );
assertEquals(expectedRecords, leaveResult.records()); assertEquals(expectedRecords, leaveResult.records());
} }
@ -14916,8 +14916,8 @@ public class GroupMetadataManagerTest {
)); ));
List<String> groupIds = List.of("group-id-1", "group-id-2"); List<String> groupIds = List.of("group-id-1", "group-id-2");
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupIds.get(0), 100)); context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupIds.get(0), 100, 0));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupIds.get(1), 15)); context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupIds.get(1), 15, 0));
Uuid topicId = Uuid.randomUuid(); Uuid topicId = Uuid.randomUuid();
String topicName = "foo"; String topicName = "foo";
@ -15224,7 +15224,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)), )),
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2) mkTopicAssignment(barTopicId, 0, 1, 2)
@ -15346,7 +15346,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)), )),
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11) GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0)
); );
assertRecordsEquals(expectedRecords, result.records()); assertRecordsEquals(expectedRecords, result.records());
@ -15371,7 +15371,7 @@ public class GroupMetadataManagerTest {
Map.of() Map.of()
)); ));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 100)); context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 100, 0));
Uuid fooTopicId = Uuid.randomUuid(); Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo"; String fooTopicName = "foo";
@ -15451,7 +15451,7 @@ public class GroupMetadataManagerTest {
.withShareGroup(new ShareGroupBuilder(groupId, 10)) .withShareGroup(new ShareGroupBuilder(groupId, 10))
.build(); .build();
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 10)); context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 10, 0));
assertEquals(ShareGroup.ShareGroupState.EMPTY, context.shareGroupState(groupId)); assertEquals(ShareGroup.ShareGroupState.EMPTY, context.shareGroupState(groupId));
@ -15459,7 +15459,7 @@ public class GroupMetadataManagerTest {
.setState(MemberState.STABLE) .setState(MemberState.STABLE)
.setSubscribedTopicNames(List.of(fooTopicName)) .setSubscribedTopicNames(List.of(fooTopicName))
.build())); .build()));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11)); context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0));
assertEquals(ShareGroup.ShareGroupState.STABLE, context.shareGroupState(groupId)); assertEquals(ShareGroup.ShareGroupState.STABLE, context.shareGroupState(groupId));
@ -15740,7 +15740,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, member)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, member));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 100)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 100, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology));
@ -15927,7 +15927,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6), fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3) barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
)), )),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1), StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5), TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
@ -16010,7 +16010,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6) fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
) )
), ),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1), StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
@ -16094,7 +16094,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of( StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6) fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
)), )),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1), StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
@ -16176,7 +16176,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6), fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3) barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
)), )),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1), StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
@ -16270,7 +16270,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6), fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3) barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3)
)), )),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11), StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
@ -16548,7 +16548,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of( List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11), StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5), TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
@ -16650,7 +16650,7 @@ public class GroupMetadataManagerTest {
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6), fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, changedPartitionCount) barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, changedPartitionCount)
)), )),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11), StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5), TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
@ -16824,7 +16824,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId2), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId2),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId2), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId2),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId2), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId2),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11) StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0)
); );
assertRecordsEquals(expectedRecords, result.records()); assertRecordsEquals(expectedRecords, result.records());
@ -17369,7 +17369,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId1) context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId1)
.build())); .build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0));
assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId)); assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId));
@ -17532,7 +17532,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)) Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6))
), ),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11), StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5) TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
@ -17648,7 +17648,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)) Map.of(fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6))
), ),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11), StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5) TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
@ -17793,7 +17793,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 2) StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 2, 0)
) )
) )
)), )),
@ -18088,7 +18088,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId1), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId1), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId1), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 3) StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 3, 0)
) )
) )
)), )),
@ -18539,7 +18539,7 @@ public class GroupMetadataManagerTest {
// The group still exists but the member is already gone. Replaying the // The group still exists but the member is already gone. Replaying the
// ConsumerGroupMemberMetadata tombstone should be a no-op. // ConsumerGroupMemberMetadata tombstone should be a no-op.
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10)); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10, 0));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo", "m1")); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo", "m1"));
assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("m1", false)); assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("m1", false));
@ -18555,7 +18555,7 @@ public class GroupMetadataManagerTest {
.build(); .build();
// The group is created if it does not exist. // The group is created if it does not exist.
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10)); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10, 0));
assertEquals(10, context.groupMetadataManager.consumerGroup("foo").groupEpoch()); assertEquals(10, context.groupMetadataManager.consumerGroup("foo").groupEpoch());
} }
@ -18667,7 +18667,7 @@ public class GroupMetadataManagerTest {
// The group still exists but the member is already gone. Replaying the // The group still exists but the member is already gone. Replaying the
// ConsumerGroupCurrentMemberAssignment tombstone should be a no-op. // ConsumerGroupCurrentMemberAssignment tombstone should be a no-op.
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10)); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10, 0));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo", "m1")); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo", "m1"));
assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("m1", false)); assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("m1", false));
@ -18706,7 +18706,7 @@ public class GroupMetadataManagerTest {
// The group still exists but the member is already gone. Replaying the // The group still exists but the member is already gone. Replaying the
// StreamsGroupMemberMetadata tombstone should be a no-op. // StreamsGroupMemberMetadata tombstone should be a no-op.
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1")); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1"));
assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1")); assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
@ -18762,7 +18762,7 @@ public class GroupMetadataManagerTest {
.build(); .build();
// The group is created if it does not exist. // The group is created if it does not exist.
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
assertEquals(10, context.groupMetadataManager.streamsGroup("foo").groupEpoch()); assertEquals(10, context.groupMetadataManager.streamsGroup("foo").groupEpoch());
} }
@ -18993,7 +18993,7 @@ public class GroupMetadataManagerTest {
// The group still exists, but the member is already gone. Replaying the // The group still exists, but the member is already gone. Replaying the
// StreamsGroupCurrentMemberAssignment tombstone should be a no-op. // StreamsGroupCurrentMemberAssignment tombstone should be a no-op.
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo", "m1")); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo", "m1"));
assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1")); assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
@ -19069,7 +19069,7 @@ public class GroupMetadataManagerTest {
// The group still exists, but the member is already gone. Replaying the // The group still exists, but the member is already gone. Replaying the
// StreamsGroupTopology tombstone should be a no-op. // StreamsGroupTopology tombstone should be a no-op.
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone("foo")); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone("foo"));
assertTrue(context.groupMetadataManager.streamsGroup("foo").topology().isEmpty()); assertTrue(context.groupMetadataManager.streamsGroup("foo").topology().isEmpty());
@ -19912,7 +19912,7 @@ public class GroupMetadataManagerTest {
) )
), ),
// The group epoch is bumped. // The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
)) ))
) )
), ),
@ -20028,7 +20028,7 @@ public class GroupMetadataManagerTest {
) )
), ),
// The group epoch is bumped. // The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
), ),
task.result.records() task.result.records()
); );
@ -20090,7 +20090,7 @@ public class GroupMetadataManagerTest {
// The member subscription is created. // The member subscription is created.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
// The group epoch is bumped. // The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
// The target assignment is created. // The target assignment is created.
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Map.of()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
@ -20198,7 +20198,7 @@ public class GroupMetadataManagerTest {
) )
), ),
// The group epoch is bumped. // The group epoch is bumped.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
), ),
task.result.records() task.result.records()
); );
@ -20340,7 +20340,7 @@ public class GroupMetadataManagerTest {
foooTopicName, new TopicMetadata(foooTopicId, foooTopicName, 1) foooTopicName, new TopicMetadata(foooTopicId, foooTopicName, 1)
) )
)), )),
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)) List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0))
), ),
task.result.records() task.result.records()
); );
@ -20477,7 +20477,7 @@ public class GroupMetadataManagerTest {
context.time.milliseconds() context.time.milliseconds()
) )
), ),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
)) ))
) )
), ),
@ -20567,7 +20567,7 @@ public class GroupMetadataManagerTest {
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
) )
), ),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, 0)
), ),
context.processTasks().get(0).result.records() context.processTasks().get(0).result.records()
); );
@ -20657,7 +20657,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)) Map.of(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3))
), ),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
); );
assertRecordsEquals(expectedRecords, result.records()); assertRecordsEquals(expectedRecords, result.records());
@ -20677,7 +20677,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId2), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId2),
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "bar*"), GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "bar*"),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()), GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12) GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, 0)
) )
) )
)), )),
@ -20831,7 +20831,7 @@ public class GroupMetadataManagerTest {
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
))), ))),
// Bumped epoch. // Bumped epoch.
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)) List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0))
), ),
result.records() result.records()
); );
@ -20863,7 +20863,7 @@ public class GroupMetadataManagerTest {
MetadataDelta delta = new MetadataDelta(image); MetadataDelta delta = new MetadataDelta(image);
context.groupMetadataManager.onNewMetadataImage(image, delta); context.groupMetadataManager.onNewMetadataImage(image, delta);
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0)); context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
context.replay( context.replay(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
@ -20930,7 +20930,7 @@ public class GroupMetadataManagerTest {
MetadataDelta delta = new MetadataDelta(image); MetadataDelta delta = new MetadataDelta(image);
context.groupMetadataManager.onNewMetadataImage(image, delta); context.groupMetadataManager.onNewMetadataImage(image, delta);
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0)); context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
context.replay( context.replay(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(

View File

@ -108,7 +108,7 @@ public class ConsumerGroupBuilder {
} }
// Add group epoch record. // Add group epoch record.
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, groupEpoch)); records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
// Add target assignment records. // Add target assignment records.
assignments.forEach((memberId, assignment) -> assignments.forEach((memberId, assignment) ->

View File

@ -94,7 +94,7 @@ public class ShareGroupBuilder {
} }
// Add group epoch record. // Add group epoch record.
records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, groupEpoch)); records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, groupEpoch, 0));
// Add target assignment records. // Add target assignment records.
assignments.forEach((memberId, assignment) -> assignments.forEach((memberId, assignment) ->

View File

@ -300,12 +300,13 @@ class StreamsCoordinatorRecordHelpersTest {
.setGroupId(GROUP_ID), .setGroupId(GROUP_ID),
new ApiMessageAndVersion( new ApiMessageAndVersion(
new StreamsGroupMetadataValue() new StreamsGroupMetadataValue()
.setEpoch(42), .setEpoch(42)
.setMetadataHash(42),
(short) 0 (short) 0
) )
); );
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(GROUP_ID, 42)); assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(GROUP_ID, 42, 42));
} }
@Test @Test
@ -740,7 +741,7 @@ class StreamsCoordinatorRecordHelpersTest {
@Test @Test
public void testNewStreamsGroupEpochRecordNullGroupId() { public void testNewStreamsGroupEpochRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () -> NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(null, 1)); StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(null, 1, 1));
assertEquals("groupId should not be null here", exception.getMessage()); assertEquals("groupId should not be null here", exception.getMessage());
} }

View File

@ -85,7 +85,7 @@ public class StreamsGroupBuilder {
// Add group epoch record. // Add group epoch record.
records.add( records.add(
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch)); StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch, 0));
// Add target assignment records. // Add target assignment records.
targetAssignments.forEach((memberId, assignment) -> targetAssignments.forEach((memberId, assignment) ->

View File

@ -60,7 +60,8 @@ public class ShareGroupMessageFormatterTest extends CoordinatorRecordMessageForm
private static final ShareGroupMetadataKey SHARE_GROUP_METADATA_KEY = new ShareGroupMetadataKey() private static final ShareGroupMetadataKey SHARE_GROUP_METADATA_KEY = new ShareGroupMetadataKey()
.setGroupId("group-id"); .setGroupId("group-id");
private static final ShareGroupMetadataValue SHARE_GROUP_METADATA_VALUE = new ShareGroupMetadataValue() private static final ShareGroupMetadataValue SHARE_GROUP_METADATA_VALUE = new ShareGroupMetadataValue()
.setEpoch(1); .setEpoch(1)
.setMetadataHash(1);
private static final ShareGroupTargetAssignmentMetadataKey SHARE_GROUP_TARGET_ASSIGNMENT_METADATA_KEY = new ShareGroupTargetAssignmentMetadataKey() private static final ShareGroupTargetAssignmentMetadataKey SHARE_GROUP_TARGET_ASSIGNMENT_METADATA_KEY = new ShareGroupTargetAssignmentMetadataKey()
.setGroupId("group-id"); .setGroupId("group-id");
private static final ShareGroupTargetAssignmentMetadataValue SHARE_GROUP_TARGET_ASSIGNMENT_METADATA_VALUE = new ShareGroupTargetAssignmentMetadataValue() private static final ShareGroupTargetAssignmentMetadataValue SHARE_GROUP_TARGET_ASSIGNMENT_METADATA_VALUE = new ShareGroupTargetAssignmentMetadataValue()
@ -154,7 +155,8 @@ public class ShareGroupMessageFormatterTest extends CoordinatorRecordMessageForm
""" """
{"key":{"type":11,"data":{"groupId":"group-id"}}, {"key":{"type":11,"data":{"groupId":"group-id"}},
"value":{"version":0, "value":{"version":0,
"data":{"epoch":1}}} "data":{"epoch":1,
"metadataHash":1}}}
""" """
), ),
Arguments.of( Arguments.of(