KAFKA-18757: Create full-function SimpleAssignor to match KIP-932 description (#18864)

### About
The current `SimpleAssignor` in AK assigned all subscribed topic
partitions to all the share group members. This does not match the
description given in
[KIP-932](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255070434#KIP932:QueuesforKafka-TheSimpleAssignor).
Here are the rules as mentioned in the KIP by which the assignment
should happen. We have changed the step 3 implementation here due to the
reasons
[described](https://github.com/apache/kafka/pull/18864#issuecomment-2659266502)
-

1. The assignor hashes the member IDs of the members and maps the
partitions assigned to the members based on the hash. This gives
approximately even balance.
2. If any partitions were not assigned any members by (1) and do not
have members already assigned in the current assignment, members are
assigned round-robin until each partition has at least one member
assigned to it.
3. We combine the current and new assignment. (Original rule - If any
partitions were assigned members by (1) and also have members in the
current assignment assigned by (2), the members assigned by (2) are
removed.)

### Tests
The added code has been verified with unit tests and the already present
integration tests.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, TaiJuWu <tjwu1217@gmail.com>
This commit is contained in:
Abhinav Dixit 2025-02-26 16:32:23 +05:30 committed by GitHub
parent f20f299492
commit 4b5a16bf6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 712 additions and 71 deletions

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.test.ClusterInstance
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull, assertNull, assertTrue} import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull, assertNull, assertTrue}
import org.junit.jupiter.api.{Tag, Timeout} import org.junit.jupiter.api.{Tag, Timeout}
import java.util
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
@Timeout(120) @Timeout(120)
@ -216,18 +217,12 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
assertNotEquals(memberId1, memberId2) assertNotEquals(memberId1, memberId2)
// Create the topic. // Create the topic.
val topicId = TestUtils.createTopicWithAdminRaw( TestUtils.createTopicWithAdminRaw(
admin = admin, admin = admin,
topic = "foo", topic = "foo",
numPartitions = 3 numPartitions = 3
) )
// This is the expected assignment.
val expectedAssignment = new ShareGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List(new ShareGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
// Prepare the next heartbeat for member 1. // Prepare the next heartbeat for member 1.
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder( shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData() new ShareGroupHeartbeatRequestData()
@ -241,10 +236,10 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
shareGroupHeartbeatResponse = null shareGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest) shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && shareGroupHeartbeatResponse.data.assignment != null
shareGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.") }, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.")
val topicPartitionsAssignedToMember1 = shareGroupHeartbeatResponse.data.assignment.topicPartitions()
// Verify the response. // Verify the response.
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
@ -261,13 +256,23 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
shareGroupHeartbeatResponse = null shareGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest) shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && shareGroupHeartbeatResponse.data.assignment != null
shareGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.") }, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.")
val topicPartitionsAssignedToMember2 = shareGroupHeartbeatResponse.data.assignment.topicPartitions()
// Verify the response. // Verify the response.
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
val partitionsAssigned: util.Set[Integer] = new util.HashSet[Integer]()
topicPartitionsAssignedToMember1.forEach(topicPartition => {
partitionsAssigned.addAll(topicPartition.partitions())
})
topicPartitionsAssignedToMember2.forEach(topicPartition => {
partitionsAssigned.addAll(topicPartition.partitions())
})
// Verify all the 3 topic partitions for "foo" have been assigned to at least 1 member.
assertEquals(util.Set.of(0, 1, 2), partitionsAssigned)
// Verify the assignments are not changed for member 1. // Verify the assignments are not changed for member 1.
// Prepare another heartbeat for member 1 with latest received epoch 3 for member 1. // Prepare another heartbeat for member 1 with latest received epoch 3 for member 1.
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder( shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(

View File

@ -25,19 +25,23 @@ import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorExceptio
import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor; import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl; import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import org.apache.kafka.server.common.TopicIdPartition;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
/** /**
* A simple partition assignor that assigns each member all partitions of the subscribed topics. * A simple partition assignor that assigns partitions of the subscribed topics based on the rules defined in KIP-932 to different members.
*/ */
public class SimpleAssignor implements ShareGroupPartitionAssignor { public class SimpleAssignor implements ShareGroupPartitionAssignor {
@ -54,7 +58,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
SubscribedTopicDescriber subscribedTopicDescriber SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException { ) throws PartitionAssignorException {
if (groupSpec.memberIds().isEmpty()) if (groupSpec.memberIds().isEmpty())
return new GroupAssignment(Collections.emptyMap()); return new GroupAssignment(Map.of());
if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
return assignHomogenous(groupSpec, subscribedTopicDescriber); return assignHomogenous(groupSpec, subscribedTopicDescriber);
@ -67,42 +71,240 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
GroupSpec groupSpec, GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber SubscribedTopicDescriber subscribedTopicDescriber
) { ) {
Set<Uuid> subscribeTopicIds = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()) Set<Uuid> subscribedTopicIds = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
.subscribedTopicIds(); .subscribedTopicIds();
if (subscribeTopicIds.isEmpty()) if (subscribedTopicIds.isEmpty())
return new GroupAssignment(Collections.emptyMap()); return new GroupAssignment(Map.of());
Map<Uuid, Set<Integer>> targetPartitions = computeTargetPartitions( // Subscribed topic partitions for the share group.
subscribeTopicIds, subscribedTopicDescriber); List<TopicIdPartition> targetPartitions = computeTargetPartitions(
subscribedTopicIds, subscribedTopicDescriber);
return new GroupAssignment(groupSpec.memberIds().stream().collect(Collectors.toMap( // The current assignment from topic partition to members.
Function.identity(), memberId -> new MemberAssignmentImpl(targetPartitions)))); Map<TopicIdPartition, List<String>> currentAssignment = currentAssignment(groupSpec);
return newAssignmentHomogeneous(groupSpec, subscribedTopicIds, targetPartitions, currentAssignment);
} }
private GroupAssignment assignHeterogeneous( private GroupAssignment assignHeterogeneous(
GroupSpec groupSpec, GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber SubscribedTopicDescriber subscribedTopicDescriber
) { ) {
Map<String, MemberAssignment> members = new HashMap<>(); Map<String, List<TopicIdPartition>> memberToPartitionsSubscription = new HashMap<>();
for (String memberId : groupSpec.memberIds()) { for (String memberId : groupSpec.memberIds()) {
MemberSubscription spec = groupSpec.memberSubscription(memberId); MemberSubscription spec = groupSpec.memberSubscription(memberId);
if (spec.subscribedTopicIds().isEmpty()) if (spec.subscribedTopicIds().isEmpty())
continue; continue;
Map<Uuid, Set<Integer>> targetPartitions = computeTargetPartitions( // Subscribed topic partitions for the share group member.
List<TopicIdPartition> targetPartitions = computeTargetPartitions(
spec.subscribedTopicIds(), subscribedTopicDescriber); spec.subscribedTopicIds(), subscribedTopicDescriber);
memberToPartitionsSubscription.put(memberId, targetPartitions);
members.put(memberId, new MemberAssignmentImpl(targetPartitions));
} }
// The current assignment from topic partition to members.
Map<TopicIdPartition, List<String>> currentAssignment = currentAssignment(groupSpec);
return newAssignmentHeterogeneous(groupSpec, memberToPartitionsSubscription, currentAssignment);
}
/**
* Get the current assignment by topic partitions.
* @param groupSpec - The group metadata specifications.
* @return the current assignment for subscribed topic partitions to memberIds.
*/
private Map<TopicIdPartition, List<String>> currentAssignment(GroupSpec groupSpec) {
Map<TopicIdPartition, List<String>> assignment = new HashMap<>();
for (String member : groupSpec.memberIds()) {
Map<Uuid, Set<Integer>> assignedTopicPartitions = groupSpec.memberAssignment(member).partitions();
assignedTopicPartitions.forEach((topicId, partitions) -> partitions.forEach(
partition -> assignment.computeIfAbsent(new TopicIdPartition(topicId, partition), k -> new ArrayList<>()).add(member)));
}
return assignment;
}
/**
* This function computes the new assignment for a homogeneous group.
* @param groupSpec - The group metadata specifications.
* @param subscribedTopicIds - The set of all the subscribed topic ids for the group.
* @param targetPartitions - The list of all topic partitions that need assignment.
* @param currentAssignment - The current assignment for subscribed topic partitions to memberIds.
* @return the new partition assignment for the members of the group.
*/
private GroupAssignment newAssignmentHomogeneous(
GroupSpec groupSpec,
Set<Uuid> subscribedTopicIds,
List<TopicIdPartition> targetPartitions,
Map<TopicIdPartition, List<String>> currentAssignment
) {
Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>();
// Step 1: Hash member IDs to topic partitions.
memberHashAssignment(targetPartitions, groupSpec.memberIds(), newAssignment);
// Step 2: Round-robin assignment for unassigned partitions which do not have members already assigned in the current assignment.
List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
.filter(targetPartition -> !newAssignment.containsKey(targetPartition))
.filter(targetPartition -> !currentAssignment.containsKey(targetPartition))
.toList();
roundRobinAssignment(groupSpec.memberIds(), unassignedPartitions, newAssignment);
// Step 3: We combine current assignment and new assignment.
Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
// As per the KIP, we should revoke the assignments from current assignment for partitions that were assigned by step 1
// in the new assignment and have members in current assignment by step 2. But we haven't implemented it to avoid the
// complexity in both the implementation and the run time complexity. This step was mentioned in the KIP to reduce
// the burden of certain members of the share groups. This can be achieved with the help of limiting the max
// no. of partitions assignment for every member(KAFKA-18788). Hence, the potential problem of burdening
// the share consumers will be addressed in a future PR.
newAssignment.forEach((targetPartition, members) -> members.forEach(member ->
finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition)));
// When combining current assignment, we need to only consider the topics in current assignment that are also being
// subscribed in the new assignment as well.
currentAssignment.forEach((targetPartition, members) -> {
if (subscribedTopicIds.contains(targetPartition.topicId()))
members.forEach(member -> {
if (groupSpec.memberIds().contains(member) && !newAssignment.containsKey(targetPartition))
finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition);
});
});
return groupAssignment(finalAssignment, groupSpec.memberIds());
}
/**
* This function computes the new assignment for a heterogeneous group.
* @param groupSpec - The group metadata specifications.
* @param memberToPartitionsSubscription - The member to subscribed topic partitions map.
* @param currentAssignment - The current assignment for subscribed topic partitions to memberIds.
* @return the new partition assignment for the members of the group.
*/
private GroupAssignment newAssignmentHeterogeneous(
GroupSpec groupSpec,
Map<String, List<TopicIdPartition>> memberToPartitionsSubscription,
Map<TopicIdPartition, List<String>> currentAssignment
) {
// Exhaustive set of all subscribed topic partitions.
Set<TopicIdPartition> targetPartitions = new LinkedHashSet<>();
memberToPartitionsSubscription.values().forEach(targetPartitions::addAll);
// Create a map for topic to members subscription.
Map<Uuid, Set<String>> topicToMemberSubscription = new HashMap<>();
memberToPartitionsSubscription.forEach((member, partitions) ->
partitions.forEach(partition -> topicToMemberSubscription.computeIfAbsent(partition.topicId(), k -> new LinkedHashSet<>()).add(member)));
Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>();
// Step 1: Hash member IDs to partitions.
memberToPartitionsSubscription.forEach((member, partitions) ->
memberHashAssignment(partitions, List.of(member), newAssignment));
// Step 2: Round-robin assignment for unassigned partitions which do not have members already assigned in the current assignment.
Set<TopicIdPartition> assignedPartitions = new LinkedHashSet<>(newAssignment.keySet());
Map<Uuid, List<TopicIdPartition>> unassignedPartitions = new HashMap<>();
targetPartitions.forEach(targetPartition -> {
if (!assignedPartitions.contains(targetPartition) && !currentAssignment.containsKey(targetPartition))
unassignedPartitions.computeIfAbsent(targetPartition.topicId(), k -> new ArrayList<>()).add(targetPartition);
});
unassignedPartitions.keySet().forEach(unassignedTopic ->
roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic), unassignedPartitions.get(unassignedTopic), newAssignment));
// Step 3: We combine current assignment and new assignment.
Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
// As per the KIP, we should revoke the assignments from current assignment for partitions that were assigned by step 1
// in the new assignment and have members in current assignment by step 2. But we haven't implemented it to avoid the
// complexity in both the implementation and the run time complexity. This step was mentioned in the KIP to reduce
// the burden of certain members of the share groups. This can be achieved with the help of limiting the max
// no. of partitions assignment for every member(KAFKA-18788). Hence, the potential problem of burdening
// the share consumers will be addressed in a future PR.
newAssignment.forEach((targetPartition, members) -> members.forEach(member ->
finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition)));
// When combining current assignment, we need to only consider the member topic subscription in current assignment
// which is being subscribed in the new assignment as well.
currentAssignment.forEach((topicIdPartition, members) -> members.forEach(member -> {
if (topicToMemberSubscription.getOrDefault(topicIdPartition.topicId(), Collections.emptySet()).contains(member)
&& !newAssignment.containsKey(topicIdPartition))
finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(topicIdPartition);
}));
return groupAssignment(finalAssignment, groupSpec.memberIds());
}
private GroupAssignment groupAssignment(
Map<String, Set<TopicIdPartition>> assignmentByMember,
Collection<String> allGroupMembers
) {
Map<String, MemberAssignment> members = new HashMap<>();
for (Map.Entry<String, Set<TopicIdPartition>> entry : assignmentByMember.entrySet()) {
Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
entry.getValue().forEach(targetPartition -> targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new HashSet<>()).add(targetPartition.partitionId()));
members.put(entry.getKey(), new MemberAssignmentImpl(targetPartitions));
}
allGroupMembers.forEach(member -> {
if (!members.containsKey(member))
members.put(member, new MemberAssignmentImpl(new HashMap<>()));
});
return new GroupAssignment(members); return new GroupAssignment(members);
} }
private Map<Uuid, Set<Integer>> computeTargetPartitions( /**
Set<Uuid> subscribeTopicIds, * This function updates assignment by hashing the member IDs of the members and maps the partitions assigned to the
* members based on the hash. This gives approximately even balance.
* @param unassignedPartitions - the subscribed topic partitions which needs assignment.
* @param memberIds - the member ids to which the topic partitions need to be assigned.
* @param assignment - the existing assignment by topic partition. We need to pass it as a parameter because this
* function would be called multiple times for heterogeneous assignment.
*/
// Visible for testing
void memberHashAssignment(
List<TopicIdPartition> unassignedPartitions,
Collection<String> memberIds,
Map<TopicIdPartition, List<String>> assignment
) {
if (!unassignedPartitions.isEmpty())
for (String memberId : memberIds) {
int topicPartitionIndex = Math.abs(memberId.hashCode() % unassignedPartitions.size());
TopicIdPartition topicPartition = unassignedPartitions.get(topicPartitionIndex);
assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId);
}
}
/**
* This functions assigns topic partitions to members by round-robin approach and updates the existing assignment.
* @param memberIds - the member ids to which the topic partitions need to be assigned, should be non-empty.
* @param unassignedPartitions - the subscribed topic partitions which needs assignment.
* @param assignment - the existing assignment by topic partition.
*/
// Visible for testing
void roundRobinAssignment(
Collection<String> memberIds,
List<TopicIdPartition> unassignedPartitions,
Map<TopicIdPartition, List<String>> assignment
) {
// We iterate through the target partitions and assign a memberId to them. In case we run out of members (members < targetPartitions),
// we again start from the starting index of memberIds.
Iterator<String> memberIdIterator = memberIds.iterator();
for (TopicIdPartition targetPartition : unassignedPartitions) {
if (!memberIdIterator.hasNext()) {
memberIdIterator = memberIds.iterator();
}
String memberId = memberIdIterator.next();
assignment.computeIfAbsent(targetPartition, k -> new ArrayList<>()).add(memberId);
}
}
private List<TopicIdPartition> computeTargetPartitions(
Set<Uuid> subscribedTopicIds,
SubscribedTopicDescriber subscribedTopicDescriber SubscribedTopicDescriber subscribedTopicDescriber
) { ) {
Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>(); List<TopicIdPartition> targetPartitions = new ArrayList<>();
subscribeTopicIds.forEach(topicId -> { subscribedTopicIds.forEach(topicId -> {
int numPartitions = subscribedTopicDescriber.numPartitions(topicId); int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
if (numPartitions == -1) { if (numPartitions == -1) {
throw new PartitionAssignorException( throw new PartitionAssignorException(
@ -111,11 +313,9 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
); );
} }
Set<Integer> partitions = new HashSet<>();
for (int i = 0; i < numPartitions; i++) { for (int i = 0; i < numPartitions; i++) {
partitions.add(i); targetPartitions.add(new TopicIdPartition(topicId, i));
} }
targetPartitions.put(topicId, partitions);
}); });
return targetPartitions; return targetPartitions;
} }

