KAFKA-18325: Add TargetAssignmentBuilder (#18676)

A class to build a new target assignment based on the provided parameters. As a result, it yields the records that must be persisted to the log and the new member assignments as a map.

Compared to the feature branch, I extended the unit tests (testing also standby and warm-up task logic) and adopted simplifications due to the TasksTuple class.

Reviewers: Bruno Cadonna <cadonna@apache.org>, Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
Lucas Brutschy 2025-02-03 17:35:28 +01:00 committed by GitHub
parent 1a106e4538
commit 4ca24a7dbf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 1468 additions and 21 deletions

View File

@ -0,0 +1,351 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Build the new target member assignments based on the provided parameters by calling the task assignor.
* As a result,
* it yields the records that must be persisted to the log and the new member assignments as a map from member ID to tasks tuple.
* <p>
* Records are only created for members which have a new target assignment. If their assignment did not change, no new record is needed.
* <p>
* When a member is deleted, it is assumed that its target assignment record is deleted as part of the member deletion process. In other
* words, this class does not yield a tombstone for removed members.
*/
public class TargetAssignmentBuilder {
/**
* The group ID.
*/
private final String groupId;
/**
* The group epoch.
*/
private final int groupEpoch;
/**
* The partition assignor used to compute the assignment.
*/
private final TaskAssignor assignor;
/**
* The assignment configs.
*/
private final Map<String, String> assignmentConfigs;
/**
* The members which have been updated or deleted. A null value signals deleted members.
*/
private final Map<String, StreamsGroupMember> updatedMembers = new HashMap<>();
/**
* The members in the group.
*/
private Map<String, StreamsGroupMember> members = Map.of();
/**
* The partition metadata.
*/
private Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> partitionMetadata = Map.of();
/**
* The existing target assignment.
*/
private Map<String, org.apache.kafka.coordinator.group.streams.TasksTuple> targetAssignment = Map.of();
/**
* The topology.
*/
private ConfiguredTopology topology;
/**
* The static members in the group.
*/
private Map<String, String> staticMembers = Map.of();
/**
* Constructs the object.
*
* @param groupId The group ID.
* @param groupEpoch The group epoch to compute a target assignment for.
* @param assignor The assignor to use to compute the target assignment.
*/
public TargetAssignmentBuilder(
String groupId,
int groupEpoch,
TaskAssignor assignor,
Map<String, String> assignmentConfigs
) {
this.groupId = Objects.requireNonNull(groupId);
this.groupEpoch = groupEpoch;
this.assignor = Objects.requireNonNull(assignor);
this.assignmentConfigs = Objects.requireNonNull(assignmentConfigs);
}
static AssignmentMemberSpec createAssignmentMemberSpec(
StreamsGroupMember member,
TasksTuple targetAssignment
) {
return new AssignmentMemberSpec(
member.instanceId(),
member.rackId(),
targetAssignment.activeTasks(),
targetAssignment.standbyTasks(),
targetAssignment.warmupTasks(),
member.processId(),
member.clientTags(),
Map.of(),
Map.of()
);
}
/**
* Adds all the existing members.
*
* @param members The existing members in the streams group.
* @return This object.
*/
public TargetAssignmentBuilder withMembers(
Map<String, StreamsGroupMember> members
) {
this.members = members;
return this;
}
/**
* Adds all the existing static members.
*
* @param staticMembers The existing static members in the streams group.
* @return This object.
*/
public TargetAssignmentBuilder withStaticMembers(
Map<String, String> staticMembers
) {
this.staticMembers = staticMembers;
return this;
}
/**
* Adds the partition metadata to use.
*
* @param partitionMetadata The partition metadata.
* @return This object.
*/
public TargetAssignmentBuilder withPartitionMetadata(
Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> partitionMetadata
) {
this.partitionMetadata = partitionMetadata;
return this;
}
/**
* Adds the existing target assignment.
*
* @param targetAssignment The existing target assignment.
* @return This object.
*/
public TargetAssignmentBuilder withTargetAssignment(
Map<String, org.apache.kafka.coordinator.group.streams.TasksTuple> targetAssignment
) {
this.targetAssignment = targetAssignment;
return this;
}
/**
* Adds the topology image.
*
* @param topology The topology.
* @return This object.
*/
public TargetAssignmentBuilder withTopology(
ConfiguredTopology topology
) {
this.topology = topology;
return this;
}
/**
* Adds or updates a member. This is useful when the updated member is not yet materialized in memory.
*
* @param memberId The member ID.
* @param member The member to add or update.
* @return This object.
*/
public TargetAssignmentBuilder addOrUpdateMember(
String memberId,
StreamsGroupMember member
) {
this.updatedMembers.put(memberId, member);
return this;
}
/**
* Removes a member. This is useful when the removed member is not yet materialized in memory.
*
* @param memberId The member ID.
* @return This object.
*/
public TargetAssignmentBuilder removeMember(
String memberId
) {
return addOrUpdateMember(memberId, null);
}
/**
* Builds the new target assignment.
*
* @return A TargetAssignmentResult which contains the records to update the existing target assignment.
* @throws TaskAssignorException if the target assignment cannot be computed.
*/
public TargetAssignmentResult build() throws TaskAssignorException {
Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
// Prepare the member spec for all members.
members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec(
member,
targetAssignment.getOrDefault(memberId, org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY)
)));
// Update the member spec if updated or deleted members.
updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
if (updatedMemberOrNull == null) {
memberSpecs.remove(memberId);
} else {
org.apache.kafka.coordinator.group.streams.TasksTuple assignment = targetAssignment.getOrDefault(memberId,
org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY);
// A new static member joins and needs to replace an existing departed one.
if (updatedMemberOrNull.instanceId().isPresent()) {
String previousMemberId = staticMembers.get(updatedMemberOrNull.instanceId().get());
if (previousMemberId != null && !previousMemberId.equals(memberId)) {
assignment = targetAssignment.getOrDefault(previousMemberId,
org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY);
}
}
memberSpecs.put(memberId, createAssignmentMemberSpec(
updatedMemberOrNull,
assignment
));
}
});
// Compute the assignment.
GroupAssignment newGroupAssignment;
if (topology.isReady()) {
if (topology.subtopologies().isEmpty()) {
throw new IllegalStateException("Subtopologies must be present if topology is ready.");
}
newGroupAssignment = assignor.assign(
new GroupSpecImpl(
Collections.unmodifiableMap(memberSpecs),
assignmentConfigs
),
new TopologyMetadata(partitionMetadata, topology.subtopologies().get())
);
} else {
newGroupAssignment = new GroupAssignment(
memberSpecs.keySet().stream().collect(Collectors.toMap(x -> x, x -> MemberAssignment.empty())));
}
// Compute delta from previous to new target assignment and create the
// relevant records.
List<CoordinatorRecord> records = new ArrayList<>();
Map<String, org.apache.kafka.coordinator.group.streams.TasksTuple> newTargetAssignment = new HashMap<>();
memberSpecs.keySet().forEach(memberId -> {
org.apache.kafka.coordinator.group.streams.TasksTuple oldMemberAssignment = targetAssignment.get(memberId);
org.apache.kafka.coordinator.group.streams.TasksTuple newMemberAssignment = newMemberAssignment(newGroupAssignment, memberId);
newTargetAssignment.put(memberId, newMemberAssignment);
if (oldMemberAssignment == null) {
// If the member had no assignment, we always create a record for it.
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(
groupId,
memberId,
newMemberAssignment
));
} else {
// If the member had an assignment, we only create a record if the
// new assignment is different.
if (!newMemberAssignment.equals(oldMemberAssignment)) {
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(
groupId,
memberId,
newMemberAssignment
));
}
}
});
// Bump the target assignment epoch.
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, groupEpoch));
return new TargetAssignmentResult(records, newTargetAssignment);
}
private TasksTuple newMemberAssignment(
GroupAssignment newGroupAssignment,
String memberId
) {
MemberAssignment newMemberAssignment = newGroupAssignment.members().get(memberId);
if (newMemberAssignment != null) {
return new TasksTuple(
newMemberAssignment.activeTasks(),
newMemberAssignment.standbyTasks(),
newMemberAssignment.warmupTasks()
);
} else {
return TasksTuple.EMPTY;
}
}
/**
* The assignment result returned by {{@link TargetAssignmentBuilder#build()}}.
*
* @param records The records that must be applied to the __consumer_offsets topics to persist the new target assignment.
* @param targetAssignment The new target assignment for the group.
*/
public record TargetAssignmentResult(
List<CoordinatorRecord> records,
Map<String, TasksTuple> targetAssignment
) {
public TargetAssignmentResult {
Objects.requireNonNull(records);
Objects.requireNonNull(targetAssignment);
}
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.SortedMap;
import java.util.stream.Stream;
/**
* The topology metadata class is used by the {@link org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor} to get topic and
* partition metadata for the topology that the streams group using.
*
* @param topicMetadata The topic Ids mapped to their corresponding {@link TopicMetadata} object, which contains topic and partition
* metadata.
* @param subtopologyMap The configured subtopologies
*/
public record TopologyMetadata(Map<String, TopicMetadata> topicMetadata, SortedMap<String, ConfiguredSubtopology> subtopologyMap) implements TopologyDescriber {
public TopologyMetadata {
topicMetadata = Objects.requireNonNull(Collections.unmodifiableMap(topicMetadata));
subtopologyMap = Objects.requireNonNull(Collections.unmodifiableSortedMap(subtopologyMap));
}
/**
* Map of topic names to topic metadata.
*
* @return The map of topic Ids to topic metadata.
*/
@Override
public Map<String, TopicMetadata> topicMetadata() {
return this.topicMetadata;
}
/**
* Checks whether the given subtopology is associated with a changelog topic.
*
* @param subtopologyId String identifying the subtopology.
* @throws NoSuchElementException if the subtopology ID does not exist.
* @return true if the subtopology is associated with a changelog topic, false otherwise.
*/
@Override
public boolean isStateful(String subtopologyId) {
final ConfiguredSubtopology subtopology = getSubtopologyOrFail(subtopologyId);
return !subtopology.stateChangelogTopics().isEmpty();
}
/**
* The list of subtopologies in the topology.
*
* @return a list of subtopology IDs.
*/
@Override
public List<String> subtopologies() {
return subtopologyMap.keySet().stream().toList();
}
/**
* The maximal number of input partitions among all source topics for the given subtopology.
*
* @param subtopologyId String identifying the subtopology.
*
* @throws NoSuchElementException if the subtopology ID does not exist.
* @throws IllegalStateException if the subtopology contains no source topics.
* @return The maximal number of input partitions among all source topics for the given subtopology.
*/
@Override
public int maxNumInputPartitions(String subtopologyId) {
final ConfiguredSubtopology subtopology = getSubtopologyOrFail(subtopologyId);
return Stream.concat(
subtopology.sourceTopics().stream(),
subtopology.repartitionSourceTopics().keySet().stream()
).map(topic -> this.topicMetadata.get(topic).numPartitions()).max(Integer::compareTo).orElseThrow(
() -> new IllegalStateException("Subtopology does not contain any source topics")
);
}
private ConfiguredSubtopology getSubtopologyOrFail(String subtopologyId) {
if (!subtopologyMap.containsKey(subtopologyId)) {
throw new NoSuchElementException(String.format("Topology does not contain subtopology %s", subtopologyId));
}
return subtopologyMap.get(subtopologyId);
}
}

View File

@ -46,7 +46,7 @@ public class MockAssignor implements TaskAssignor {
Map<String, String[]> subtopologyToActiveMember = new HashMap<>();
for (String subtopology : topologyDescriber.subtopologies()) {
int numberOfPartitions = topologyDescriber.numTasks(subtopology);
int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
subtopologyToActiveMember.put(subtopology, new String[numberOfPartitions]);
}

View File

@ -72,7 +72,7 @@ public class StickyTaskAssignor implements TaskAssignor {
Set<TaskId> ret = new HashSet<>();
for (String subtopology : topologyDescriber.subtopologies()) {
if (isActive || topologyDescriber.isStateful(subtopology)) {
int numberOfPartitions = topologyDescriber.numTasks(subtopology);
int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
for (int i = 0; i < numberOfPartitions; i++) {
ret.add(new TaskId(subtopology, i));
}
@ -85,7 +85,7 @@ public class StickyTaskAssignor implements TaskAssignor {
localState = new LocalState();
localState.allTasks = 0;
for (String subtopology : topologyDescriber.subtopologies()) {
int numberOfPartitions = topologyDescriber.numTasks(subtopology);
int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
localState.allTasks += numberOfPartitions;
}
localState.totalCapacity = groupSpec.members().size();

View File

@ -20,30 +20,34 @@ import java.util.List;
import java.util.NoSuchElementException;
/**
* The subscribed topic describer is used by the {@link TaskAssignor} to obtain topic and task metadata of the groups topology.
* The topology describer is used by the {@link TaskAssignor} to get topic and task metadata of the group's topology.
*/
public interface TopologyDescriber {
/**
* Map of topic names to topic metadata.
*
* @return The list of subtopologies IDs.
*/
List<String> subtopologies();
/**
* The number of tasks for the given subtopology.
* The maximal number of input partitions among all source topics for the given subtopology.
*
* @param subtopologyId String identifying the subtopology.
*
* @return The number of tasks corresponding to the given subtopology ID.
* @throws NoSuchElementException if subtopology does not exist in the topology.
* @throws NoSuchElementException if the subtopology ID does not exist.
* @throws IllegalStateException if the subtopology contains no source topics.
* @return The maximal number of input partitions among all source topics for the given subtopology.
*/
int numTasks(String subtopologyId) throws NoSuchElementException;
int maxNumInputPartitions(String subtopologyId) throws NoSuchElementException;
/**
* Whether the given subtopology is stateful.
* Checks whether the given subtopology is associated with a changelog topic.
*
* @param subtopologyId String identifying the subtopology.
* @return true if the subtopology is stateful, false otherwise.
* @throws NoSuchElementException if the subtopology ID does not exist.
* @return true if the subtopology is associated with a changelog topic, false otherwise.
*/
boolean isStateful(String subtopologyId);

View File

@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.stream.Collectors;
/**
@ -41,7 +42,7 @@ import java.util.stream.Collectors;
* reported back to the client.
*/
public record ConfiguredTopology(int topologyEpoch,
Optional<Map<String, ConfiguredSubtopology>> subtopologies,
Optional<SortedMap<String, ConfiguredSubtopology>> subtopologies,
Map<String, CreatableTopic> internalTopicsToBeCreated,
Optional<TopicConfigurationException> topicConfigurationException) {

View File

@ -34,6 +34,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -74,12 +76,16 @@ public class InternalTopicManager {
Map<String, Integer> decidedPartitionCountsForInternalTopics =
decidePartitionCounts(logContext, topology, topicMetadata, copartitionGroupsBySubtopology, log);
final Map<String, ConfiguredSubtopology> configuredSubtopologies =
final SortedMap<String, ConfiguredSubtopology> configuredSubtopologies =
subtopologies.stream()
.collect(Collectors.toMap(
StreamsGroupTopologyValue.Subtopology::subtopologyId,
x -> fromPersistedSubtopology(x, decidedPartitionCountsForInternalTopics))
);
x -> fromPersistedSubtopology(x, decidedPartitionCountsForInternalTopics),
(v1, v2) -> {
throw new RuntimeException(String.format("Duplicate key for values %s and %s", v1, v2));
},
TreeMap::new
));
Map<String, CreatableTopic> internalTopicsToCreate = missingInternalTopics(configuredSubtopologies, topicMetadata);
if (!internalTopicsToCreate.isEmpty()) {

View File

@ -0,0 +1,852 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
import org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksTuple;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TargetAssignmentBuilderTest {
@Test
public void testBuildEmptyAssignmentWhenTopologyNotReady() {
String groupId = "test-group";
int groupEpoch = 1;
TaskAssignor assignor = mock(TaskAssignor.class);
ConfiguredTopology topology = mock(ConfiguredTopology.class);
Map<String, String> assignmentConfigs = new HashMap<>();
when(topology.isReady()).thenReturn(false);
TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId, groupEpoch, assignor, assignmentConfigs)
.withTopology(topology);
TargetAssignmentBuilder.TargetAssignmentResult result = builder.build();
List<CoordinatorRecord> expectedRecords = Collections.singletonList(
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, groupEpoch)
);
assertEquals(expectedRecords, result.records());
assertEquals(Collections.emptyMap(), result.targetAssignment());
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testCreateAssignmentMemberSpec(TaskRole taskRole) {
String fooSubtopologyId = Uuid.randomUuid().toString();
String barSubtopologyId = Uuid.randomUuid().toString();
final Map<String, String> clientTags = mkMap(mkEntry("tag1", "value1"), mkEntry("tag2", "value2"));
StreamsGroupMember member = new StreamsGroupMember.Builder("member-id")
.setRackId("rackId")
.setInstanceId("instanceId")
.setProcessId("processId")
.setClientTags(clientTags)
.build();
TasksTuple assignment = mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
mkTasks(barSubtopologyId, 1, 2, 3)
);
AssignmentMemberSpec assignmentMemberSpec = createAssignmentMemberSpec(
member,
assignment
);
assertEquals(new AssignmentMemberSpec(
Optional.of("instanceId"),
Optional.of("rackId"),
assignment.activeTasks(),
assignment.standbyTasks(),
assignment.warmupTasks(),
"processId",
clientTags,
Map.of(),
Map.of()
), assignmentMemberSpec);
}
@Test
public void testEmpty() {
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
"my-group",
20
);
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
assertEquals(List.of(newStreamsGroupTargetAssignmentEpochRecord(
"my-group",
20
)), result.records());
assertEquals(Map.of(), result.targetAssignment());
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testAssignmentHasNotChanged(TaskRole taskRole) {
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
"my-group",
20
);
String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
mkTasks(barSubtopologyId, 1, 2, 3)
));
context.addGroupMember("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 4, 5, 6),
mkTasks(barSubtopologyId, 4, 5, 6)
));
context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
mkTasks(barSubtopologyId, 1, 2, 3)
));
context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 4, 5, 6),
mkTasks(barSubtopologyId, 4, 5, 6)
));
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
assertEquals(List.of(newStreamsGroupTargetAssignmentEpochRecord(
"my-group",
20
)), result.records());
Map<String, TasksTuple> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
mkTasks(barSubtopologyId, 1, 2, 3)
));
expectedAssignment.put("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 4, 5, 6),
mkTasks(barSubtopologyId, 4, 5, 6)
));
assertEquals(expectedAssignment, result.targetAssignment());
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testAssignmentSwapped(TaskRole taskRole) {
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
"my-group",
20
);
String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
mkTasks(barSubtopologyId, 1, 2, 3)
));
context.addGroupMember("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 4, 5, 6),
mkTasks(barSubtopologyId, 4, 5, 6)
));
context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
mkTasks(barSubtopologyId, 1, 2, 3)
));
context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 4, 5, 6),
mkTasks(barSubtopologyId, 4, 5, 6)
));
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
assertEquals(3, result.records().size());
assertUnorderedRecordsEquals(List.of(List.of(
newStreamsGroupTargetAssignmentRecord("my-group", "member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 4, 5, 6),
mkTasks(barSubtopologyId, 4, 5, 6)
)),
newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
mkTasks(barSubtopologyId, 1, 2, 3)
))
)), result.records().subList(0, 2));
assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
"my-group",
20
), result.records().get(2));
Map<String, TasksTuple> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
mkTasks(barSubtopologyId, 1, 2, 3)
));
expectedAssignment.put("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 4, 5, 6),
mkTasks(barSubtopologyId, 4, 5, 6)
));
assertEquals(expectedAssignment, result.targetAssignment());
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testNewMember(TaskRole taskRole) {
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
"my-group",
20
);
String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
mkTasks(barSubtopologyId, 1, 2, 3)
));
context.addGroupMember("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 4, 5, 6),
mkTasks(barSubtopologyId, 4, 5, 6)
));
context.updateMemberMetadata("member-3");
context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
mkTasks(barSubtopologyId, 1, 2)
));
context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 3, 4),
mkTasks(barSubtopologyId, 3, 4)
));
context.prepareMemberAssignment("member-3", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 5, 6),
mkTasks(barSubtopologyId, 5, 6)
));
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
assertEquals(4, result.records().size());
assertUnorderedRecordsEquals(List.of(List.of(
newStreamsGroupTargetAssignmentRecord("my-group", "member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
mkTasks(barSubtopologyId, 1, 2)
)),
newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 3, 4),
mkTasks(barSubtopologyId, 3, 4)
)),
newStreamsGroupTargetAssignmentRecord("my-group", "member-3", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 5, 6),
mkTasks(barSubtopologyId, 5, 6)
))
)), result.records().subList(0, 3));
assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
"my-group",
20
), result.records().get(3));
Map<String, TasksTuple> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
mkTasks(barSubtopologyId, 1, 2)
));
expectedAssignment.put("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 3, 4),
mkTasks(barSubtopologyId, 3, 4)
));
expectedAssignment.put("member-3", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 5, 6),
mkTasks(barSubtopologyId, 5, 6)
));
assertEquals(expectedAssignment, result.targetAssignment());
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testUpdateMember(TaskRole taskRole) {
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
"my-group",
20
);
String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
mkTasks(barSubtopologyId, 1, 2)
));
context.addGroupMember("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 4, 5, 6),
mkTasks(barSubtopologyId, 3, 4)
));
context.addGroupMember("member-3", mkTasksTuple(taskRole,
mkTasks(barSubtopologyId, 5, 6)
));
context.updateMemberMetadata(
"member-3",
Optional.of("instance-id-3"),
Optional.of("rack-0")
);
context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
mkTasks(barSubtopologyId, 1, 2)
));
context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 3, 4),
mkTasks(barSubtopologyId, 3, 4)
));
context.prepareMemberAssignment("member-3", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 5, 6),
mkTasks(barSubtopologyId, 5, 6)
));
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
assertEquals(4, result.records().size());
assertUnorderedRecordsEquals(List.of(List.of(
newStreamsGroupTargetAssignmentRecord("my-group", "member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
mkTasks(barSubtopologyId, 1, 2)
)),
newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 3, 4),
mkTasks(barSubtopologyId, 3, 4)
)),
newStreamsGroupTargetAssignmentRecord("my-group", "member-3", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 5, 6),
mkTasks(barSubtopologyId, 5, 6)
))
)), result.records().subList(0, 3));
assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
"my-group",
20
), result.records().get(3));
Map<String, TasksTuple> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
mkTasks(barSubtopologyId, 1, 2)
));
expectedAssignment.put("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 3, 4),
mkTasks(barSubtopologyId, 3, 4)
));
expectedAssignment.put("member-3", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 5, 6),
mkTasks(barSubtopologyId, 5, 6)
));
assertEquals(expectedAssignment, result.targetAssignment());
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testPartialAssignmentUpdate(TaskRole taskRole) {
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
"my-group",
20
);
String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, mkMapOfPartitionRacks(6));
String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, mkMapOfPartitionRacks(6));
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
mkTasks(barSubtopologyId, 1, 2)
));
context.addGroupMember("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 3, 4),
mkTasks(barSubtopologyId, 3, 4)
));
context.addGroupMember("member-3", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 5, 6),
mkTasks(barSubtopologyId, 5, 6)
));
context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
mkTasks(barSubtopologyId, 1, 2)
));
context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 3, 4, 5),
mkTasks(barSubtopologyId, 3, 4, 5)
));
context.prepareMemberAssignment("member-3", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 6),
mkTasks(barSubtopologyId, 6)
));
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
assertEquals(3, result.records().size());
// Member 1 has no record because its assignment did not change.
assertUnorderedRecordsEquals(List.of(List.of(
newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 3, 4, 5),
mkTasks(barSubtopologyId, 3, 4, 5)
)),
newStreamsGroupTargetAssignmentRecord("my-group", "member-3", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 6),
mkTasks(barSubtopologyId, 6)
))
)), result.records().subList(0, 2));
assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
"my-group",
20
), result.records().get(2));
Map<String, TasksTuple> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
mkTasks(barSubtopologyId, 1, 2)
));
expectedAssignment.put("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 3, 4, 5),
mkTasks(barSubtopologyId, 3, 4, 5)
));
expectedAssignment.put("member-3", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 6),
mkTasks(barSubtopologyId, 6)
));
assertEquals(expectedAssignment, result.targetAssignment());
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testDeleteMember(TaskRole taskRole) {
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
"my-group",
20
);
String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
mkTasks(barSubtopologyId, 1, 2)
));
context.addGroupMember("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 3, 4),
mkTasks(barSubtopologyId, 3, 4)
));
context.addGroupMember("member-3", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 5, 6),
mkTasks(barSubtopologyId, 5, 6)
));
context.removeMember("member-3");
context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
mkTasks(barSubtopologyId, 1, 2, 3)
));
context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 4, 5, 6),
mkTasks(barSubtopologyId, 4, 5, 6)
));
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
assertEquals(3, result.records().size());
assertUnorderedRecordsEquals(List.of(List.of(
newStreamsGroupTargetAssignmentRecord("my-group", "member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
mkTasks(barSubtopologyId, 1, 2, 3)
)),
newStreamsGroupTargetAssignmentRecord("my-group", "member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 4, 5, 6),
mkTasks(barSubtopologyId, 4, 5, 6)
))
)), result.records().subList(0, 2));
assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
"my-group",
20
), result.records().get(2));
Map<String, TasksTuple> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
mkTasks(barSubtopologyId, 1, 2, 3)
));
expectedAssignment.put("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 4, 5, 6),
mkTasks(barSubtopologyId, 4, 5, 6)
));
assertEquals(expectedAssignment, result.targetAssignment());
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testReplaceStaticMember(TaskRole taskRole) {
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
"my-group",
20
);
String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
context.addGroupMember("member-1", "instance-member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
mkTasks(barSubtopologyId, 1, 2)
));
context.addGroupMember("member-2", "instance-member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 3, 4),
mkTasks(barSubtopologyId, 3, 4)
));
context.addGroupMember("member-3", "instance-member-3", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 5, 6),
mkTasks(barSubtopologyId, 5, 6)
));
// Static member 3 leaves
context.removeMember("member-3");
// Another static member joins with the same instance id as the departed one
context.updateMemberMetadata("member-3-a", Optional.of("instance-member-3"),
Optional.empty());
context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
mkTasks(barSubtopologyId, 1, 2)
));
context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 3, 4),
mkTasks(barSubtopologyId, 3, 4)
));
context.prepareMemberAssignment("member-3-a", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 5, 6),
mkTasks(barSubtopologyId, 5, 6)
));
TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
assertEquals(2, result.records().size());
assertUnorderedRecordsEquals(List.of(List.of(
newStreamsGroupTargetAssignmentRecord("my-group", "member-3-a", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 5, 6),
mkTasks(barSubtopologyId, 5, 6)
))
)), result.records().subList(0, 1));
assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
"my-group",
20
), result.records().get(1));
Map<String, TasksTuple> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
mkTasks(barSubtopologyId, 1, 2)
));
expectedAssignment.put("member-2", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 3, 4),
mkTasks(barSubtopologyId, 3, 4)
));
expectedAssignment.put("member-3-a", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 5, 6),
mkTasks(barSubtopologyId, 5, 6)
));
assertEquals(expectedAssignment, result.targetAssignment());
}
public static class TargetAssignmentBuilderTestContext {
private final String groupId;
private final int groupEpoch;
private final TaskAssignor assignor = mock(TaskAssignor.class);
private final SortedMap<String, ConfiguredSubtopology> subtopologies = new TreeMap<>();
private final ConfiguredTopology topology = new ConfiguredTopology(0, Optional.of(subtopologies), new HashMap<>(),
Optional.empty());
private final Map<String, StreamsGroupMember> members = new HashMap<>();
private final Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> subscriptionMetadata = new HashMap<>();
private final Map<String, StreamsGroupMember> updatedMembers = new HashMap<>();
private final Map<String, TasksTuple> targetAssignment = new HashMap<>();
private final Map<String, MemberAssignment> memberAssignments = new HashMap<>();
private final Map<String, String> staticMembers = new HashMap<>();
private MetadataImageBuilder topicsImageBuilder = new MetadataImageBuilder();
public TargetAssignmentBuilderTestContext(
String groupId,
int groupEpoch
) {
this.groupId = groupId;
this.groupEpoch = groupEpoch;
}
public void addGroupMember(
String memberId,
TasksTuple targetTasks
) {
addGroupMember(memberId, null, targetTasks);
}
private void addGroupMember(
String memberId,
String instanceId,
TasksTuple targetTasks
) {
StreamsGroupMember.Builder memberBuilder = new StreamsGroupMember.Builder(memberId);
memberBuilder.setProcessId("processId");
memberBuilder.setClientTags(Map.of());
memberBuilder.setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090));
if (instanceId != null) {
memberBuilder.setInstanceId(instanceId);
staticMembers.put(instanceId, memberId);
} else {
memberBuilder.setInstanceId(null);
}
memberBuilder.setRackId(null);
members.put(memberId, memberBuilder.build());
targetAssignment.put(memberId, targetTasks);
}
public String addSubtopologyWithSingleSourceTopic(
String topicName,
int numTasks,
Map<Integer, Set<String>> partitionRacks
) {
String subtopologyId = Uuid.randomUuid().toString();
Uuid topicId = Uuid.randomUuid();
subscriptionMetadata.put(topicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(
topicId,
topicName,
numTasks,
partitionRacks
));
topicsImageBuilder = topicsImageBuilder.addTopic(topicId, topicName, numTasks);
subtopologies.put(subtopologyId, new ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(), Map.of()));
return subtopologyId;
}
public void updateMemberMetadata(
String memberId
) {
updateMemberMetadata(
memberId,
Optional.empty(),
Optional.empty()
);
}
public void updateMemberMetadata(
String memberId,
Optional<String> instanceId,
Optional<String> rackId
) {
StreamsGroupMember existingMember = members.get(memberId);
StreamsGroupMember.Builder builder;
if (existingMember != null) {
builder = new StreamsGroupMember.Builder(existingMember);
} else {
builder = new StreamsGroupMember.Builder(memberId);
builder.setProcessId("processId");
builder.setRackId(null);
builder.setInstanceId(null);
builder.setClientTags(Map.of());
builder.setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090));
}
updatedMembers.put(memberId, builder
.maybeUpdateInstanceId(instanceId)
.maybeUpdateRackId(rackId)
.build());
}
public void removeMember(
String memberId
) {
this.updatedMembers.put(memberId, null);
}
public void prepareMemberAssignment(
String memberId,
TasksTuple assignment
) {
memberAssignments.put(memberId, new MemberAssignment(assignment.activeTasks(), assignment.standbyTasks(), assignment.warmupTasks()));
}
public org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult build() {
// Prepare expected member specs.
Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
// All the existing members are prepared.
members.forEach((memberId, member) ->
memberSpecs.put(memberId, createAssignmentMemberSpec(
member,
targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY)
)
));
// All the updated are added and all the deleted
// members are removed.
updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
if (updatedMemberOrNull == null) {
memberSpecs.remove(memberId);
} else {
TasksTuple assignment = targetAssignment.getOrDefault(memberId,
TasksTuple.EMPTY);
// A new static member joins and needs to replace an existing departed one.
if (updatedMemberOrNull.instanceId().isPresent()) {
String previousMemberId = staticMembers.get(updatedMemberOrNull.instanceId().get());
if (previousMemberId != null && !previousMemberId.equals(memberId)) {
assignment = targetAssignment.getOrDefault(previousMemberId,
TasksTuple.EMPTY);
}
}
memberSpecs.put(memberId, createAssignmentMemberSpec(
updatedMemberOrNull,
assignment
));
}
});
// Prepare the expected topology metadata.
TopologyMetadata topologyMetadata = new TopologyMetadata(subscriptionMetadata, subtopologies);
// Prepare the expected assignment spec.
GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, new HashMap<>());
// We use `any` here to always return an assignment but use `verify` later on
// to ensure that the input was correct.
when(assignor.assign(any(), any()))
.thenReturn(new GroupAssignment(memberAssignments));
// Create and populate the assignment builder.
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder builder = new org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder(
groupId, groupEpoch, assignor, Map.of())
.withMembers(members)
.withTopology(topology)
.withStaticMembers(staticMembers)
.withPartitionMetadata(subscriptionMetadata)
.withTargetAssignment(targetAssignment);
// Add the updated members or delete the deleted members.
updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
if (updatedMemberOrNull != null) {
builder.addOrUpdateMember(memberId, updatedMemberOrNull);
} else {
builder.removeMember(memberId);
}
});
// Execute the builder.
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult result = builder.build();
// Verify that the assignor was called once with the expected
// assignment spec.
verify(assignor, times(1))
.assign(groupSpec, topologyMetadata);
return result;
}
}
}

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredInternalTopic;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class TopologyMetadataTest {
private Map<String, TopicMetadata> topicMetadata;
private SortedMap<String, ConfiguredSubtopology> subtopologyMap;
private TopologyMetadata topologyMetadata;
@BeforeEach
void setUp() {
topicMetadata = new HashMap<>();
subtopologyMap = new TreeMap<>();
topologyMetadata = new TopologyMetadata(topicMetadata, subtopologyMap);
}
@Test
void testTopicMetadata() {
assertEquals(topicMetadata, topologyMetadata.topicMetadata());
}
@Test
void testTopology() {
assertEquals(subtopologyMap, topologyMetadata.subtopologyMap());
}
@Test
void testIsStateful() {
ConfiguredInternalTopic internalTopic = mock(ConfiguredInternalTopic.class);
ConfiguredSubtopology subtopology1 = mock(ConfiguredSubtopology.class);
ConfiguredSubtopology subtopology2 = mock(ConfiguredSubtopology.class);
subtopologyMap.put("subtopology1", subtopology1);
subtopologyMap.put("subtopology2", subtopology2);
when(subtopology1.stateChangelogTopics()).thenReturn(Map.of("state_changelog_topic", internalTopic));
when(subtopology2.stateChangelogTopics()).thenReturn(Map.of());
assertTrue(topologyMetadata.isStateful("subtopology1"));
assertFalse(topologyMetadata.isStateful("subtopology2"));
}
@Test
void testMaxNumInputPartitions() {
ConfiguredInternalTopic internalTopic = mock(ConfiguredInternalTopic.class);
ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class);
subtopologyMap.put("subtopology1", subtopology);
when(subtopology.sourceTopics()).thenReturn(Set.of("source_topic"));
when(subtopology.repartitionSourceTopics()).thenReturn(Map.of("repartition_source_topic", internalTopic));
TopicMetadata topicMeta1 = mock(TopicMetadata.class);
TopicMetadata topicMeta2 = mock(TopicMetadata.class);
topicMetadata.put("source_topic", topicMeta1);
topicMetadata.put("repartition_source_topic", topicMeta2);
when(topicMeta1.numPartitions()).thenReturn(3);
when(topicMeta2.numPartitions()).thenReturn(4);
assertEquals(4, topologyMetadata.maxNumInputPartitions("subtopology1"));
}
@Test
void testSubtopologies() {
ConfiguredSubtopology subtopology1 = mock(ConfiguredSubtopology.class);
ConfiguredSubtopology subtopology2 = mock(ConfiguredSubtopology.class);
subtopologyMap.put("subtopology1", subtopology1);
subtopologyMap.put("subtopology2", subtopology2);
List<String> expectedSubtopologies = List.of("subtopology1", "subtopology2");
assertEquals(expectedSubtopologies, topologyMetadata.subtopologies());
}
@Test
void testIsStatefulThrowsExceptionWhenSubtopologyIdDoesNotExist() {
assertThrows(NoSuchElementException.class, () -> topologyMetadata.isStateful("non_existent_subtopology"));
}
@Test
void testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyIdDoesNotExist() {
assertThrows(NoSuchElementException.class, () -> topologyMetadata.maxNumInputPartitions("non_existent_subtopology"));
}
@Test
void testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyContainsNoSourceTopics() {
ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class);
when(subtopology.sourceTopics()).thenReturn(Set.of());
when(subtopology.repartitionSourceTopics()).thenReturn(Map.of());
subtopologyMap.put("subtopology1", subtopology);
assertThrows(IllegalStateException.class, () -> topologyMetadata.maxNumInputPartitions("subtopology1"));
}
}

