Lucas Brutschy 2025-10-07 10:30:34 -07:00 committed by GitHub
commit c986d7b7d3
10 changed files with 280 additions and 78 deletions


@ -239,7 +239,8 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
def createStreamsGroup[K, V](configOverrides: Properties = new Properties,
configsToRemove: List[String] = List(),
inputTopic: String,
inputTopics: Set[String],
changelogTopics: Set[String] = Set(),
streamsGroupId: String): AsyncKafkaConsumer[K, V] = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
@ -255,10 +256,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
Optional.empty(),
util.Map.of(
"subtopology-0", new StreamsRebalanceData.Subtopology(
util.Set.of(inputTopic),
inputTopics.asJava,
util.Set.of(),
util.Map.of(),
util.Map.of(inputTopic + "-store-changelog", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())),
changelogTopics.map(c => (c, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.empty(), util.Map.of()))).toMap.asJava,
util.Set.of()
)),
Map.empty[String, String].asJava
@ -270,7 +271,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
configOverrides = props,
streamsRebalanceData = streamsRebalanceData
)
consumer.subscribe(util.Set.of(inputTopic),
consumer.subscribe(inputTopics.asJava,
new StreamsRebalanceListener {
override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Unit = ()
override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Unit = ()


@ -2318,7 +2318,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}
/**
* Test the consumer group APIs for member removal.
*/
@ -2587,7 +2586,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
val streamsGroup = createStreamsGroup(
inputTopic = testTopicName,
inputTopics = Set(testTopicName),
changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId
)
@ -4412,7 +4412,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
prepareRecords(testTopicName)
val streams = createStreamsGroup(
inputTopic = testTopicName,
inputTopics = Set(testTopicName),
changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId
)
streams.poll(JDuration.ofMillis(500L))
@ -4422,7 +4423,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val firstGroup = client.listGroups().all().get().stream()
.filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
}, "Stream group not stable yet")
}, "Streams group did not transition to STABLE before timeout")
// Verify the describe call works correctly
val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
@ -4458,7 +4459,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
client = Admin.create(config)
val streams = createStreamsGroup(
inputTopic = testTopicName,
inputTopics = Set(testTopicName),
changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId
)
streams.poll(JDuration.ofMillis(500L))
@ -4468,7 +4470,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val firstGroup = client.listGroups().all().get().stream()
.filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
firstGroup.groupState().orElse(null) == GroupState.NOT_READY && firstGroup.groupId() == streamsGroupId
}, "Stream group not NOT_READY yet")
}, "Streams group did not transition to NOT_READY before timeout")
// Verify the describe call works correctly
val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
@ -4490,6 +4492,55 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}
@Test
def testDescribeStreamsGroupsForStatelessTopology(): Unit = {
val streamsGroupId = "stream_group_id"
val testTopicName = "test_topic"
val testNumPartitions = 1
val config = createConfig
client = Admin.create(config)
prepareTopics(List(testTopicName), testNumPartitions)
prepareRecords(testTopicName)
val streams = createStreamsGroup(
inputTopics = Set(testTopicName),
streamsGroupId = streamsGroupId
)
streams.poll(JDuration.ofMillis(500L))
try {
TestUtils.waitUntilTrue(() => {
val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null)
firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
}, "Streams group did not transition to STABLE before timeout")
// Verify the describe call works correctly
val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
val group = describedGroups.get(streamsGroupId)
assertNotNull(group)
assertEquals(streamsGroupId, group.groupId())
assertFalse(group.members().isEmpty)
assertNotNull(group.subtopologies())
assertFalse(group.subtopologies().isEmpty)
// Verify the topology contains the expected source and sink topics
val subtopologies = group.subtopologies().asScala
assertTrue(subtopologies.exists(subtopology =>
subtopology.sourceTopics().contains(testTopicName)))
// Test describing a non-existing group
val nonExistingGroup = "non_existing_stream_group"
val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List.of(nonExistingGroup))
assertFutureThrows(classOf[GroupIdNotFoundException], describedNonExistingGroupResponse.all())
} finally {
Utils.closeQuietly(streams, "streams")
Utils.closeQuietly(client, "adminClient")
}
}
@Test
def testDeleteStreamsGroups(): Unit = {
val testTopicName = "test_topic"
@ -4512,7 +4563,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val streamsGroupId = s"stream_group_id_$i"
val streams = createStreamsGroup(
inputTopic = testTopicName,
inputTopics = Set(testTopicName),
changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId,
)
streams.poll(JDuration.ofMillis(500L))
@ -4595,7 +4647,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
val streams = createStreamsGroup(
inputTopic = testTopicName,
inputTopics = Set(testTopicName),
changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId,
)
@ -4611,7 +4664,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.waitUntilTrue(() => {
val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null)
firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
}, "Stream group not stable yet")
}, "Streams group did not transition to STABLE before timeout")
val allTopicPartitions = client.listStreamsGroupOffsets(
util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec())
@ -4655,7 +4708,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
val streams = createStreamsGroup(
inputTopic = testTopicName,
inputTopics = Set(testTopicName),
changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId,
)
@ -4732,7 +4786,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
val streams = createStreamsGroup(
inputTopic = testTopicName,
inputTopics = Set(testTopicName),
changelogTopics = Set(testTopicName + "-changelog"),
streamsGroupId = streamsGroupId,
)


@ -250,9 +250,9 @@ import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMe
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedTasksChanged;
@ -1960,19 +1960,26 @@ public class GroupMetadataManager {
updatedConfiguredTopology = group.configuredTopology().get();
}
// 3b. If the topology is validated, persist the fact that it is validated.
int validatedTopologyEpoch = -1;
if (updatedConfiguredTopology.isReady()) {
validatedTopologyEpoch = updatedTopology.topologyEpoch();
SortedMap<String, ConfiguredSubtopology> subtopologySortedMap = updatedConfiguredTopology.subtopologies().get();
throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedActiveTasks);
throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedStandbyTasks);
throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedWarmupTasks);
}
// The validated topology epoch changed (for example, a topology that was not validated before is now validated), so bump the group epoch as we may have to reassign tasks.
if (validatedTopologyEpoch != group.validatedTopologyEpoch()) {
bumpGroupEpoch = true;
}
// Actually bump the group epoch
int groupEpoch = group.groupEpoch();
if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newStreamsGroupEpochRecord(groupId, groupEpoch, metadataHash));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {}.", groupId, memberId, groupEpoch, metadataHash);
records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {} and validated topology epoch {}.", groupId, memberId, groupEpoch, metadataHash, validatedTopologyEpoch);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
}
@ -4291,7 +4298,7 @@ public class GroupMetadataManager {
// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch, 0));
records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, group.metadataHash(), group.validatedTopologyEpoch()));
cancelTimers(group.groupId(), member.memberId());
@ -5411,6 +5418,7 @@ public class GroupMetadataManager {
StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, true);
streamsGroup.setGroupEpoch(value.epoch());
streamsGroup.setMetadataHash(value.metadataHash());
streamsGroup.setValidatedTopologyEpoch(value.validatedTopologyEpoch());
} else {
StreamsGroup streamsGroup;
try {

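For context on the GroupMetadataManager change above: the heartbeat path now derives a validated topology epoch (-1 while the configured topology is not ready) and bumps the group epoch whenever that value differs from the one stored on the group, persisting both through the new StreamsGroupMetadata record. A minimal, hypothetical sketch of that decision, with simplified names that are not part of the patch:

// Illustrative sketch only; the real logic lives in the GroupMetadataManager hunk above.
final class TopologyValidationSketch {

    // -1 means "no topology epoch has been validated yet".
    static int validatedTopologyEpoch(boolean topologyReady, int topologyEpoch) {
        return topologyReady ? topologyEpoch : -1;
    }

    // Bump the group epoch when the validated topology epoch changes, since a newly
    // validated (or newly invalidated) topology may require reassigning tasks.
    static boolean shouldBumpGroupEpoch(int newValidatedTopologyEpoch,
                                        int previouslyValidatedTopologyEpoch,
                                        boolean otherReasonToBump) {
        return otherReasonToBump
            || newValidatedTopologyEpoch != previouslyValidatedTopologyEpoch;
    }
}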

@ -96,10 +96,11 @@ public class StreamsCoordinatorRecordHelpers {
);
}
public static CoordinatorRecord newStreamsGroupEpochRecord(
public static CoordinatorRecord newStreamsGroupMetadataRecord(
String groupId,
int newGroupEpoch,
long metadataHash
long metadataHash,
int validatedTopologyEpoch
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
@ -109,7 +110,8 @@ public class StreamsCoordinatorRecordHelpers {
new ApiMessageAndVersion(
new StreamsGroupMetadataValue()
.setEpoch(newGroupEpoch)
.setMetadataHash(metadataHash),
.setMetadataHash(metadataHash)
.setValidatedTopologyEpoch(validatedTopologyEpoch),
(short) 0
)
);


@ -147,9 +147,9 @@ public class StreamsGroup implements Group {
private final TimelineHashMap<String, String> staticMembers;
/**
* The metadata associated with each subscribed topic name.
* The topology epoch for which the subscribed topics identified by metadataHash are validated.
*/
private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
private final TimelineInteger validatedTopologyEpoch;
/**
* The metadata hash, which is computed based on all the subscribed topics.
@ -222,7 +222,7 @@ public class StreamsGroup implements Group {
this.groupEpoch = new TimelineInteger(snapshotRegistry);
this.members = new TimelineHashMap<>(snapshotRegistry, 0);
this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.validatedTopologyEpoch = new TimelineInteger(snapshotRegistry);
this.metadataHash = new TimelineLong(snapshotRegistry);
this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
@ -282,7 +282,6 @@ public class StreamsGroup implements Group {
public void setConfiguredTopology(ConfiguredTopology configuredTopology) {
this.configuredTopology.set(Optional.ofNullable(configuredTopology));
maybeUpdateGroupState();
}
/**
@ -598,6 +597,23 @@ public class StreamsGroup implements Group {
this.metadataHash.set(metadataHash);
}
/**
* @return The validated topology epoch.
*/
public int validatedTopologyEpoch() {
return validatedTopologyEpoch.get();
}
/**
* Updates the validated topology epoch.
*
* @param validatedTopologyEpoch The validated topology epoch
*/
public void setValidatedTopologyEpoch(int validatedTopologyEpoch) {
this.validatedTopologyEpoch.set(validatedTopologyEpoch);
maybeUpdateGroupState();
}
/**
* Computes the metadata hash based on the current topology and the current metadata image.
*
@ -835,7 +851,7 @@ public class StreamsGroup implements Group {
if (members.isEmpty()) {
newState = EMPTY;
clearShutdownRequestMemberId();
} else if (topology().isEmpty() || configuredTopology().isEmpty() || !configuredTopology().get().isReady()) {
} else if (topology().filter(t -> t.topologyEpoch() == validatedTopologyEpoch.get()).isEmpty()) {
newState = NOT_READY;
} else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
newState = ASSIGNING;

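The state-machine change above replaces the configured-topology readiness check with a comparison against the new validatedTopologyEpoch: a non-empty group now stays NOT_READY until the epoch of its current topology has been validated. A rough, simplified sketch of the resulting computation (illustrative names, not the actual timeline-backed implementation in StreamsGroup#maybeUpdateGroupState):

// Illustrative sketch only; the real computation uses timeline data structures.
final class StreamsGroupStateSketch {

    enum State { EMPTY, NOT_READY, ASSIGNING, RECONCILING, STABLE }

    static State computeState(int memberCount,
                              Integer topologyEpoch,        // null if no topology has been written yet
                              int validatedTopologyEpoch,   // -1 until some topology epoch is validated
                              int groupEpoch,
                              int targetAssignmentEpoch,
                              boolean allMembersStable) {
        if (memberCount == 0) {
            return State.EMPTY;
        }
        // The group is ready only once the *current* topology epoch has been validated.
        if (topologyEpoch == null || topologyEpoch != validatedTopologyEpoch) {
            return State.NOT_READY;
        }
        if (groupEpoch > targetAssignmentEpoch) {
            return State.ASSIGNING;
        }
        return allMembersStable ? State.STABLE : State.RECONCILING;
    }
}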

@ -24,6 +24,8 @@
{ "name": "Epoch", "versions": "0+", "type": "int32",
"about": "The group epoch." },
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
"about": "The hash of all topics in the group." }
"about": "The hash of all topics in the group." },
{ "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "default": -1, "type": "int32",
"about": "The topology epoch whose topics were validated to be present in a valid configuration in the metadata." }
]
}
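ValidatedTopologyEpoch is added as a tagged field with default -1, so StreamsGroupMetadata records written before this change should replay with -1, meaning no topology epoch has been validated yet; the group then stays NOT_READY until a later heartbeat re-validates the topology. A conceptual sketch of that fallback (not the generated serde code):

// Hypothetical helper illustrating the tagged-field default; the generated
// StreamsGroupMetadataValue handles this automatically during deserialization.
final class ValidatedTopologyEpochDefault {
    static final int NOT_VALIDATED = -1; // matches "default": -1 in the schema above

    static int readValidatedTopologyEpoch(Integer fieldFromOldRecord) {
        // Records from older versions omit the tagged field and fall back to the default.
        return fieldFromOldRecord != null ? fieldFromOldRecord : NOT_VALIDATED;
    }
}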


@ -139,7 +139,6 @@ import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRol
import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@ -4434,23 +4433,27 @@ public class GroupMetadataManagerTest {
.withStreamsGroup(new StreamsGroupBuilder(groupIds.get(1), 10) // Stable group
.withTargetAssignmentEpoch(10)
.withTopology(new StreamsTopology(1, Map.of()))
.withValidatedTopologyEpoch(1)
.withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(0))
.setMemberEpoch(10)
.build()))
.withStreamsGroup(new StreamsGroupBuilder(groupIds.get(2), 10) // Assigning group
.withTargetAssignmentEpoch(9)
.withTopology(new StreamsTopology(1, Map.of()))
.withValidatedTopologyEpoch(1)
.withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(1))
.setMemberEpoch(9)
.build()))
.withStreamsGroup(new StreamsGroupBuilder(groupIds.get(3), 10) // Reconciling group
.withTargetAssignmentEpoch(10)
.withTopology(new StreamsTopology(1, Map.of()))
.withValidatedTopologyEpoch(1)
.withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(2))
.setMemberEpoch(9)
.build()))
.withStreamsGroup(new StreamsGroupBuilder(groupIds.get(4), 10) // NotReady group
.withTargetAssignmentEpoch(10)
.withTopology(new StreamsTopology(1, Map.of()))
.withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(3))
.build()))
.build();
@ -9736,7 +9739,7 @@ public class GroupMetadataManagerTest {
StreamsGroupMember.Builder memberBuilder1 = streamsGroupMemberBuilderWithDefaults(memberId1);
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId, epoch + 1, 0, -1));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(streamsGroupId, topology));
TasksTuple assignment = new TasksTuple(
@ -9749,7 +9752,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder2.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId, memberId2, assignment));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder2.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 2, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId, epoch + 2, 0, 0));
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(List.of(streamsGroupId), context.lastCommittedOffset);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
@ -9780,7 +9783,7 @@ public class GroupMetadataManagerTest {
)
)
)
.setGroupState(StreamsGroup.StreamsGroupState.NOT_READY.toString())
.setGroupState(StreamsGroup.StreamsGroupState.ASSIGNING.toString())
.setGroupEpoch(epoch + 2);
assertEquals(1, actual.size());
assertEquals(describedGroup, actual.get(0));
@ -16035,7 +16038,7 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, member));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 100, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 100, 0, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology));
@ -16283,7 +16286,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, groupMetadataHash),
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, groupMetadataHash, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
@ -16296,6 +16299,98 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(expectedRecords, result.records());
}
@Test
public void testJoinEmptyStreamsGroupAndDescribe() {
String groupId = "fooup";
String memberId = Uuid.randomUuid().toString();
String subtopology1 = "subtopology1";
String fooTopicName = "foo";
Uuid fooTopicId = Uuid.randomUuid();
Topology topology = new Topology().setSubtopologies(List.of(
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.buildCoordinatorMetadataImage();
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(metadataImage)
.build();
assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
)));
assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.streamsGroup(groupId));
CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setProcessId("process-id")
.setRebalanceTimeoutMs(1500)
.setTopology(topology)
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of()));
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of(
new StreamsGroupHeartbeatResponseData.TaskIds()
.setSubtopologyId(subtopology1)
.setPartitions(List.of(0, 1, 2, 3, 4, 5))
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of()),
result.response().data()
);
StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(1500)
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
.build();
// Commit the offset and test again
context.commit();
List<StreamsGroupDescribeResponseData.DescribedGroup> actualDescribedGroups = context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), context.lastCommittedOffset);
StreamsGroupDescribeResponseData.DescribedGroup expectedDescribedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setAssignmentEpoch(1)
.setTopology(
new StreamsGroupDescribeResponseData.Topology()
.setEpoch(0)
.setSubtopologies(List.of(
new StreamsGroupDescribeResponseData.Subtopology()
.setSubtopologyId(subtopology1)
.setSourceTopics(List.of(fooTopicName))
))
)
.setMembers(Collections.singletonList(
expectedMember.asStreamsGroupDescribeMember(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
))
.setGroupState(StreamsGroupState.STABLE.toString())
.setGroupEpoch(1);
assertEquals(1, actualDescribedGroups.size());
assertEquals(expectedDescribedGroup, actualDescribedGroups.get(0));
}
@Test
public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
String groupId = "fooup";
@ -16364,9 +16459,9 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, computeGroupHash(Map.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
))),
)), -1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
@ -16450,14 +16545,15 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, computeGroupHash(Map.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
))),
)), -1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
);
assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId));
assertRecordsEquals(expectedRecords, result.records());
}
@ -16532,15 +16628,16 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, computeGroupHash(Map.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
))),
)), -1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
);
assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId));
assertRecordsEquals(expectedRecords, result.records());
}
@ -16577,6 +16674,7 @@ public class GroupMetadataManagerTest {
.withStreamsGroup(
new StreamsGroupBuilder(groupId, 10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology1))
.withValidatedTopologyEpoch(1)
)
.build();
@ -16627,10 +16725,10 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
))),
)), 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember)
@ -16682,6 +16780,7 @@ public class GroupMetadataManagerTest {
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
.withMetadataHash(groupMetadataHash)
.withValidatedTopologyEpoch(0)
)
.build();
@ -16751,6 +16850,8 @@ public class GroupMetadataManagerTest {
.buildCoordinatorMetadataImage();
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
long metadataHash = computeGroupHash(Map.of(fooTopicName, computeTopicHash(fooTopicName, metadataImage)));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(metadataImage)
@ -16767,7 +16868,8 @@ public class GroupMetadataManagerTest {
.build())
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
.withMetadataHash(computeGroupHash(Map.of(fooTopicName, computeTopicHash(fooTopicName, metadataImage))))
.withValidatedTopologyEpoch(0)
.withMetadataHash(metadataHash)
)
.build();
@ -16792,7 +16894,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0)
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, metadataHash, 0)
),
result1.records()
);
@ -16801,7 +16903,7 @@ public class GroupMetadataManagerTest {
context.replay(record);
}
assignor.prepareGroupAssignment(
Map.of(memberId1, TasksTuple.EMPTY)
Map.of(memberId2, TasksTuple.EMPTY)
);
CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result2 = context.streamsGroupHeartbeat(
@ -16814,7 +16916,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(12)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setStatus(List.of(
new StreamsGroupHeartbeatResponseData.Status()
@ -16916,7 +17018,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, groupMetadataHash),
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, groupMetadataHash, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
@ -17022,10 +17124,10 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, newMetadataImage),
barTopicName, computeTopicHash(barTopicName, newMetadataImage)
))),
)), 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
@ -17206,7 +17308,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId2),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId2),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId2),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0)
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 0, -1)
);
assertRecordsEquals(expectedRecords, result.records());
@ -17334,6 +17436,7 @@ public class GroupMetadataManagerTest {
TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
.withTargetAssignmentEpoch(10)
.withMetadataHash(groupMetadataHash)
.withValidatedTopologyEpoch(0)
)
.build();
@ -17776,16 +17879,12 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId1)
.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, groupMetadataHash));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, groupMetadataHash, -1));
assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId));
context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
.setConfiguredTopology(InternalTopicManager.configureTopics(
new LogContext(),
groupMetadataHash,
StreamsTopology.fromRecord(StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord(topology)),
metadataImage));
.setValidatedTopologyEpoch(0);
assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, context.streamsGroupState(groupId));
@ -17942,9 +18041,9 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
))),
)), 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
@ -18064,9 +18163,9 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
))),
)), 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
@ -18169,13 +18268,17 @@ public class GroupMetadataManagerTest {
Topology topology = new Topology().setSubtopologies(List.of(
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.buildCoordinatorMetadataImage();
long groupMetadataHash = computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.buildCoordinatorMetadataImage())
.withMetadataImage(metadataImage)
.build();
assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
@ -18211,7 +18314,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 2, 0)
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, groupMetadataHash, 0)
)
)
)),
@ -18393,13 +18496,17 @@ public class GroupMetadataManagerTest {
Topology topology = new Topology().setSubtopologies(List.of(
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.buildCoordinatorMetadataImage();
long groupMetadataHash = computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 3)
.buildCoordinatorMetadataImage())
.withMetadataImage(metadataImage)
.build();
assignor.prepareGroupAssignment(
@ -18511,7 +18618,7 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 3, 0)
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3, groupMetadataHash, 0)
)
)
)),
@ -18799,7 +18906,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId, topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(classicGroupId, 1, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId, 1, 0, -1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId, expectedMember)
@ -19383,7 +19490,7 @@ public class GroupMetadataManagerTest {
// The group still exists but the member is already gone. Replaying the
// StreamsGroupMemberMetadata tombstone should be a no-op.
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo", 10, 0, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1"));
assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
@ -19439,7 +19546,7 @@ public class GroupMetadataManagerTest {
.build();
// The group is created if it does not exist.
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo", 10, 0, 0));
assertEquals(10, context.groupMetadataManager.streamsGroup("foo").groupEpoch());
}
@ -19631,7 +19738,7 @@ public class GroupMetadataManagerTest {
// The group still exists, but the member is already gone. Replaying the
// StreamsGroupCurrentMemberAssignment tombstone should be a no-op.
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo", 10, 0, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo", "m1"));
assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
@ -19707,7 +19814,7 @@ public class GroupMetadataManagerTest {
// The group still exists, but the member is already gone. Replaying the
// StreamsGroupTopology tombstone should be a no-op.
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo", 10, 0, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone("foo"));
assertTrue(context.groupMetadataManager.streamsGroup("foo").topology().isEmpty());


@ -250,19 +250,20 @@ class StreamsCoordinatorRecordHelpersTest {
}
@Test
public void testNewStreamsGroupEpochRecord() {
public void testNewStreamsGroupMetadataRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupMetadataKey()
.setGroupId(GROUP_ID),
new ApiMessageAndVersion(
new StreamsGroupMetadataValue()
.setEpoch(42)
.setMetadataHash(42),
.setMetadataHash(43)
.setValidatedTopologyEpoch(44),
(short) 0
)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(GROUP_ID, 42, 42));
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(GROUP_ID, 42, 43, 44));
}
@Test
@ -676,7 +677,7 @@ class StreamsCoordinatorRecordHelpersTest {
@Test
public void testNewStreamsGroupEpochRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(null, 1, 1));
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(null, 1, 1, 1));
assertEquals("groupId should not be null here", exception.getMessage());
}


@ -35,6 +35,7 @@ public class StreamsGroupBuilder {
private final Map<String, StreamsGroupMember> members = new HashMap<>();
private final Map<String, TasksTuple> targetAssignments = new HashMap<>();
private long metadataHash = 0L;
private int validatedTopologyEpoch = -1;
public StreamsGroupBuilder(String groupId, int groupEpoch) {
this.groupId = groupId;
@ -53,6 +54,11 @@ public class StreamsGroupBuilder {
return this;
}
public StreamsGroupBuilder withValidatedTopologyEpoch(int validatedTopologyEpoch) {
this.validatedTopologyEpoch = validatedTopologyEpoch;
return this;
}
public StreamsGroupBuilder withTopology(StreamsTopology streamsTopology) {
this.topology = streamsTopology;
return this;
@ -79,7 +85,7 @@ public class StreamsGroupBuilder {
// Add group epoch record.
records.add(
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch, metadataHash));
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch));
// Add target assignment records.
targetAssignments.forEach((memberId, assignment) ->


@ -498,6 +498,7 @@ public class StreamsGroupTest {
streamsGroup.setTopology(new StreamsTopology(1, Map.of()));
streamsGroup.setConfiguredTopology(new ConfiguredTopology(1, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
streamsGroup.setValidatedTopologyEpoch(1);
assertEquals(MemberState.STABLE, member1.state());
assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, streamsGroup.state());
@ -694,6 +695,7 @@ public class StreamsGroupTest {
);
group.setGroupEpoch(1);
group.setTopology(new StreamsTopology(1, Map.of()));
group.setValidatedTopologyEpoch(1);
group.setConfiguredTopology(new ConfiguredTopology(1, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
group.setTargetAssignmentEpoch(1);
group.updateMember(new StreamsGroupMember.Builder("member1")
@ -760,6 +762,7 @@ public class StreamsGroupTest {
streamsGroup.setTopology(new StreamsTopology(1, Map.of()));
streamsGroup.setConfiguredTopology(new ConfiguredTopology(1, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
streamsGroup.setValidatedTopologyEpoch(1);
assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state());
assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup);
@ -805,6 +808,7 @@ public class StreamsGroupTest {
group.setGroupEpoch(1);
group.setTopology(new StreamsTopology(1, Map.of()));
group.setConfiguredTopology(new ConfiguredTopology(1, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
group.setValidatedTopologyEpoch(1);
group.setTargetAssignmentEpoch(1);
group.updateMember(new StreamsGroupMember.Builder("member1")
.setMemberEpoch(1)