View File

@ -19,21 +19,24 @@ package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl; import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl; import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.server.common.TopicIdPartition;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
@ -41,16 +44,21 @@ import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SimpleAssignorTest { public class SimpleAssignorTest {
private static final Uuid TOPIC_1_UUID = Uuid.randomUuid(); private static final Uuid TOPIC_1_UUID = Uuid.randomUuid();
private static final Uuid TOPIC_2_UUID = Uuid.randomUuid(); private static final Uuid TOPIC_2_UUID = Uuid.randomUuid();
private static final Uuid TOPIC_3_UUID = Uuid.randomUuid(); private static final Uuid TOPIC_3_UUID = Uuid.randomUuid();
private static final Uuid TOPIC_4_UUID = Uuid.randomUuid();
private static final String TOPIC_1_NAME = "topic1"; private static final String TOPIC_1_NAME = "topic1";
private static final String TOPIC_2_NAME = "topic2";
private static final String TOPIC_3_NAME = "topic3"; private static final String TOPIC_3_NAME = "topic3";
private static final String TOPIC_4_NAME = "topic4";
private static final String MEMBER_A = "A"; private static final String MEMBER_A = "A";
private static final String MEMBER_B = "B"; private static final String MEMBER_B = "B";
private static final String MEMBER_C = "C";
private final SimpleAssignor assignor = new SimpleAssignor(); private final SimpleAssignor assignor = new SimpleAssignor();
@ -62,13 +70,13 @@ public class SimpleAssignorTest {
@Test @Test
public void testAssignWithEmptyMembers() { public void testAssignWithEmptyMembers() {
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Collections.emptyMap() Map.of()
); );
GroupSpec groupSpec = new GroupSpecImpl( GroupSpec groupSpec = new GroupSpecImpl(
Collections.emptyMap(), Map.of(),
HOMOGENEOUS, HOMOGENEOUS,
Collections.emptyMap() Map.of()
); );
GroupAssignment groupAssignment = assignor.assign( GroupAssignment groupAssignment = assignor.assign(
@ -76,13 +84,24 @@ public class SimpleAssignorTest {
subscribedTopicMetadata subscribedTopicMetadata
); );
assertEquals(Collections.emptyMap(), groupAssignment.members()); assertEquals(Map.of(), groupAssignment.members());
groupSpec = new GroupSpecImpl(
Map.of(),
HETEROGENEOUS,
Map.of()
);
groupAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertEquals(Map.of(), groupAssignment.members());
} }
@Test @Test
public void testAssignWithNoSubscribedTopic() { public void testAssignWithNoSubscribedTopic() {
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Collections.singletonMap( Map.of(
TOPIC_1_UUID, TOPIC_1_UUID,
new TopicMetadata( new TopicMetadata(
TOPIC_1_UUID, TOPIC_1_UUID,
@ -92,12 +111,12 @@ public class SimpleAssignorTest {
) )
); );
Map<String, MemberSubscriptionAndAssignmentImpl> members = Collections.singletonMap( Map<String, MemberSubscriptionAndAssignmentImpl> members = Map.of(
MEMBER_A, MEMBER_A,
new MemberSubscriptionAndAssignmentImpl( new MemberSubscriptionAndAssignmentImpl(
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
Collections.emptySet(), Set.of(),
Assignment.EMPTY Assignment.EMPTY
) )
); );
@ -105,7 +124,7 @@ public class SimpleAssignorTest {
GroupSpec groupSpec = new GroupSpecImpl( GroupSpec groupSpec = new GroupSpecImpl(
members, members,
HOMOGENEOUS, HOMOGENEOUS,
Collections.emptyMap() Map.of()
); );
GroupAssignment groupAssignment = assignor.assign( GroupAssignment groupAssignment = assignor.assign(
@ -113,13 +132,13 @@ public class SimpleAssignorTest {
subscribedTopicMetadata subscribedTopicMetadata
); );
assertEquals(Collections.emptyMap(), groupAssignment.members()); assertEquals(Map.of(), groupAssignment.members());
} }
@Test @Test
public void testAssignWithSubscribedToNonExistentTopic() { public void testAssignWithSubscribedToNonExistentTopic() {
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Collections.singletonMap( Map.of(
TOPIC_1_UUID, TOPIC_1_UUID,
new TopicMetadata( new TopicMetadata(
TOPIC_1_UUID, TOPIC_1_UUID,
@ -129,7 +148,7 @@ public class SimpleAssignorTest {
) )
); );
Map<String, MemberSubscriptionAndAssignmentImpl> members = Collections.singletonMap( Map<String, MemberSubscriptionAndAssignmentImpl> members = Map.of(
MEMBER_A, MEMBER_A,
new MemberSubscriptionAndAssignmentImpl( new MemberSubscriptionAndAssignmentImpl(
Optional.empty(), Optional.empty(),
@ -142,7 +161,7 @@ public class SimpleAssignorTest {
GroupSpec groupSpec = new GroupSpecImpl( GroupSpec groupSpec = new GroupSpecImpl(
members, members,
HOMOGENEOUS, HOMOGENEOUS,
Collections.emptyMap() Map.of()
); );
assertThrows(PartitionAssignorException.class, assertThrows(PartitionAssignorException.class,
@ -163,26 +182,30 @@ public class SimpleAssignorTest {
2 2
)); ));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>(); Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
Set<Uuid> topicsSubscription = new LinkedHashSet<>();
topicsSubscription.add(TOPIC_1_UUID);
topicsSubscription.add(TOPIC_3_UUID);
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
Set.of(TOPIC_1_UUID, TOPIC_3_UUID), topicsSubscription,
Assignment.EMPTY Assignment.EMPTY
)); ));
members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
Set.of(TOPIC_1_UUID, TOPIC_3_UUID), topicsSubscription,
Assignment.EMPTY Assignment.EMPTY
)); ));
GroupSpec groupSpec = new GroupSpecImpl( GroupSpec groupSpec = new GroupSpecImpl(
members, members,
HOMOGENEOUS, HOMOGENEOUS,
Collections.emptyMap() Map.of()
); );
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
@ -191,16 +214,22 @@ public class SimpleAssignorTest {
subscribedTopicMetadata subscribedTopicMetadata
); );
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
// Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
// Step 2 -> T1:2, T3:1 -> MEMBER_A and T3:0 -> MEMBER_B by round-robin assignment.
// Step 3 -> no new assignment gets added by current assignment since it is empty.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(MEMBER_A, mkAssignment( expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), mkTopicAssignment(TOPIC_1_UUID, 0, 2),
mkTopicAssignment(TOPIC_3_UUID, 0, 1) mkTopicAssignment(TOPIC_3_UUID, 1)
)); ));
expectedAssignment.put(MEMBER_B, mkAssignment( expectedAssignment.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), mkTopicAssignment(TOPIC_1_UUID, 1),
mkTopicAssignment(TOPIC_3_UUID, 0, 1) mkTopicAssignment(TOPIC_3_UUID, 0)
)); ));
// T1: 3 partitions + T3: 2 partitions = 5 partitions
assertEveryPartitionGetsAssignment(5, computedAssignment);
assertAssignment(expectedAssignment, computedAssignment); assertAssignment(expectedAssignment, computedAssignment);
} }
@ -224,11 +253,15 @@ public class SimpleAssignorTest {
2 2
)); ));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>(); Set<Uuid> memberATopicsSubscription = new LinkedHashSet<>();
memberATopicsSubscription.add(TOPIC_1_UUID);
memberATopicsSubscription.add(TOPIC_2_UUID);
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
Set.of(TOPIC_1_UUID, TOPIC_2_UUID), memberATopicsSubscription,
Assignment.EMPTY Assignment.EMPTY
)); ));
@ -239,18 +272,20 @@ public class SimpleAssignorTest {
Assignment.EMPTY Assignment.EMPTY
)); ));
String memberC = "C"; Set<Uuid> memberCTopicsSubscription = new LinkedHashSet<>();
members.put(memberC, new MemberSubscriptionAndAssignmentImpl( memberCTopicsSubscription.add(TOPIC_2_UUID);
memberCTopicsSubscription.add(TOPIC_3_UUID);
members.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
Set.of(TOPIC_2_UUID, TOPIC_3_UUID), memberCTopicsSubscription,
Assignment.EMPTY Assignment.EMPTY
)); ));
GroupSpec groupSpec = new GroupSpecImpl( GroupSpec groupSpec = new GroupSpecImpl(
members, members,
HETEROGENEOUS, HETEROGENEOUS,
Collections.emptyMap() Map.of()
); );
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
@ -259,19 +294,24 @@ public class SimpleAssignorTest {
subscribedTopicMetadata subscribedTopicMetadata
); );
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of MEMBER_C is 67.
// Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by hash assignment.
// Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1, -> member_B, T2:1 -> member_C by round-robin assignment.
// Step 3 -> no new assignment gets added by current assignment since it is empty.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(MEMBER_A, mkAssignment( expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2) mkTopicAssignment(TOPIC_2_UUID, 0, 2)
)); ));
expectedAssignment.put(MEMBER_B, mkAssignment( expectedAssignment.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_3_UUID, 0, 1) mkTopicAssignment(TOPIC_3_UUID, 0, 1)
)); ));
expectedAssignment.put(memberC, mkAssignment( expectedAssignment.put(MEMBER_C, mkAssignment(
mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2), mkTopicAssignment(TOPIC_2_UUID, 1, 2)
mkTopicAssignment(TOPIC_3_UUID, 0, 1)
)); ));
// T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8 partitions
assertEveryPartitionGetsAssignment(8, computedAssignment);
assertAssignment(expectedAssignment, computedAssignment); assertAssignment(expectedAssignment, computedAssignment);
} }
@ -290,25 +330,28 @@ public class SimpleAssignorTest {
2 2
)); ));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>(); Set<Uuid> memberATopicsSubscription = new LinkedHashSet<>();
memberATopicsSubscription.add(TOPIC_1_UUID);
memberATopicsSubscription.add(TOPIC_2_UUID);
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
Set.of(TOPIC_1_UUID, TOPIC_2_UUID), memberATopicsSubscription,
Assignment.EMPTY Assignment.EMPTY
)); ));
members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
Collections.emptySet(), Set.of(),
Assignment.EMPTY Assignment.EMPTY
)); ));
GroupSpec groupSpec = new GroupSpecImpl( GroupSpec groupSpec = new GroupSpecImpl(
members, members,
HETEROGENEOUS, HETEROGENEOUS,
Collections.emptyMap() Map.of()
); );
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
@ -320,10 +363,376 @@ public class SimpleAssignorTest {
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(MEMBER_A, mkAssignment( expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
mkTopicAssignment(TOPIC_2_UUID, 0, 1) mkTopicAssignment(TOPIC_2_UUID, 0, 1)));
expectedAssignment.put(MEMBER_B, mkAssignment());
// T1: 3 partitions + T2: 2 partitions = 5 partitions
assertEveryPartitionGetsAssignment(5, computedAssignment);
assertAssignment(expectedAssignment, computedAssignment);
}
@Test
public void testMemberHashAssignment() {
// hashcode for "member1" is 948881623.
String member1 = "member1";
// hashcode for "member2" is 948881624.
String member2 = "member2";
// hashcode for "member3" is 948881625.
String member3 = "member3";
// hashcode for "member4" is 948881626.
String member4 = "member4";
// hashcode for "AaAaAaAa" is -540425984 to test with negative hashcode.
String member5 = "AaAaAaAa";
List<String> members = List.of(member1, member2, member3, member4, member5);
TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
List<TopicIdPartition> partitions = List.of(partition1, partition2, partition3);
Map<TopicIdPartition, List<String>> computedAssignment = new HashMap<>();
assignor.memberHashAssignment(partitions, members, computedAssignment);
Map<TopicIdPartition, List<String>> expectedAssignment = new HashMap<>();
expectedAssignment.put(partition1, List.of(member3));
expectedAssignment.put(partition2, List.of(member1, member4));
expectedAssignment.put(partition3, List.of(member2, member5));
assertAssignment(expectedAssignment, computedAssignment);
}
@Test
public void testRoundRobinAssignment() {
String member1 = "member1";
String member2 = "member2";
List<String> members = List.of(member1, member2);
TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0);
List<TopicIdPartition> unassignedPartitions = List.of(partition2, partition3, partition4);
Map<TopicIdPartition, List<String>> assignment = new HashMap<>();
assignment.put(partition1, List.of(member1));
assignor.roundRobinAssignment(members, unassignedPartitions, assignment);
Map<TopicIdPartition, List<String>> expectedAssignment = Map.of(
partition1, List.of(member1),
partition2, List.of(member1),
partition3, List.of(member2),
partition4, List.of(member1)
);
assertAssignment(expectedAssignment, assignment);
}
@Test
public void testAssignWithCurrentAssignmentHomogeneous() {
// Current assignment setup - Two members A, B subscribing to T1 and T2.
Map<Uuid, TopicMetadata> topicMetadata1 = new HashMap<>();
topicMetadata1.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3
));
topicMetadata1.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
TOPIC_2_NAME,
2
)); ));
assertAssignment(expectedAssignment, computedAssignment); Map<String, MemberSubscriptionAndAssignmentImpl> members1 = new HashMap<>();
Set<Uuid> topicsSubscription1 = new LinkedHashSet<>();
topicsSubscription1.add(TOPIC_1_UUID);
topicsSubscription1.add(TOPIC_2_UUID);
members1.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription1,
Assignment.EMPTY
));
members1.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription1,
Assignment.EMPTY
));
GroupSpec groupSpec1 = new GroupSpecImpl(
members1,
HOMOGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new SubscribedTopicDescriberImpl(topicMetadata1);
GroupAssignment computedAssignment1 = assignor.assign(
groupSpec1,
subscribedTopicMetadata1
);
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
// Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
// Step 2 -> T1:2, T2:1 -> MEMBER_A and T2:0 -> MEMBER_B by round-robin assignment.
// Step 3 -> no new assignment gets added by current assignment since it is empty.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment1 = new HashMap<>();
expectedAssignment1.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 2),
mkTopicAssignment(TOPIC_2_UUID, 1)
));
expectedAssignment1.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 1),
mkTopicAssignment(TOPIC_2_UUID, 0)
));
// T1: 3 partitions + T2: 2 partitions = 5 partitions
assertEveryPartitionGetsAssignment(5, computedAssignment1);
assertAssignment(expectedAssignment1, computedAssignment1);
// New assignment setup - Three members A, B, C subscribing to T2 and T3.
Map<Uuid, TopicMetadata> topicMetadata2 = new HashMap<>();
topicMetadata2.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
TOPIC_2_NAME,
2
));
topicMetadata2.put(TOPIC_3_UUID, new TopicMetadata(
TOPIC_3_UUID,
TOPIC_3_NAME,
3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members2 = new HashMap<>();
Set<Uuid> topicsSubscription2 = new LinkedHashSet<>();
topicsSubscription2.add(TOPIC_2_UUID);
topicsSubscription2.add(TOPIC_3_UUID);
members2.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription2,
// Utilizing the assignment from current assignment
new Assignment(mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 2),
mkTopicAssignment(TOPIC_2_UUID, 1)))
));
members2.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription2,
new Assignment(mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 1),
mkTopicAssignment(TOPIC_2_UUID, 0)))
));
members2.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription2,
Assignment.EMPTY
));
GroupSpec groupSpec2 = new GroupSpecImpl(
members2,
HOMOGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new SubscribedTopicDescriberImpl(topicMetadata2);
GroupAssignment computedAssignment2 = assignor.assign(
groupSpec2,
subscribedTopicMetadata2
);
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of MEMBER_C is 67.
// Step 1 -> T2:0 -> MEMBER_A, T2:1 -> MEMBER_B, T3:0 -> MEMBER_C by hash assignment
// Step 2 -> T3:1 -> MEMBER_A, T3:2 -> MEMBER_B by round-robin assignment
// Step 3 -> no new addition by current assignment since T2:0 and T2:1 were already a part of new assignment.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment2 = new HashMap<>();
expectedAssignment2.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_2_UUID, 0),
mkTopicAssignment(TOPIC_3_UUID, 1)
));
expectedAssignment2.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_2_UUID, 1),
mkTopicAssignment(TOPIC_3_UUID, 2)
));
expectedAssignment2.put(MEMBER_C, mkAssignment(
mkTopicAssignment(TOPIC_3_UUID, 0)
));
// T2: 2 partitions + T3: 3 partitions = 5 partitions
assertEveryPartitionGetsAssignment(5, computedAssignment2);
assertAssignment(expectedAssignment2, computedAssignment2);
}
@Test
public void testAssignWithCurrentAssignmentHeterogeneous() {
// Current assignment setup - 3 members A - {T1, T2}, B - {T3}, C - {T2, T3}.
Map<Uuid, TopicMetadata> topicMetadata1 = new HashMap<>();
topicMetadata1.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3
));
topicMetadata1.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
TOPIC_2_NAME,
3
));
topicMetadata1.put(TOPIC_3_UUID, new TopicMetadata(
TOPIC_3_UUID,
TOPIC_3_NAME,
2
));
Set<Uuid> memberATopicsSubscription1 = new LinkedHashSet<>();
memberATopicsSubscription1.add(TOPIC_1_UUID);
memberATopicsSubscription1.add(TOPIC_2_UUID);
Map<String, MemberSubscriptionAndAssignmentImpl> members1 = new HashMap<>();
members1.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
memberATopicsSubscription1,
Assignment.EMPTY
));
members1.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
Set.of(TOPIC_3_UUID),
Assignment.EMPTY
));
Set<Uuid> memberCTopicsSubscription1 = new LinkedHashSet<>();
memberCTopicsSubscription1.add(TOPIC_2_UUID);
memberCTopicsSubscription1.add(TOPIC_3_UUID);
members1.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
memberCTopicsSubscription1,
Assignment.EMPTY
));
GroupSpec groupSpec1 = new GroupSpecImpl(
members1,
HETEROGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new SubscribedTopicDescriberImpl(topicMetadata1);
GroupAssignment computedAssignment1 = assignor.assign(
groupSpec1,
subscribedTopicMetadata1
);
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of MEMBER_C is 67.
// Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by hash assignment.
// Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1, -> member_B, T2:1 -> member_C by round-robin assignment.
// Step 3 -> no new assignment gets added by current assignment since it is empty.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment1 = new HashMap<>();
expectedAssignment1.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
mkTopicAssignment(TOPIC_2_UUID, 0, 2)
));
expectedAssignment1.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_3_UUID, 0, 1)
));
expectedAssignment1.put(MEMBER_C, mkAssignment(
mkTopicAssignment(TOPIC_2_UUID, 1, 2)
));
// T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8 partitions
assertEveryPartitionGetsAssignment(8, computedAssignment1);
assertAssignment(expectedAssignment1, computedAssignment1);
// New assignment setup - 2 members A - {T1, T2, T3}, B - {T3, T4}.
Map<Uuid, TopicMetadata> topicMetadata2 = new HashMap<>();
topicMetadata2.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3
));
topicMetadata2.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
TOPIC_2_NAME,
3
));
topicMetadata2.put(TOPIC_3_UUID, new TopicMetadata(
TOPIC_3_UUID,
TOPIC_3_NAME,
2
));
topicMetadata2.put(TOPIC_4_UUID, new TopicMetadata(
TOPIC_4_UUID,
TOPIC_4_NAME,
1
));
Map<String, MemberSubscriptionAndAssignmentImpl> members2 = new HashMap<>();
Set<Uuid> memberATopicsSubscription2 = new LinkedHashSet<>();
memberATopicsSubscription2.add(TOPIC_1_UUID);
memberATopicsSubscription2.add(TOPIC_2_UUID);
memberATopicsSubscription2.add(TOPIC_3_UUID);
Set<Uuid> memberBTopicsSubscription2 = new LinkedHashSet<>();
memberBTopicsSubscription2.add(TOPIC_3_UUID);
memberBTopicsSubscription2.add(TOPIC_4_UUID);
members2.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
memberATopicsSubscription2,
new Assignment(mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
mkTopicAssignment(TOPIC_2_UUID, 0, 2)))
));
members2.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
memberBTopicsSubscription2,
new Assignment(mkAssignment(
mkTopicAssignment(TOPIC_3_UUID, 0, 1)))
));
GroupSpec groupSpec2 = new GroupSpecImpl(
members2,
HETEROGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new SubscribedTopicDescriberImpl(topicMetadata2);
GroupAssignment computedAssignment2 = assignor.assign(
groupSpec2,
subscribedTopicMetadata2
);
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
// Step 1 -> T1:1 -> member_A, T3:0 -> member_B by hash assignment.
// Step 2 -> T2:1 -> member_A, T4:0 -> member_B by round-robin assignment.
// Step 3 -> T1:0, T1:2, T2:0 -> member_A, T3:1 -> member_B by current assignment.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment2 = new HashMap<>();
expectedAssignment2.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2)
));
expectedAssignment2.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_3_UUID, 0, 1),
mkTopicAssignment(TOPIC_4_UUID, 0)
));
// T1: 3 partitions + T2: 3 partitions + T3: 2 partitions + T4: 1 partition = 9 partitions
assertEveryPartitionGetsAssignment(9, computedAssignment2);
assertAssignment(expectedAssignment2, computedAssignment2);
} }
private void assertAssignment( private void assertAssignment(
@ -336,4 +745,31 @@ public class SimpleAssignorTest {
assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember); assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember);
} }
} }
private void assertAssignment(
Map<TopicIdPartition, List<String>> expectedAssignment,
Map<TopicIdPartition, List<String>> computedAssignment
) {
assertEquals(expectedAssignment.size(), computedAssignment.size());
expectedAssignment.forEach((topicIdPartition, members) -> {
List<String> computedMembers = computedAssignment.getOrDefault(topicIdPartition, List.of());
assertEquals(members.size(), computedMembers.size());
members.forEach(member -> assertTrue(computedMembers.contains(member)));
});
}
private void assertEveryPartitionGetsAssignment(
int expectedPartitions,
GroupAssignment computedGroupAssignment
) {
Map<String, MemberAssignment> memberAssignments = computedGroupAssignment.members();
Set<TopicIdPartition> topicPartitionAssignments = new HashSet<>();
memberAssignments.values().forEach(memberAssignment -> {
Map<Uuid, Set<Integer>> topicIdPartitions = memberAssignment.partitions();
topicIdPartitions.forEach((topicId, partitions) ->
partitions.forEach(partition -> topicPartitionAssignments.add(new TopicIdPartition(topicId, partition)))
);
});
assertEquals(expectedPartitions, topicPartitionAssignments.size());
}
} }