diff --git a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala index 07c7b959ab8..448b6897ede 100644 --- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala @@ -28,6 +28,7 @@ import org.apache.kafka.common.test.ClusterInstance import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull, assertNull, assertTrue} import org.junit.jupiter.api.{Tag, Timeout} +import java.util import scala.jdk.CollectionConverters._ @Timeout(120) @@ -216,18 +217,12 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertNotEquals(memberId1, memberId2) // Create the topic. - val topicId = TestUtils.createTopicWithAdminRaw( + TestUtils.createTopicWithAdminRaw( admin = admin, topic = "foo", numPartitions = 3 ) - // This is the expected assignment. - val expectedAssignment = new ShareGroupHeartbeatResponseData.Assignment() - .setTopicPartitions(List(new ShareGroupHeartbeatResponseData.TopicPartitions() - .setTopicId(topicId) - .setPartitions(List[Integer](0, 1, 2).asJava)).asJava) - // Prepare the next heartbeat for member 1. shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder( new ShareGroupHeartbeatRequestData() @@ -241,10 +236,10 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { shareGroupHeartbeatResponse = null TestUtils.waitUntilTrue(() => { shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest) - shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && - shareGroupHeartbeatResponse.data.assignment == expectedAssignment + shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && shareGroupHeartbeatResponse.data.assignment != null }, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.") + val topicPartitionsAssignedToMember1 = shareGroupHeartbeatResponse.data.assignment.topicPartitions() // Verify the response. assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) @@ -261,13 +256,23 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { shareGroupHeartbeatResponse = null TestUtils.waitUntilTrue(() => { shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest) - shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && - shareGroupHeartbeatResponse.data.assignment == expectedAssignment + shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && shareGroupHeartbeatResponse.data.assignment != null }, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.") + val topicPartitionsAssignedToMember2 = shareGroupHeartbeatResponse.data.assignment.topicPartitions() // Verify the response. assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) + val partitionsAssigned: util.Set[Integer] = new util.HashSet[Integer]() + topicPartitionsAssignedToMember1.forEach(topicPartition => { + partitionsAssigned.addAll(topicPartition.partitions()) + }) + topicPartitionsAssignedToMember2.forEach(topicPartition => { + partitionsAssigned.addAll(topicPartition.partitions()) + }) + // Verify all the 3 topic partitions for "foo" have been assigned to at least 1 member. + assertEquals(util.Set.of(0, 1, 2), partitionsAssigned) + // Verify the assignments are not changed for member 1. // Prepare another heartbeat for member 1 with latest received epoch 3 for member 1. shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java index 4cc66468ae7..781b64dd604 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java @@ -25,19 +25,23 @@ import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorExceptio import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor; import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl; +import org.apache.kafka.server.common.TopicIdPartition; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; /** - * A simple partition assignor that assigns each member all partitions of the subscribed topics. + * A simple partition assignor that assigns partitions of the subscribed topics based on the rules defined in KIP-932 to different members. */ public class SimpleAssignor implements ShareGroupPartitionAssignor { @@ -54,7 +58,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { SubscribedTopicDescriber subscribedTopicDescriber ) throws PartitionAssignorException { if (groupSpec.memberIds().isEmpty()) - return new GroupAssignment(Collections.emptyMap()); + return new GroupAssignment(Map.of()); if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { return assignHomogenous(groupSpec, subscribedTopicDescriber); @@ -67,42 +71,240 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber ) { - Set subscribeTopicIds = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()) + Set subscribedTopicIds = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()) .subscribedTopicIds(); - if (subscribeTopicIds.isEmpty()) - return new GroupAssignment(Collections.emptyMap()); + if (subscribedTopicIds.isEmpty()) + return new GroupAssignment(Map.of()); - Map> targetPartitions = computeTargetPartitions( - subscribeTopicIds, subscribedTopicDescriber); + // Subscribed topic partitions for the share group. + List targetPartitions = computeTargetPartitions( + subscribedTopicIds, subscribedTopicDescriber); - return new GroupAssignment(groupSpec.memberIds().stream().collect(Collectors.toMap( - Function.identity(), memberId -> new MemberAssignmentImpl(targetPartitions)))); + // The current assignment from topic partition to members. + Map> currentAssignment = currentAssignment(groupSpec); + return newAssignmentHomogeneous(groupSpec, subscribedTopicIds, targetPartitions, currentAssignment); } private GroupAssignment assignHeterogeneous( GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber ) { - Map members = new HashMap<>(); + Map> memberToPartitionsSubscription = new HashMap<>(); for (String memberId : groupSpec.memberIds()) { MemberSubscription spec = groupSpec.memberSubscription(memberId); if (spec.subscribedTopicIds().isEmpty()) continue; - Map> targetPartitions = computeTargetPartitions( + // Subscribed topic partitions for the share group member. + List targetPartitions = computeTargetPartitions( spec.subscribedTopicIds(), subscribedTopicDescriber); - - members.put(memberId, new MemberAssignmentImpl(targetPartitions)); + memberToPartitionsSubscription.put(memberId, targetPartitions); } + + // The current assignment from topic partition to members. + Map> currentAssignment = currentAssignment(groupSpec); + return newAssignmentHeterogeneous(groupSpec, memberToPartitionsSubscription, currentAssignment); + } + + /** + * Get the current assignment by topic partitions. + * @param groupSpec - The group metadata specifications. + * @return the current assignment for subscribed topic partitions to memberIds. + */ + private Map> currentAssignment(GroupSpec groupSpec) { + Map> assignment = new HashMap<>(); + + for (String member : groupSpec.memberIds()) { + Map> assignedTopicPartitions = groupSpec.memberAssignment(member).partitions(); + assignedTopicPartitions.forEach((topicId, partitions) -> partitions.forEach( + partition -> assignment.computeIfAbsent(new TopicIdPartition(topicId, partition), k -> new ArrayList<>()).add(member))); + } + return assignment; + } + + /** + * This function computes the new assignment for a homogeneous group. + * @param groupSpec - The group metadata specifications. + * @param subscribedTopicIds - The set of all the subscribed topic ids for the group. + * @param targetPartitions - The list of all topic partitions that need assignment. + * @param currentAssignment - The current assignment for subscribed topic partitions to memberIds. + * @return the new partition assignment for the members of the group. + */ + private GroupAssignment newAssignmentHomogeneous( + GroupSpec groupSpec, + Set subscribedTopicIds, + List targetPartitions, + Map> currentAssignment + ) { + Map> newAssignment = new HashMap<>(); + + // Step 1: Hash member IDs to topic partitions. + memberHashAssignment(targetPartitions, groupSpec.memberIds(), newAssignment); + + // Step 2: Round-robin assignment for unassigned partitions which do not have members already assigned in the current assignment. + List unassignedPartitions = targetPartitions.stream() + .filter(targetPartition -> !newAssignment.containsKey(targetPartition)) + .filter(targetPartition -> !currentAssignment.containsKey(targetPartition)) + .toList(); + + roundRobinAssignment(groupSpec.memberIds(), unassignedPartitions, newAssignment); + + // Step 3: We combine current assignment and new assignment. + Map> finalAssignment = new HashMap<>(); + + // As per the KIP, we should revoke the assignments from current assignment for partitions that were assigned by step 1 + // in the new assignment and have members in current assignment by step 2. But we haven't implemented it to avoid the + // complexity in both the implementation and the run time complexity. This step was mentioned in the KIP to reduce + // the burden of certain members of the share groups. This can be achieved with the help of limiting the max + // no. of partitions assignment for every member(KAFKA-18788). Hence, the potential problem of burdening + // the share consumers will be addressed in a future PR. + + newAssignment.forEach((targetPartition, members) -> members.forEach(member -> + finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition))); + // When combining current assignment, we need to only consider the topics in current assignment that are also being + // subscribed in the new assignment as well. + currentAssignment.forEach((targetPartition, members) -> { + if (subscribedTopicIds.contains(targetPartition.topicId())) + members.forEach(member -> { + if (groupSpec.memberIds().contains(member) && !newAssignment.containsKey(targetPartition)) + finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition); + }); + }); + + return groupAssignment(finalAssignment, groupSpec.memberIds()); + } + + /** + * This function computes the new assignment for a heterogeneous group. + * @param groupSpec - The group metadata specifications. + * @param memberToPartitionsSubscription - The member to subscribed topic partitions map. + * @param currentAssignment - The current assignment for subscribed topic partitions to memberIds. + * @return the new partition assignment for the members of the group. + */ + private GroupAssignment newAssignmentHeterogeneous( + GroupSpec groupSpec, + Map> memberToPartitionsSubscription, + Map> currentAssignment + ) { + + // Exhaustive set of all subscribed topic partitions. + Set targetPartitions = new LinkedHashSet<>(); + memberToPartitionsSubscription.values().forEach(targetPartitions::addAll); + + // Create a map for topic to members subscription. + Map> topicToMemberSubscription = new HashMap<>(); + memberToPartitionsSubscription.forEach((member, partitions) -> + partitions.forEach(partition -> topicToMemberSubscription.computeIfAbsent(partition.topicId(), k -> new LinkedHashSet<>()).add(member))); + + Map> newAssignment = new HashMap<>(); + + // Step 1: Hash member IDs to partitions. + memberToPartitionsSubscription.forEach((member, partitions) -> + memberHashAssignment(partitions, List.of(member), newAssignment)); + + // Step 2: Round-robin assignment for unassigned partitions which do not have members already assigned in the current assignment. + Set assignedPartitions = new LinkedHashSet<>(newAssignment.keySet()); + Map> unassignedPartitions = new HashMap<>(); + targetPartitions.forEach(targetPartition -> { + if (!assignedPartitions.contains(targetPartition) && !currentAssignment.containsKey(targetPartition)) + unassignedPartitions.computeIfAbsent(targetPartition.topicId(), k -> new ArrayList<>()).add(targetPartition); + }); + + unassignedPartitions.keySet().forEach(unassignedTopic -> + roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic), unassignedPartitions.get(unassignedTopic), newAssignment)); + + // Step 3: We combine current assignment and new assignment. + Map> finalAssignment = new HashMap<>(); + // As per the KIP, we should revoke the assignments from current assignment for partitions that were assigned by step 1 + // in the new assignment and have members in current assignment by step 2. But we haven't implemented it to avoid the + // complexity in both the implementation and the run time complexity. This step was mentioned in the KIP to reduce + // the burden of certain members of the share groups. This can be achieved with the help of limiting the max + // no. of partitions assignment for every member(KAFKA-18788). Hence, the potential problem of burdening + // the share consumers will be addressed in a future PR. + + newAssignment.forEach((targetPartition, members) -> members.forEach(member -> + finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition))); + // When combining current assignment, we need to only consider the member topic subscription in current assignment + // which is being subscribed in the new assignment as well. + currentAssignment.forEach((topicIdPartition, members) -> members.forEach(member -> { + if (topicToMemberSubscription.getOrDefault(topicIdPartition.topicId(), Collections.emptySet()).contains(member) + && !newAssignment.containsKey(topicIdPartition)) + finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(topicIdPartition); + })); + + return groupAssignment(finalAssignment, groupSpec.memberIds()); + } + + private GroupAssignment groupAssignment( + Map> assignmentByMember, + Collection allGroupMembers + ) { + Map members = new HashMap<>(); + for (Map.Entry> entry : assignmentByMember.entrySet()) { + Map> targetPartitions = new HashMap<>(); + entry.getValue().forEach(targetPartition -> targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new HashSet<>()).add(targetPartition.partitionId())); + members.put(entry.getKey(), new MemberAssignmentImpl(targetPartitions)); + } + allGroupMembers.forEach(member -> { + if (!members.containsKey(member)) + members.put(member, new MemberAssignmentImpl(new HashMap<>())); + }); + return new GroupAssignment(members); } - private Map> computeTargetPartitions( - Set subscribeTopicIds, + /** + * This function updates assignment by hashing the member IDs of the members and maps the partitions assigned to the + * members based on the hash. This gives approximately even balance. + * @param unassignedPartitions - the subscribed topic partitions which needs assignment. + * @param memberIds - the member ids to which the topic partitions need to be assigned. + * @param assignment - the existing assignment by topic partition. We need to pass it as a parameter because this + * function would be called multiple times for heterogeneous assignment. + */ + // Visible for testing + void memberHashAssignment( + List unassignedPartitions, + Collection memberIds, + Map> assignment + ) { + if (!unassignedPartitions.isEmpty()) + for (String memberId : memberIds) { + int topicPartitionIndex = Math.abs(memberId.hashCode() % unassignedPartitions.size()); + TopicIdPartition topicPartition = unassignedPartitions.get(topicPartitionIndex); + assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId); + } + } + + /** + * This functions assigns topic partitions to members by round-robin approach and updates the existing assignment. + * @param memberIds - the member ids to which the topic partitions need to be assigned, should be non-empty. + * @param unassignedPartitions - the subscribed topic partitions which needs assignment. + * @param assignment - the existing assignment by topic partition. + */ + // Visible for testing + void roundRobinAssignment( + Collection memberIds, + List unassignedPartitions, + Map> assignment + ) { + // We iterate through the target partitions and assign a memberId to them. In case we run out of members (members < targetPartitions), + // we again start from the starting index of memberIds. + Iterator memberIdIterator = memberIds.iterator(); + for (TopicIdPartition targetPartition : unassignedPartitions) { + if (!memberIdIterator.hasNext()) { + memberIdIterator = memberIds.iterator(); + } + String memberId = memberIdIterator.next(); + assignment.computeIfAbsent(targetPartition, k -> new ArrayList<>()).add(memberId); + } + } + + private List computeTargetPartitions( + Set subscribedTopicIds, SubscribedTopicDescriber subscribedTopicDescriber ) { - Map> targetPartitions = new HashMap<>(); - subscribeTopicIds.forEach(topicId -> { + List targetPartitions = new ArrayList<>(); + subscribedTopicIds.forEach(topicId -> { int numPartitions = subscribedTopicDescriber.numPartitions(topicId); if (numPartitions == -1) { throw new PartitionAssignorException( @@ -111,11 +313,9 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { ); } - Set partitions = new HashSet<>(); for (int i = 0; i < numPartitions; i++) { - partitions.add(i); + targetPartitions.add(new TopicIdPartition(topicId, i)); } - targetPartitions.put(topicId, partitions); }); return targetPartitions; } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java index 71b64dcb817..a0b86d6fa13 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java @@ -19,21 +19,24 @@ package org.apache.kafka.coordinator.group.assignor; import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.GroupSpecImpl; import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl; import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.modern.TopicMetadata; +import org.apache.kafka.server.common.TopicIdPartition; import org.junit.jupiter.api.Test; -import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.TreeMap; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; @@ -41,16 +44,21 @@ import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class SimpleAssignorTest { private static final Uuid TOPIC_1_UUID = Uuid.randomUuid(); private static final Uuid TOPIC_2_UUID = Uuid.randomUuid(); private static final Uuid TOPIC_3_UUID = Uuid.randomUuid(); + private static final Uuid TOPIC_4_UUID = Uuid.randomUuid(); private static final String TOPIC_1_NAME = "topic1"; + private static final String TOPIC_2_NAME = "topic2"; private static final String TOPIC_3_NAME = "topic3"; + private static final String TOPIC_4_NAME = "topic4"; private static final String MEMBER_A = "A"; private static final String MEMBER_B = "B"; + private static final String MEMBER_C = "C"; private final SimpleAssignor assignor = new SimpleAssignor(); @@ -62,13 +70,13 @@ public class SimpleAssignorTest { @Test public void testAssignWithEmptyMembers() { SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( - Collections.emptyMap() + Map.of() ); GroupSpec groupSpec = new GroupSpecImpl( - Collections.emptyMap(), + Map.of(), HOMOGENEOUS, - Collections.emptyMap() + Map.of() ); GroupAssignment groupAssignment = assignor.assign( @@ -76,13 +84,24 @@ public class SimpleAssignorTest { subscribedTopicMetadata ); - assertEquals(Collections.emptyMap(), groupAssignment.members()); + assertEquals(Map.of(), groupAssignment.members()); + + groupSpec = new GroupSpecImpl( + Map.of(), + HETEROGENEOUS, + Map.of() + ); + groupAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + assertEquals(Map.of(), groupAssignment.members()); } @Test public void testAssignWithNoSubscribedTopic() { SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( - Collections.singletonMap( + Map.of( TOPIC_1_UUID, new TopicMetadata( TOPIC_1_UUID, @@ -92,12 +111,12 @@ public class SimpleAssignorTest { ) ); - Map members = Collections.singletonMap( + Map members = Map.of( MEMBER_A, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Optional.empty(), - Collections.emptySet(), + Set.of(), Assignment.EMPTY ) ); @@ -105,7 +124,7 @@ public class SimpleAssignorTest { GroupSpec groupSpec = new GroupSpecImpl( members, HOMOGENEOUS, - Collections.emptyMap() + Map.of() ); GroupAssignment groupAssignment = assignor.assign( @@ -113,13 +132,13 @@ public class SimpleAssignorTest { subscribedTopicMetadata ); - assertEquals(Collections.emptyMap(), groupAssignment.members()); + assertEquals(Map.of(), groupAssignment.members()); } @Test public void testAssignWithSubscribedToNonExistentTopic() { SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( - Collections.singletonMap( + Map.of( TOPIC_1_UUID, new TopicMetadata( TOPIC_1_UUID, @@ -129,7 +148,7 @@ public class SimpleAssignorTest { ) ); - Map members = Collections.singletonMap( + Map members = Map.of( MEMBER_A, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), @@ -142,7 +161,7 @@ public class SimpleAssignorTest { GroupSpec groupSpec = new GroupSpecImpl( members, HOMOGENEOUS, - Collections.emptyMap() + Map.of() ); assertThrows(PartitionAssignorException.class, @@ -163,26 +182,30 @@ public class SimpleAssignorTest { 2 )); - Map members = new TreeMap<>(); + Map members = new HashMap<>(); + + Set topicsSubscription = new LinkedHashSet<>(); + topicsSubscription.add(TOPIC_1_UUID); + topicsSubscription.add(TOPIC_3_UUID); members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Optional.empty(), - Set.of(TOPIC_1_UUID, TOPIC_3_UUID), + topicsSubscription, Assignment.EMPTY )); members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Optional.empty(), - Set.of(TOPIC_1_UUID, TOPIC_3_UUID), + topicsSubscription, Assignment.EMPTY )); GroupSpec groupSpec = new GroupSpecImpl( members, HOMOGENEOUS, - Collections.emptyMap() + Map.of() ); SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); @@ -191,16 +214,22 @@ public class SimpleAssignorTest { subscribedTopicMetadata ); + // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. + // Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment. + // Step 2 -> T1:2, T3:1 -> MEMBER_A and T3:0 -> MEMBER_B by round-robin assignment. + // Step 3 -> no new assignment gets added by current assignment since it is empty. Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(MEMBER_A, mkAssignment( - mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), - mkTopicAssignment(TOPIC_3_UUID, 0, 1) + mkTopicAssignment(TOPIC_1_UUID, 0, 2), + mkTopicAssignment(TOPIC_3_UUID, 1) )); expectedAssignment.put(MEMBER_B, mkAssignment( - mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), - mkTopicAssignment(TOPIC_3_UUID, 0, 1) + mkTopicAssignment(TOPIC_1_UUID, 1), + mkTopicAssignment(TOPIC_3_UUID, 0) )); + // T1: 3 partitions + T3: 2 partitions = 5 partitions + assertEveryPartitionGetsAssignment(5, computedAssignment); assertAssignment(expectedAssignment, computedAssignment); } @@ -224,11 +253,15 @@ public class SimpleAssignorTest { 2 )); - Map members = new TreeMap<>(); + Set memberATopicsSubscription = new LinkedHashSet<>(); + memberATopicsSubscription.add(TOPIC_1_UUID); + memberATopicsSubscription.add(TOPIC_2_UUID); + + Map members = new HashMap<>(); members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Optional.empty(), - Set.of(TOPIC_1_UUID, TOPIC_2_UUID), + memberATopicsSubscription, Assignment.EMPTY )); @@ -239,18 +272,20 @@ public class SimpleAssignorTest { Assignment.EMPTY )); - String memberC = "C"; - members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Set memberCTopicsSubscription = new LinkedHashSet<>(); + memberCTopicsSubscription.add(TOPIC_2_UUID); + memberCTopicsSubscription.add(TOPIC_3_UUID); + members.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Optional.empty(), - Set.of(TOPIC_2_UUID, TOPIC_3_UUID), + memberCTopicsSubscription, Assignment.EMPTY )); GroupSpec groupSpec = new GroupSpecImpl( members, HETEROGENEOUS, - Collections.emptyMap() + Map.of() ); SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); @@ -259,19 +294,24 @@ public class SimpleAssignorTest { subscribedTopicMetadata ); + // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of MEMBER_C is 67. + // Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by hash assignment. + // Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1, -> member_B, T2:1 -> member_C by round-robin assignment. + // Step 3 -> no new assignment gets added by current assignment since it is empty. Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(MEMBER_A, mkAssignment( mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), - mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2) + mkTopicAssignment(TOPIC_2_UUID, 0, 2) )); expectedAssignment.put(MEMBER_B, mkAssignment( mkTopicAssignment(TOPIC_3_UUID, 0, 1) )); - expectedAssignment.put(memberC, mkAssignment( - mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2), - mkTopicAssignment(TOPIC_3_UUID, 0, 1) + expectedAssignment.put(MEMBER_C, mkAssignment( + mkTopicAssignment(TOPIC_2_UUID, 1, 2) )); + // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8 partitions + assertEveryPartitionGetsAssignment(8, computedAssignment); assertAssignment(expectedAssignment, computedAssignment); } @@ -290,25 +330,28 @@ public class SimpleAssignorTest { 2 )); - Map members = new TreeMap<>(); + Set memberATopicsSubscription = new LinkedHashSet<>(); + memberATopicsSubscription.add(TOPIC_1_UUID); + memberATopicsSubscription.add(TOPIC_2_UUID); + Map members = new HashMap<>(); members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Optional.empty(), - Set.of(TOPIC_1_UUID, TOPIC_2_UUID), + memberATopicsSubscription, Assignment.EMPTY )); members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Optional.empty(), - Collections.emptySet(), + Set.of(), Assignment.EMPTY )); GroupSpec groupSpec = new GroupSpecImpl( members, HETEROGENEOUS, - Collections.emptyMap() + Map.of() ); SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); @@ -320,10 +363,376 @@ public class SimpleAssignorTest { Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(MEMBER_A, mkAssignment( mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), - mkTopicAssignment(TOPIC_2_UUID, 0, 1) + mkTopicAssignment(TOPIC_2_UUID, 0, 1))); + expectedAssignment.put(MEMBER_B, mkAssignment()); + + // T1: 3 partitions + T2: 2 partitions = 5 partitions + assertEveryPartitionGetsAssignment(5, computedAssignment); + assertAssignment(expectedAssignment, computedAssignment); + } + + @Test + public void testMemberHashAssignment() { + // hashcode for "member1" is 948881623. + String member1 = "member1"; + // hashcode for "member2" is 948881624. + String member2 = "member2"; + // hashcode for "member3" is 948881625. + String member3 = "member3"; + // hashcode for "member4" is 948881626. + String member4 = "member4"; + // hashcode for "AaAaAaAa" is -540425984 to test with negative hashcode. + String member5 = "AaAaAaAa"; + List members = List.of(member1, member2, member3, member4, member5); + + TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0); + TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0); + TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0); + List partitions = List.of(partition1, partition2, partition3); + + Map> computedAssignment = new HashMap<>(); + assignor.memberHashAssignment(partitions, members, computedAssignment); + + Map> expectedAssignment = new HashMap<>(); + expectedAssignment.put(partition1, List.of(member3)); + expectedAssignment.put(partition2, List.of(member1, member4)); + expectedAssignment.put(partition3, List.of(member2, member5)); + assertAssignment(expectedAssignment, computedAssignment); + } + + @Test + public void testRoundRobinAssignment() { + String member1 = "member1"; + String member2 = "member2"; + List members = List.of(member1, member2); + TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0); + TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0); + TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0); + TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0); + List unassignedPartitions = List.of(partition2, partition3, partition4); + + Map> assignment = new HashMap<>(); + assignment.put(partition1, List.of(member1)); + + assignor.roundRobinAssignment(members, unassignedPartitions, assignment); + Map> expectedAssignment = Map.of( + partition1, List.of(member1), + partition2, List.of(member1), + partition3, List.of(member2), + partition4, List.of(member1) + ); + + assertAssignment(expectedAssignment, assignment); + } + + @Test + public void testAssignWithCurrentAssignmentHomogeneous() { + // Current assignment setup - Two members A, B subscribing to T1 and T2. + Map topicMetadata1 = new HashMap<>(); + topicMetadata1.put(TOPIC_1_UUID, new TopicMetadata( + TOPIC_1_UUID, + TOPIC_1_NAME, + 3 + )); + topicMetadata1.put(TOPIC_2_UUID, new TopicMetadata( + TOPIC_2_UUID, + TOPIC_2_NAME, + 2 )); - assertAssignment(expectedAssignment, computedAssignment); + Map members1 = new HashMap<>(); + + Set topicsSubscription1 = new LinkedHashSet<>(); + topicsSubscription1.add(TOPIC_1_UUID); + topicsSubscription1.add(TOPIC_2_UUID); + + members1.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + topicsSubscription1, + Assignment.EMPTY + )); + + members1.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + topicsSubscription1, + Assignment.EMPTY + )); + + GroupSpec groupSpec1 = new GroupSpecImpl( + members1, + HOMOGENEOUS, + Map.of() + ); + SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new SubscribedTopicDescriberImpl(topicMetadata1); + + GroupAssignment computedAssignment1 = assignor.assign( + groupSpec1, + subscribedTopicMetadata1 + ); + + // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. + // Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment. + // Step 2 -> T1:2, T2:1 -> MEMBER_A and T2:0 -> MEMBER_B by round-robin assignment. + // Step 3 -> no new assignment gets added by current assignment since it is empty. + Map>> expectedAssignment1 = new HashMap<>(); + expectedAssignment1.put(MEMBER_A, mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 0, 2), + mkTopicAssignment(TOPIC_2_UUID, 1) + )); + expectedAssignment1.put(MEMBER_B, mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 1), + mkTopicAssignment(TOPIC_2_UUID, 0) + )); + + // T1: 3 partitions + T2: 2 partitions = 5 partitions + assertEveryPartitionGetsAssignment(5, computedAssignment1); + assertAssignment(expectedAssignment1, computedAssignment1); + + // New assignment setup - Three members A, B, C subscribing to T2 and T3. + Map topicMetadata2 = new HashMap<>(); + topicMetadata2.put(TOPIC_2_UUID, new TopicMetadata( + TOPIC_2_UUID, + TOPIC_2_NAME, + 2 + )); + topicMetadata2.put(TOPIC_3_UUID, new TopicMetadata( + TOPIC_3_UUID, + TOPIC_3_NAME, + 3 + )); + + Map members2 = new HashMap<>(); + + Set topicsSubscription2 = new LinkedHashSet<>(); + topicsSubscription2.add(TOPIC_2_UUID); + topicsSubscription2.add(TOPIC_3_UUID); + + members2.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + topicsSubscription2, + // Utilizing the assignment from current assignment + new Assignment(mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 0, 2), + mkTopicAssignment(TOPIC_2_UUID, 1))) + )); + + members2.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + topicsSubscription2, + new Assignment(mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 1), + mkTopicAssignment(TOPIC_2_UUID, 0))) + )); + + members2.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + topicsSubscription2, + Assignment.EMPTY + )); + + GroupSpec groupSpec2 = new GroupSpecImpl( + members2, + HOMOGENEOUS, + Map.of() + ); + SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new SubscribedTopicDescriberImpl(topicMetadata2); + + GroupAssignment computedAssignment2 = assignor.assign( + groupSpec2, + subscribedTopicMetadata2 + ); + + // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of MEMBER_C is 67. + // Step 1 -> T2:0 -> MEMBER_A, T2:1 -> MEMBER_B, T3:0 -> MEMBER_C by hash assignment + // Step 2 -> T3:1 -> MEMBER_A, T3:2 -> MEMBER_B by round-robin assignment + // Step 3 -> no new addition by current assignment since T2:0 and T2:1 were already a part of new assignment. + Map>> expectedAssignment2 = new HashMap<>(); + expectedAssignment2.put(MEMBER_A, mkAssignment( + mkTopicAssignment(TOPIC_2_UUID, 0), + mkTopicAssignment(TOPIC_3_UUID, 1) + )); + expectedAssignment2.put(MEMBER_B, mkAssignment( + mkTopicAssignment(TOPIC_2_UUID, 1), + mkTopicAssignment(TOPIC_3_UUID, 2) + )); + expectedAssignment2.put(MEMBER_C, mkAssignment( + mkTopicAssignment(TOPIC_3_UUID, 0) + )); + + // T2: 2 partitions + T3: 3 partitions = 5 partitions + assertEveryPartitionGetsAssignment(5, computedAssignment2); + assertAssignment(expectedAssignment2, computedAssignment2); + } + + @Test + public void testAssignWithCurrentAssignmentHeterogeneous() { + // Current assignment setup - 3 members A - {T1, T2}, B - {T3}, C - {T2, T3}. + Map topicMetadata1 = new HashMap<>(); + topicMetadata1.put(TOPIC_1_UUID, new TopicMetadata( + TOPIC_1_UUID, + TOPIC_1_NAME, + 3 + )); + + topicMetadata1.put(TOPIC_2_UUID, new TopicMetadata( + TOPIC_2_UUID, + TOPIC_2_NAME, + 3 + )); + topicMetadata1.put(TOPIC_3_UUID, new TopicMetadata( + TOPIC_3_UUID, + TOPIC_3_NAME, + 2 + )); + + Set memberATopicsSubscription1 = new LinkedHashSet<>(); + memberATopicsSubscription1.add(TOPIC_1_UUID); + memberATopicsSubscription1.add(TOPIC_2_UUID); + + Map members1 = new HashMap<>(); + members1.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + memberATopicsSubscription1, + Assignment.EMPTY + )); + + members1.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + Set.of(TOPIC_3_UUID), + Assignment.EMPTY + )); + + Set memberCTopicsSubscription1 = new LinkedHashSet<>(); + memberCTopicsSubscription1.add(TOPIC_2_UUID); + memberCTopicsSubscription1.add(TOPIC_3_UUID); + members1.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + memberCTopicsSubscription1, + Assignment.EMPTY + )); + + GroupSpec groupSpec1 = new GroupSpecImpl( + members1, + HETEROGENEOUS, + Map.of() + ); + SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new SubscribedTopicDescriberImpl(topicMetadata1); + + GroupAssignment computedAssignment1 = assignor.assign( + groupSpec1, + subscribedTopicMetadata1 + ); + + // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of MEMBER_C is 67. + // Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by hash assignment. + // Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1, -> member_B, T2:1 -> member_C by round-robin assignment. + // Step 3 -> no new assignment gets added by current assignment since it is empty. + Map>> expectedAssignment1 = new HashMap<>(); + expectedAssignment1.put(MEMBER_A, mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), + mkTopicAssignment(TOPIC_2_UUID, 0, 2) + )); + expectedAssignment1.put(MEMBER_B, mkAssignment( + mkTopicAssignment(TOPIC_3_UUID, 0, 1) + )); + expectedAssignment1.put(MEMBER_C, mkAssignment( + mkTopicAssignment(TOPIC_2_UUID, 1, 2) + )); + + // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8 partitions + assertEveryPartitionGetsAssignment(8, computedAssignment1); + assertAssignment(expectedAssignment1, computedAssignment1); + + // New assignment setup - 2 members A - {T1, T2, T3}, B - {T3, T4}. + + Map topicMetadata2 = new HashMap<>(); + topicMetadata2.put(TOPIC_1_UUID, new TopicMetadata( + TOPIC_1_UUID, + TOPIC_1_NAME, + 3 + )); + topicMetadata2.put(TOPIC_2_UUID, new TopicMetadata( + TOPIC_2_UUID, + TOPIC_2_NAME, + 3 + )); + topicMetadata2.put(TOPIC_3_UUID, new TopicMetadata( + TOPIC_3_UUID, + TOPIC_3_NAME, + 2 + )); + topicMetadata2.put(TOPIC_4_UUID, new TopicMetadata( + TOPIC_4_UUID, + TOPIC_4_NAME, + 1 + )); + + Map members2 = new HashMap<>(); + + Set memberATopicsSubscription2 = new LinkedHashSet<>(); + memberATopicsSubscription2.add(TOPIC_1_UUID); + memberATopicsSubscription2.add(TOPIC_2_UUID); + memberATopicsSubscription2.add(TOPIC_3_UUID); + + Set memberBTopicsSubscription2 = new LinkedHashSet<>(); + memberBTopicsSubscription2.add(TOPIC_3_UUID); + memberBTopicsSubscription2.add(TOPIC_4_UUID); + + members2.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + memberATopicsSubscription2, + new Assignment(mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), + mkTopicAssignment(TOPIC_2_UUID, 0, 2))) + )); + + members2.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + memberBTopicsSubscription2, + new Assignment(mkAssignment( + mkTopicAssignment(TOPIC_3_UUID, 0, 1))) + )); + + GroupSpec groupSpec2 = new GroupSpecImpl( + members2, + HETEROGENEOUS, + Map.of() + ); + + SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new SubscribedTopicDescriberImpl(topicMetadata2); + + GroupAssignment computedAssignment2 = assignor.assign( + groupSpec2, + subscribedTopicMetadata2 + ); + + // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. + // Step 1 -> T1:1 -> member_A, T3:0 -> member_B by hash assignment. + // Step 2 -> T2:1 -> member_A, T4:0 -> member_B by round-robin assignment. + // Step 3 -> T1:0, T1:2, T2:0 -> member_A, T3:1 -> member_B by current assignment. + Map>> expectedAssignment2 = new HashMap<>(); + expectedAssignment2.put(MEMBER_A, mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), + mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2) + )); + expectedAssignment2.put(MEMBER_B, mkAssignment( + mkTopicAssignment(TOPIC_3_UUID, 0, 1), + mkTopicAssignment(TOPIC_4_UUID, 0) + )); + + // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions + T4: 1 partition = 9 partitions + assertEveryPartitionGetsAssignment(9, computedAssignment2); + assertAssignment(expectedAssignment2, computedAssignment2); } private void assertAssignment( @@ -336,4 +745,31 @@ public class SimpleAssignorTest { assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember); } } + + private void assertAssignment( + Map> expectedAssignment, + Map> computedAssignment + ) { + assertEquals(expectedAssignment.size(), computedAssignment.size()); + expectedAssignment.forEach((topicIdPartition, members) -> { + List computedMembers = computedAssignment.getOrDefault(topicIdPartition, List.of()); + assertEquals(members.size(), computedMembers.size()); + members.forEach(member -> assertTrue(computedMembers.contains(member))); + }); + } + + private void assertEveryPartitionGetsAssignment( + int expectedPartitions, + GroupAssignment computedGroupAssignment + ) { + Map memberAssignments = computedGroupAssignment.members(); + Set topicPartitionAssignments = new HashSet<>(); + memberAssignments.values().forEach(memberAssignment -> { + Map> topicIdPartitions = memberAssignment.partitions(); + topicIdPartitions.forEach((topicId, partitions) -> + partitions.forEach(partition -> topicPartitionAssignments.add(new TopicIdPartition(topicId, partition))) + ); + }); + assertEquals(expectedPartitions, topicPartitionAssignments.size()); + } }