KAFKA-18613: Add StreamsGroupHeartbeat handler in the group coordinator (#19114)

Basic streams group heartbeat handling. The main part of are the unit
tests that make sure that we behave, for the most part, like a consumer
group.

- No support for static membership
- No support for configurations (using constants instead)
- No support for regular expressions

Reviewers: Bill Bejeck <bill@confluent.io>, Bruno Cadonna
<cadonna@apache.org>
This commit is contained in:
Lucas Brutschy 2025-03-31 13:21:06 +02:00 committed by GitHub
parent 4144290335
commit 6d72677eda
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 3255 additions and 87 deletions

View File

@ -400,7 +400,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* @param context The request context.
* @param request The actual StreamsGroupHeartbeat request.
*
* @return A Result containing the StreamsGroupHeartbeat response, a list of internal topics to be created and
* @return A result containing the StreamsGroupHeartbeat response, a list of internal topics to be created and
* a list of records to update the state machine.
*/
public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(

View File

@ -146,7 +146,7 @@ public class CurrentAssignmentBuilder {
* @param ownedAssignment A collection of active, standby and warm-up tasks
* @return This object.
*/
protected CurrentAssignmentBuilder withOwnedAssignment(TasksTuple ownedAssignment) {
public CurrentAssignmentBuilder withOwnedAssignment(TasksTuple ownedAssignment) {
this.ownedTasks = Optional.ofNullable(ownedAssignment);
return this;
}

View File

@ -330,31 +330,63 @@ public class StreamsGroup implements Group {
}
/**
* Gets or creates a new member but without adding it to the group. Adding a member is done via the
* {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
* Gets a new member or throws an exception, if the member does not exist.
*
* @param memberId The member ID.
* @param createIfNotExists Booleans indicating whether the member must be created if it does not exist.
* @param memberId The member ID.
* @throws UnknownMemberIdException If the member is not found.
* @return A StreamsGroupMember.
*/
public StreamsGroupMember getOrMaybeCreateMember(
String memberId,
boolean createIfNotExists
public StreamsGroupMember getMemberOrThrow(
String memberId
) throws UnknownMemberIdException {
StreamsGroupMember member = members.get(memberId);
if (member != null) {
return member;
}
if (!createIfNotExists) {
throw new UnknownMemberIdException(
String.format("Member %s is not a member of group %s.", memberId, groupId)
);
throw new UnknownMemberIdException(
String.format("Member %s is not a member of group %s.", memberId, groupId)
);
}
/**
* Gets or creates a new member, but keeping its fields uninitialized. This is used on the replay-path.
* The member is not added to the group, adding a member is done via the
* {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
*
* @param memberId The member ID.
* @return A StreamsGroupMember.
*/
public StreamsGroupMember getOrCreateUninitializedMember(
String memberId
) throws UnknownMemberIdException {
StreamsGroupMember member = members.get(memberId);
if (member != null) {
return member;
}
return new StreamsGroupMember.Builder(memberId).build();
}
/**
* Gets or creates a new member, setting default values on the fields. This is used on the replay-path.
* The member is not added to the group, adding a member is done via the
* {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
*
* @param memberId The member ID.
* @return A StreamsGroupMember.
*/
public StreamsGroupMember getOrCreateDefaultMember(
String memberId
) throws UnknownMemberIdException {
StreamsGroupMember member = members.get(memberId);
if (member != null) {
return member;
}
return StreamsGroupMember.Builder.withDefaults(memberId).build();
}
/**
* Gets a static member.
*
@ -363,7 +395,7 @@ public class StreamsGroup implements Group {
*/
public StreamsGroupMember staticMember(String instanceId) {
String existingMemberId = staticMemberId(instanceId);
return existingMemberId == null ? null : getOrMaybeCreateMember(existingMemberId, false);
return existingMemberId == null ? null : getMemberOrThrow(existingMemberId);
}
/**
@ -656,7 +688,7 @@ public class StreamsGroup implements Group {
memberId.equals(JoinGroupRequest.UNKNOWN_MEMBER_ID) && groupInstanceId == null)
return;
final StreamsGroupMember member = getOrMaybeCreateMember(memberId, false);
final StreamsGroupMember member = getMemberOrThrow(memberId);
// If the commit is not transactional and the member uses the new streams protocol (KIP-1071),
// the member should be using the OffsetCommit API version >= 9.

View File

@ -278,6 +278,21 @@ public record StreamsGroupMember(String memberId,
taskIds -> Set.copyOf(taskIds.partitions())));
}
public static Builder withDefaults(String memberId) {
return new Builder(memberId)
.setRebalanceTimeoutMs(-1)
.setTopologyEpoch(-1)
.setInstanceId(null)
.setRackId(null)
.setProcessId("")
.setClientTags(Collections.emptyMap())
.setState(MemberState.STABLE)
.setMemberEpoch(0)
.setAssignedTasks(TasksTuple.EMPTY)
.setTasksPendingRevocation(TasksTuple.EMPTY)
.setUserEndpoint(null);
}
public StreamsGroupMember build() {
return new StreamsGroupMember(
memberId,

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
@ -81,4 +82,17 @@ public record StreamsTopology(int topologyEpoch,
record.subtopologies().stream().collect(Collectors.toMap(Subtopology::subtopologyId, x -> x))
);
}
/**
* Creates an instance of StreamsTopology from a StreamsGroupHeartbeatRequestData request.
*
* @param topology The topology supplied in the request.
* @return The instance of StreamsTopology created from the request.
*/
public static StreamsTopology fromHeartbeatRequest(StreamsGroupHeartbeatRequestData.Topology topology) {
StreamsGroupTopologyValue recordValue = StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord(topology);
final Map<String, StreamsGroupTopologyValue.Subtopology> subtopologyMap = recordValue.subtopologies().stream()
.collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, x -> x));
return new StreamsTopology(topology.epoch(), subtopologyMap);
}
}

View File

@ -16,12 +16,16 @@
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@ -30,11 +34,11 @@ import java.util.stream.Collectors;
* An immutable tuple containing active, standby and warm-up tasks.
*
* @param activeTasks Active tasks.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* The key of the map is the subtopology ID, and the value is the set of partition IDs.
* @param standbyTasks Standby tasks.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* The key of the map is the subtopology ID, and the value is the set of partition IDs.
* @param warmupTasks Warm-up tasks.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* The key of the map is the subtopology ID, and the value is the set of partition IDs.
*/
public record TasksTuple(Map<String, Set<Integer>> activeTasks,
Map<String, Set<Integer>> standbyTasks,
@ -88,7 +92,7 @@ public record TasksTuple(Map<String, Set<Integer>> activeTasks,
/**
* Checks if this task tuple contains any of the tasks in another task tuple.
*
* @param other The other task tuple.
* @param other Another task tuple.
* @return true if there is at least one active, standby or warm-up task that is present in both tuples.
*/
public boolean containsAny(TasksTuple other) {
@ -130,4 +134,63 @@ public record TasksTuple(Map<String, Set<Integer>> activeTasks,
)
);
}
public String toString() {
return "(active=" + taskAssignmentToString(activeTasks) +
", standby=" + taskAssignmentToString(standbyTasks) +
", warmup=" + taskAssignmentToString(warmupTasks) +
')';
}
public static TasksTuple fromHeartbeatRequest(final List<StreamsGroupHeartbeatRequestData.TaskIds> ownedActiveTasks,
final List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks,
final List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks) {
return new TasksTuple(
ownedActiveTasks.stream()
.collect(Collectors.toMap(
StreamsGroupHeartbeatRequestData.TaskIds::subtopologyId,
taskId -> new HashSet<>(taskId.partitions())
)
),
ownedStandbyTasks.stream()
.collect(Collectors.toMap(
StreamsGroupHeartbeatRequestData.TaskIds::subtopologyId,
taskId -> new HashSet<>(taskId.partitions())
)
),
ownedWarmupTasks.stream()
.collect(Collectors.toMap(
StreamsGroupHeartbeatRequestData.TaskIds::subtopologyId,
taskId -> new HashSet<>(taskId.partitions())
)
)
);
}
/**
* @return The provided assignment as a String.
*
* Example:
* [subtopologyID1-0, subtopologyID1-1, subtopologyID2-0, subtopologyID2-1]
*/
private static String taskAssignmentToString(
Map<String, Set<Integer>> assignment
) {
StringBuilder builder = new StringBuilder("[");
Iterator<Entry<String, Set<Integer>>> subtopologyIterator = assignment.entrySet().iterator();
while (subtopologyIterator.hasNext()) {
Map.Entry<String, Set<Integer>> entry = subtopologyIterator.next();
Iterator<Integer> partitionsIterator = entry.getValue().iterator();
while (partitionsIterator.hasNext()) {
builder.append(entry.getKey());
builder.append("-");
builder.append(partitionsIterator.next());
if (partitionsIterator.hasNext() || subtopologyIterator.hasNext()) {
builder.append(", ");
}
}
}
builder.append("]");
return builder.toString();
}
}

View File

@ -23,7 +23,9 @@ import java.util.Set;
/**
* The task assignment for a Streams group member.
*
* @param activeTasks The target tasks assigned to this member keyed by subtopologyId.
* @param activeTasks The active tasks assigned to this member keyed by subtopologyId.
* @param standbyTasks The standby tasks assigned to this member keyed by subtopologyId.
* @param warmupTasks The warm-up tasks assigned to this member keyed by subtopologyId.
*/
public record MemberAssignment(Map<String, Set<Integer>> activeTasks,
Map<String, Set<Integer>> standbyTasks,

View File

@ -108,8 +108,13 @@ import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
import org.apache.kafka.coordinator.group.streams.MockTaskAssignor;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@ -158,6 +163,7 @@ import static org.mockito.Mockito.mock;
public class GroupMetadataManagerTestContext {
static final String DEFAULT_CLIENT_ID = "client";
static final InetAddress DEFAULT_CLIENT_ADDRESS = InetAddress.getLoopbackAddress();
static final String DEFAULT_PROCESS_ID = "process-id";
private static class GroupCoordinatorConfigContext extends GroupCoordinatorConfig {
GroupCoordinatorConfigContext(AbstractConfig config) {
@ -465,6 +471,7 @@ public class GroupMetadataManagerTestContext {
private final List<ShareGroupBuilder> shareGroupBuilders = new ArrayList<>();
private final Map<String, Object> config = new HashMap<>();
private Optional<Authorizer> authorizer = Optional.empty();
private List<TaskAssignor> streamsGroupAssignors = Collections.singletonList(new MockTaskAssignor("mock"));
public Builder withConfig(String key, Object value) {
config.put(key, value);
@ -500,6 +507,11 @@ public class GroupMetadataManagerTestContext {
this.authorizer = Optional.of(authorizer);
return this;
}
public Builder withStreamsGroupTaskAssignors(List<TaskAssignor> assignors) {
this.streamsGroupAssignors = assignors;
return this;
}
public GroupMetadataManagerTestContext build() {
if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
@ -531,6 +543,7 @@ public class GroupMetadataManagerTestContext {
.withShareGroupAssignor(shareGroupAssignor)
.withGroupConfigManager(groupConfigManager)
.withAuthorizer(authorizer)
.withStreamsGroupAssignors(streamsGroupAssignors)
.build(),
groupConfigManager
);
@ -607,6 +620,14 @@ public class GroupMetadataManagerTestContext {
.state();
}
public StreamsGroup.StreamsGroupState streamsGroupState(
String groupId
) {
return groupMetadataManager
.streamsGroup(groupId)
.state();
}
public MemberState consumerGroupMemberState(
String groupId,
String memberId
@ -617,6 +638,16 @@ public class GroupMetadataManagerTestContext {
.state();
}
public org.apache.kafka.coordinator.group.streams.MemberState streamsGroupMemberState(
String groupId,
String memberId
) {
return groupMetadataManager
.streamsGroup(groupId)
.getMemberOrThrow(memberId)
.state();
}
public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(
ConsumerGroupHeartbeatRequestData request
) {
@ -1746,4 +1777,22 @@ public class GroupMetadataManagerTestContext {
public void updateGroupConfig(String groupId, Properties newGroupConfig) {
groupConfigManager.updateGroupConfig(groupId, newGroupConfig);
}
public static StreamsGroupMember.Builder streamsGroupMemberBuilderWithDefaults(String memberId) {
return new StreamsGroupMember.Builder(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
.setAssignedTasks(TasksTuple.EMPTY)
.setTasksPendingRevocation(TasksTuple.EMPTY)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRackId(null)
.setInstanceId(null)
.setRebalanceTimeoutMs(1500)
.setAssignedTasks(TasksTuple.EMPTY)
.setTasksPendingRevocation(TasksTuple.EMPTY)
.setTopologyEpoch(0)
.setClientTags(Map.of())
.setProcessId(DEFAULT_PROCESS_ID)
.setUserEndpoint(null);
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.streams.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
public class MockTaskAssignor implements TaskAssignor {
private final String name;
private GroupAssignment preparedGroupAssignment = null;
public MockTaskAssignor(String name) {
this.name = name;
}
public void prepareGroupAssignment(GroupAssignment prepareGroupAssignment) {
this.preparedGroupAssignment = prepareGroupAssignment;
}
public void prepareGroupAssignment(Map<String, TasksTuple> memberAssignments) {
this.preparedGroupAssignment =
new GroupAssignment(memberAssignments.entrySet().stream().collect(
Collectors.toMap(
Entry::getKey,
entry -> {
TasksTuple tasksTuple = entry.getValue();
return new MemberAssignment(
tasksTuple.activeTasks(), tasksTuple.standbyTasks(), tasksTuple.warmupTasks());
})));
}
@Override
public String name() {
return name;
}
@Override
public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber)
throws TaskAssignorException {
return preparedGroupAssignment;
}
}

View File

@ -18,8 +18,10 @@ package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -97,7 +99,7 @@ public class StreamsGroupBuilder {
groupId,
new StreamsGroupTopologyValue()
.setEpoch(topology.topologyEpoch())
.setSubtopologies(topology.subtopologies().values().stream().sorted().toList()))
.setSubtopologies(topology.subtopologies().values().stream().sorted(Comparator.comparing(Subtopology::subtopologyId)).toList()))
);
}

View File

@ -74,6 +74,7 @@ import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -100,23 +101,53 @@ public class StreamsGroupTest {
}
@Test
public void testGetOrCreateMember() {
public void testGetOrCreateUninitializedMember() {
StreamsGroup streamsGroup = createStreamsGroup("foo");
StreamsGroupMember uninitializedMember = new StreamsGroupMember.Builder("member-id").build();
StreamsGroupMember member = streamsGroup.getOrCreateUninitializedMember("member-id");
assertEquals(uninitializedMember, member);
StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member).setInstanceId("unique-new-id").build();
streamsGroup.updateMember(updatedMember);
assertEquals(updatedMember, streamsGroup.getOrCreateUninitializedMember("member-id"));
assertNotEquals(uninitializedMember, streamsGroup.getOrCreateUninitializedMember("member-id"));
}
@Test
public void testGetOrCreateDefaultMember() {
StreamsGroup streamsGroup = createStreamsGroup("foo");
StreamsGroupMember defaultMember = StreamsGroupMember.Builder.withDefaults("member-id").build();
StreamsGroupMember member = streamsGroup.getOrCreateDefaultMember("member-id");
assertEquals(defaultMember, member);
StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member).setInstanceId("unique-new-id").build();
streamsGroup.updateMember(updatedMember);
assertEquals(updatedMember, streamsGroup.getOrCreateDefaultMember("member-id"));
assertNotEquals(defaultMember, streamsGroup.getOrCreateDefaultMember("member-id"));
}
@Test
public void testGetMemberOrThrow() {
StreamsGroup streamsGroup = createStreamsGroup("foo");
StreamsGroupMember member;
// Create a member.
member = streamsGroup.getOrMaybeCreateMember("member-id", true);
member = streamsGroup.getOrCreateDefaultMember("member-id");
assertEquals("member-id", member.memberId());
// Add member to the group.
streamsGroup.updateMember(member);
// Get that member back.
member = streamsGroup.getOrMaybeCreateMember("member-id", false);
member = streamsGroup.getMemberOrThrow("member-id");
assertEquals("member-id", member.memberId());
assertThrows(UnknownMemberIdException.class, () ->
streamsGroup.getOrMaybeCreateMember("does-not-exist", false));
streamsGroup.getMemberOrThrow("does-not-exist"));
}
@Test
@ -124,13 +155,13 @@ public class StreamsGroupTest {
StreamsGroup streamsGroup = createStreamsGroup("foo");
StreamsGroupMember member;
member = streamsGroup.getOrMaybeCreateMember("member", true);
member = streamsGroup.getOrCreateDefaultMember("member");
member = new StreamsGroupMember.Builder(member).build();
streamsGroup.updateMember(member);
assertEquals(member, streamsGroup.getOrMaybeCreateMember("member", false));
assertEquals(member, streamsGroup.getMemberOrThrow("member"));
}
@Test
@ -138,7 +169,7 @@ public class StreamsGroupTest {
StreamsGroup streamsGroup = createStreamsGroup("foo");
// Create a new member which is not static
streamsGroup.getOrMaybeCreateMember("member", true);
streamsGroup.getOrCreateDefaultMember("member");
assertNull(streamsGroup.staticMember("instance-id"));
}
@ -147,7 +178,7 @@ public class StreamsGroupTest {
StreamsGroup streamsGroup = createStreamsGroup("foo");
StreamsGroupMember member;
member = streamsGroup.getOrMaybeCreateMember("member", true);
member = streamsGroup.getOrCreateDefaultMember("member");
member = new StreamsGroupMember.Builder(member)
.setInstanceId("instance")
@ -156,7 +187,7 @@ public class StreamsGroupTest {
streamsGroup.updateMember(member);
assertEquals(member, streamsGroup.staticMember("instance"));
assertEquals(member, streamsGroup.getOrMaybeCreateMember("member", false));
assertEquals(member, streamsGroup.getMemberOrThrow("member"));
assertEquals(member.memberId(), streamsGroup.staticMemberId("instance"));
}
@ -164,13 +195,12 @@ public class StreamsGroupTest {
public void testRemoveMember() {
StreamsGroup streamsGroup = createStreamsGroup("foo");
StreamsGroupMember member = streamsGroup.getOrMaybeCreateMember("member", true);
StreamsGroupMember member = streamsGroup.getOrCreateDefaultMember("member");
streamsGroup.updateMember(member);
assertTrue(streamsGroup.hasMember("member"));
streamsGroup.removeMember("member");
assertFalse(streamsGroup.hasMember("member"));
}
@Test

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
@ -103,6 +104,22 @@ public class StreamsTopologyTest {
assertEquals(mkSubtopology2(), topology.subtopologies().get(SUBTOPOLOGY_ID_2));
}
@Test
public void fromHeartbeatRequestShouldCreateCorrectTopology() {
StreamsGroupHeartbeatRequestData.Topology requestTopology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List.of(mkRequestSubtopology1(), mkRequestSubtopology2()));
StreamsTopology topology = StreamsTopology.fromHeartbeatRequest(requestTopology);
assertEquals(1, topology.topologyEpoch());
assertEquals(2, topology.subtopologies().size());
assertTrue(topology.subtopologies().containsKey(SUBTOPOLOGY_ID_1));
assertEquals(mkSubtopology1(), topology.subtopologies().get(SUBTOPOLOGY_ID_1));
assertTrue(topology.subtopologies().containsKey(SUBTOPOLOGY_ID_2));
assertEquals(mkSubtopology2(), topology.subtopologies().get(SUBTOPOLOGY_ID_2));
}
private Subtopology mkSubtopology1() {
return new Subtopology()
.setSubtopologyId(SUBTOPOLOGY_ID_1)
@ -147,4 +164,49 @@ public class StreamsTopologyTest {
new TopicInfo().setName(CHANGELOG_TOPIC_3)
));
}
private StreamsGroupHeartbeatRequestData.Subtopology mkRequestSubtopology1() {
return new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId(SUBTOPOLOGY_ID_1)
.setSourceTopics(List.of(
SOURCE_TOPIC_1,
SOURCE_TOPIC_2,
REPARTITION_TOPIC_1,
REPARTITION_TOPIC_2
))
.setRepartitionSourceTopics(List.of(
new StreamsGroupHeartbeatRequestData.TopicInfo().setName(REPARTITION_TOPIC_1),
new StreamsGroupHeartbeatRequestData.TopicInfo().setName(REPARTITION_TOPIC_2)
))
.setRepartitionSinkTopics(List.of(
REPARTITION_TOPIC_3
))
.setStateChangelogTopics(List.of(
new StreamsGroupHeartbeatRequestData.TopicInfo().setName(CHANGELOG_TOPIC_1),
new StreamsGroupHeartbeatRequestData.TopicInfo().setName(CHANGELOG_TOPIC_2)
))
.setCopartitionGroups(List.of(
new StreamsGroupHeartbeatRequestData.CopartitionGroup()
.setRepartitionSourceTopics(List.of((short) 0))
.setSourceTopics(List.of((short) 0)),
new StreamsGroupHeartbeatRequestData.CopartitionGroup()
.setRepartitionSourceTopics(List.of((short) 1))
.setSourceTopics(List.of((short) 1))
));
}
private StreamsGroupHeartbeatRequestData.Subtopology mkRequestSubtopology2() {
return new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId(SUBTOPOLOGY_ID_2)
.setSourceTopics(List.of(
SOURCE_TOPIC_3,
REPARTITION_TOPIC_3
))
.setRepartitionSourceTopics(List.of(
new StreamsGroupHeartbeatRequestData.TopicInfo().setName(REPARTITION_TOPIC_3)
))
.setStateChangelogTopics(List.of(
new StreamsGroupHeartbeatRequestData.TopicInfo().setName(CHANGELOG_TOPIC_3)
));
}
}