View File

@ -253,7 +253,7 @@ public class MockAssignorTest {
}
@Override
public int numTasks(String subtopologyId) {
public int maxNumInputPartitions(String subtopologyId) {
return numPartitions;
}

View File

@ -1094,7 +1094,7 @@ public class StickyTaskAssignorTest {
}
@Override
public int numTasks(String subtopologyId) throws NoSuchElementException {
public int maxNumInputPartitions(String subtopologyId) throws NoSuchElementException {
return numTasks;
}
@ -1112,7 +1112,7 @@ public class StickyTaskAssignorTest {
}
@Override
public int numTasks(String subtopologyId) throws NoSuchElementException {
public int maxNumInputPartitions(String subtopologyId) throws NoSuchElementException {
if (subtopologyId.equals("test-subtopology1"))
return 6;
return 1;

View File

@ -26,6 +26,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -53,7 +55,7 @@ public class ConfiguredTopologyTest {
assertThrows(NullPointerException.class,
() -> new ConfiguredTopology(
0,
Optional.of(Map.of()),
Optional.of(new TreeMap<>()),
null,
Optional.empty()
)
@ -77,7 +79,7 @@ public class ConfiguredTopologyTest {
assertThrows(IllegalArgumentException.class,
() -> new ConfiguredTopology(
-1,
Optional.of(Map.of()),
Optional.of(new TreeMap<>()),
Collections.emptyMap(),
Optional.empty()
)
@ -100,7 +102,7 @@ public class ConfiguredTopologyTest {
@Test
public void testIsReady() {
ConfiguredTopology readyTopology = new ConfiguredTopology(
1, Optional.of(Map.of()), new HashMap<>(), Optional.empty());
1, Optional.of(new TreeMap<>()), new HashMap<>(), Optional.empty());
assertTrue(readyTopology.isReady());
ConfiguredTopology notReadyTopology = new ConfiguredTopology(
@ -114,7 +116,7 @@ public class ConfiguredTopologyTest {
ConfiguredSubtopology subtopologyMock = mock(ConfiguredSubtopology.class);
StreamsGroupDescribeResponseData.Subtopology subtopologyResponse = new StreamsGroupDescribeResponseData.Subtopology();
when(subtopologyMock.asStreamsGroupDescribeSubtopology(Mockito.anyString())).thenReturn(subtopologyResponse);
Map<String, ConfiguredSubtopology> subtopologies = new HashMap<>();
SortedMap<String, ConfiguredSubtopology> subtopologies = new TreeMap<>();
subtopologies.put("subtopology1", subtopologyMock);
Map<String, CreatableTopic> internalTopicsToBeCreated = new HashMap<>();
Optional<TopicConfigurationException> topicConfigurationException = Optional.empty();