KAFKA-16821; Member Subscription Spec Interface (#16068)

This patch reworks the `PartitionAssignor` interface to use interfaces instead of POJOs. It mainly introduces the `MemberSubscriptionSpec` interface that represents a member subscription and changes the `GroupSpec` interfaces to expose the subscriptions and the assignments via different methods.

The patch does not change the performance.

before:
```
Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5  3.462 ± 0.687  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  3.626 ± 0.412  ms/op
JMH benchmarks done
```

after:
```
Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5  3.677 ± 0.683  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  3.991 ± 0.065  ms/op
JMH benchmarks done
```

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Ritika Reddy 2024-06-04 06:44:37 -07:00 committed by GitHub
parent 7d82f7625e
commit 078dd9a311
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 681 additions and 676 deletions

View File

@ -1,122 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
* The assignment specification for a consumer group member.
*/
public class AssignmentMemberSpec {
/**
* The instance ID if provided.
*/
private final Optional<String> instanceId;
/**
* The rack ID if provided.
*/
private final Optional<String> rackId;
/**
* Topics Ids that the member is subscribed to.
*/
private final Set<Uuid> subscribedTopicIds;
/**
* Partitions assigned keyed by topicId.
*/
private final Map<Uuid, Set<Integer>> assignedPartitions;
/**
* @return The instance ID as an Optional.
*/
public Optional<String> instanceId() {
return instanceId;
}
/**
* @return The rack ID as an Optional.
*/
public Optional<String> rackId() {
return rackId;
}
/**
* @return Set of subscribed topic Ids.
*/
public Set<Uuid> subscribedTopicIds() {
return subscribedTopicIds;
}
/**
* @return Assigned partitions keyed by topic Ids.
*/
public Map<Uuid, Set<Integer>> assignedPartitions() {
return assignedPartitions;
}
public AssignmentMemberSpec(
Optional<String> instanceId,
Optional<String> rackId,
Set<Uuid> subscribedTopicIds,
Map<Uuid, Set<Integer>> assignedPartitions
) {
Objects.requireNonNull(instanceId);
Objects.requireNonNull(rackId);
Objects.requireNonNull(subscribedTopicIds);
Objects.requireNonNull(assignedPartitions);
this.instanceId = instanceId;
this.rackId = rackId;
this.subscribedTopicIds = subscribedTopicIds;
this.assignedPartitions = assignedPartitions;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AssignmentMemberSpec that = (AssignmentMemberSpec) o;
if (!instanceId.equals(that.instanceId)) return false;
if (!rackId.equals(that.rackId)) return false;
if (!subscribedTopicIds.equals(that.subscribedTopicIds)) return false;
return assignedPartitions.equals(that.assignedPartitions);
}
@Override
public int hashCode() {
int result = instanceId.hashCode();
result = 31 * result + rackId.hashCode();
result = 31 * result + subscribedTopicIds.hashCode();
result = 31 * result + assignedPartitions.hashCode();
return result;
}
@Override
public String toString() {
return "AssignmentMemberSpec(instanceId=" + instanceId +
", rackId=" + rackId +
", subscribedTopicIds=" + subscribedTopicIds +
", assignedPartitions=" + assignedPartitions +
')';
}
}

View File

@ -53,9 +53,9 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class);
/**
* The member metadata obtained from the assignment specification.
* The group metadata specification.
*/
private final Map<String, AssignmentMemberSpec> members;
private final GroupSpec groupSpec;
/**
* The topic and partition metadata describer.
@ -108,13 +108,13 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
private final PartitionMovements partitionMovements;
public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.members = groupSpec.members();
this.groupSpec = groupSpec;
this.subscribedTopicDescriber = subscribedTopicDescriber;
this.subscribedTopicIds = new HashSet<>();
this.membersPerTopic = new HashMap<>();
this.targetAssignment = new HashMap<>();
members.forEach((memberId, memberMetadata) ->
memberMetadata.subscribedTopicIds().forEach(topicId -> {
groupSpec.memberIds().forEach(memberId ->
groupSpec.memberSubscription(memberId).subscribedTopicIds().forEach(topicId -> {
// Check if the subscribed topic exists.
int partitionCount = subscribedTopicDescriber.numPartitions(topicId);
if (partitionCount == -1) {
@ -129,8 +129,8 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
);
this.unassignedPartitions = new HashSet<>(topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber));
this.assignedStickyPartitions = new HashSet<>();
this.assignmentManager = new AssignmentManager(this.members, this.subscribedTopicDescriber);
this.sortedMembersByAssignmentSize = assignmentManager.sortMembersByAssignmentSize(members.keySet());
this.assignmentManager = new AssignmentManager(this.subscribedTopicDescriber);
this.sortedMembersByAssignmentSize = assignmentManager.sortMembersByAssignmentSize(groupSpec.memberIds());
this.partitionOwnerInTargetAssignment = new HashMap<>();
this.partitionMovements = new PartitionMovements();
}
@ -149,7 +149,6 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
return new GroupAssignment(Collections.emptyMap());
}
// All existing partitions are retained until max assignment size.
assignStickyPartitions();
unassignedPartitionsAssignment();
@ -191,9 +190,9 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
* <li> Partitions from topics that are still present in both the new subscriptions and the topic metadata. </li>
*/
private void assignStickyPartitions() {
members.forEach((memberId, assignmentMemberSpec) ->
assignmentMemberSpec.assignedPartitions().forEach((topicId, currentAssignment) -> {
if (assignmentMemberSpec.subscribedTopicIds().contains(topicId)) {
groupSpec.memberIds().forEach(memberId ->
groupSpec.memberAssignment(memberId).forEach((topicId, currentAssignment) -> {
if (groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicId)) {
currentAssignment.forEach(partition -> {
TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition);
assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId);
@ -292,7 +291,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
continue;
// Otherwise make sure it cannot get any more partitions.
for (Uuid topicId : members.get(member).subscribedTopicIds()) {
for (Uuid topicId : groupSpec.memberSubscription(member).subscribedTopicIds()) {
Set<Integer> assignedPartitions = targetAssignment.get(member).targetPartitions().get(topicId);
for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) {
TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, i);
@ -332,7 +331,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
unassignedPartitions.removeAll(fixedPartitions);
// Narrow down the reassignment scope to only those members that are subject to reassignment.
for (String member : members.keySet()) {
for (String member : groupSpec.memberIds()) {
if (!canMemberParticipateInReassignment(member)) {
sortedMembersByAssignmentSize.remove(member);
}
@ -411,7 +410,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
// Find the new member with the least assignment size.
String newOwner = null;
for (String anotherMember : sortedMembersByAssignmentSize) {
if (members.get(anotherMember).subscribedTopicIds().contains(partition.topicId())) {
if (groupSpec.memberSubscription(anotherMember).subscribedTopicIds().contains(partition.topicId())) {
newOwner = anotherMember;
break;
}
@ -646,9 +645,11 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
/**
* Initializes an AssignmentManager, setting up the necessary data structures.
*/
public AssignmentManager(Map<String, AssignmentMemberSpec> members, SubscribedTopicDescriber subscribedTopicDescriber) {
members.forEach((memberId, member) -> {
int maxSize = member.subscribedTopicIds().stream()
public AssignmentManager(
SubscribedTopicDescriber subscribedTopicDescriber
) {
groupSpec.memberIds().forEach(memberId -> {
int maxSize = groupSpec.memberSubscription(memberId).subscribedTopicIds().stream()
.mapToInt(subscribedTopicDescriber::numPartitions)
.sum();
@ -734,7 +735,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
String memberId
) {
// If member is not subscribed to the partition's topic, return false without assigning.
if (!members.get(memberId).subscribedTopicIds().contains(topicIdPartition.topicId())) {
if (!groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicIdPartition.topicId())) {
return false;
}

View File

@ -18,16 +18,18 @@ package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
/**
* The group metadata specifications required to compute the target assignment.
*/
public interface GroupSpec {
/**
* @return Member metadata keyed by member Id.
* @return All the member Ids of the consumer group.
*/
Map<String, AssignmentMemberSpec> members();
Collection<String> memberIds();
/**
* @return The group's subscription type.
@ -39,4 +41,22 @@ public interface GroupSpec {
* False, otherwise.
*/
boolean isPartitionAssigned(Uuid topicId, int partitionId);
/**
* Gets the member subscription specification for a member.
*
* @param memberId The member Id.
* @return The member's subscription metadata.
* @throws IllegalArgumentException If the member Id isn't found.
*/
MemberSubscriptionSpec memberSubscription(String memberId);
/**
* Gets the current assignment of the member.
*
* @param memberId The member Id.
* @return A map of topic Ids to sets of partition numbers.
* An empty map is returned if the member Id isn't found.
*/
Map<Uuid, Set<Integer>> memberAssignment(String memberId);
}

View File

@ -18,20 +18,23 @@ package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* The assignment specification for a consumer group.
*/
public class GroupSpecImpl implements GroupSpec {
/**
* The member metadata keyed by member Id.
* Member subscription metadata keyed by member Id.
*/
private final Map<String, AssignmentMemberSpec> members;
private final Map<String, MemberSubscriptionSpecImpl> memberSubscriptions;
/**
* The subscription type followed by the group.
* The subscription type of the group.
*/
private final SubscriptionType subscriptionType;
@ -39,27 +42,24 @@ public class GroupSpecImpl implements GroupSpec {
* Reverse lookup map representing topic partitions with
* their current member assignments.
*/
private final Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
private final Map<Uuid, Map<Integer, String>> invertedMemberAssignment;
public GroupSpecImpl(
Map<String, AssignmentMemberSpec> members,
Map<String, MemberSubscriptionSpecImpl> memberSubscriptions,
SubscriptionType subscriptionType,
Map<Uuid, Map<Integer, String>> invertedTargetAssignment
Map<Uuid, Map<Integer, String>> invertedMemberAssignment
) {
Objects.requireNonNull(members);
Objects.requireNonNull(subscriptionType);
Objects.requireNonNull(invertedTargetAssignment);
this.members = members;
this.subscriptionType = subscriptionType;
this.invertedTargetAssignment = invertedTargetAssignment;
this.memberSubscriptions = Objects.requireNonNull(memberSubscriptions);
this.subscriptionType = Objects.requireNonNull(subscriptionType);
this.invertedMemberAssignment = Objects.requireNonNull(invertedMemberAssignment);
}
/**
* {@inheritDoc}
*/
@Override
public Map<String, AssignmentMemberSpec> members() {
return members;
public Collection<String> memberIds() {
return memberSubscriptions.keySet();
}
/**
@ -75,36 +75,60 @@ public class GroupSpecImpl implements GroupSpec {
*/
@Override
public boolean isPartitionAssigned(Uuid topicId, int partitionId) {
Map<Integer, String> partitionMap = invertedTargetAssignment.get(topicId);
Map<Integer, String> partitionMap = invertedMemberAssignment.get(topicId);
if (partitionMap == null) {
return false;
}
return partitionMap.containsKey(partitionId);
}
/**
* {@inheritDoc}
*/
@Override
public MemberSubscriptionSpec memberSubscription(String memberId) {
MemberSubscriptionSpec memberSubscription = memberSubscriptions.get(memberId);
if (memberSubscription == null) {
throw new IllegalArgumentException("Member Id " + memberId + " not found.");
}
return memberSubscription;
}
/**
* {@inheritDoc}
*/
@Override
public Map<Uuid, Set<Integer>> memberAssignment(String memberId) {
MemberSubscriptionSpecImpl memberSubscription = memberSubscriptions.get(memberId);
if (memberSubscription == null) {
return Collections.emptyMap();
}
return memberSubscription.memberAssignment();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GroupSpecImpl that = (GroupSpecImpl) o;
return subscriptionType == that.subscriptionType &&
members.equals(that.members) &&
invertedTargetAssignment.equals(that.invertedTargetAssignment);
memberSubscriptions.equals(that.memberSubscriptions) &&
invertedMemberAssignment.equals(that.invertedMemberAssignment);
}
@Override
public int hashCode() {
int result = members.hashCode();
int result = memberSubscriptions.hashCode();
result = 31 * result + subscriptionType.hashCode();
result = 31 * result + invertedTargetAssignment.hashCode();
result = 31 * result + invertedMemberAssignment.hashCode();
return result;
}
@Override
public String toString() {
return "GroupSpecImpl(members=" + members +
return "GroupSpecImpl(memberSubscriptions=" + memberSubscriptions +
", subscriptionType=" + subscriptionType +
", invertedTargetAssignment=" + invertedTargetAssignment +
", invertedMemberAssignment=" + invertedMemberAssignment +
')';
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import java.util.Optional;
import java.util.Set;
/**
* Interface representing the subscription metadata for a group member.
*/
public interface MemberSubscriptionSpec {
/**
* Gets the rack Id if present.
*
* @return An Optional containing the rack Id, or an empty Optional if not present.
*/
Optional<String> rackId();
/**
* Gets the set of subscribed topic Ids.
*
* @return The set of subscribed topic Ids.
*/
Set<Uuid> subscribedTopicIds();
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
* Implementation of the {@link MemberSubscriptionSpec} interface.
*/
public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec {
private final Optional<String> rackId;
private final Set<Uuid> subscribedTopicIds;
private final Assignment memberAssignment;
/**
* Constructs a new {@code MemberSubscriptionSpecImpl}.
*
* @param rackId The rack Id.
* @param subscribedTopicIds The set of subscribed topic Ids.
* @param memberAssignment The current member assignment.
*/
public MemberSubscriptionSpecImpl(
Optional<String> rackId,
Set<Uuid> subscribedTopicIds,
Assignment memberAssignment
) {
this.rackId = Objects.requireNonNull(rackId);
this.subscribedTopicIds = Objects.requireNonNull(subscribedTopicIds);
this.memberAssignment = Objects.requireNonNull(memberAssignment);
}
@Override
public Optional<String> rackId() {
return rackId;
}
@Override
public Set<Uuid> subscribedTopicIds() {
return subscribedTopicIds;
}
public Map<Uuid, Set<Integer>> memberAssignment() {
return memberAssignment.partitions();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MemberSubscriptionSpecImpl that = (MemberSubscriptionSpecImpl) o;
return rackId.equals(that.rackId) &&
subscribedTopicIds.equals(that.subscribedTopicIds) &&
memberAssignment.equals(that.memberAssignment);
}
@Override
public int hashCode() {
int result = rackId.hashCode();
result = 31 * result + subscribedTopicIds.hashCode();
result = 31 * result + memberAssignment.hashCode();
return result;
}
@Override
public String toString() {
return "MemberSubscriptionSpecImpl(rackId=" + rackId.orElse("N/A") +
", subscribedTopicIds=" + subscribedTopicIds +
", memberAssignment=" + memberAssignment +
')';
}
}

View File

@ -101,9 +101,11 @@ public class OptimizedUniformAssignmentBuilder {
OptimizedUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.groupSpec = groupSpec;
this.subscribedTopicDescriber = subscribedTopicDescriber;
this.subscribedTopicIds = new HashSet<>(groupSpec.members().values().iterator().next().subscribedTopicIds());
this.subscribedTopicIds = new HashSet<>(groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
.subscribedTopicIds());
this.unfilledMembers = new ArrayList<>();
this.unassignedPartitions = new ArrayList<>();
this.targetAssignment = new HashMap<>();
}
@ -135,7 +137,7 @@ public class OptimizedUniformAssignmentBuilder {
// Compute the minimum required quota per member and the number of members
// that should receive an extra partition.
int numberOfMembers = groupSpec.members().size();
int numberOfMembers = groupSpec.memberIds().size();
minimumMemberQuota = totalPartitionsCount / numberOfMembers;
remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;
@ -157,10 +159,8 @@ public class OptimizedUniformAssignmentBuilder {
* altered.
*/
private void maybeRevokePartitions() {
for (Map.Entry<String, AssignmentMemberSpec> entry : groupSpec.members().entrySet()) {
String memberId = entry.getKey();
AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
Map<Uuid, Set<Integer>> oldAssignment = assignmentMemberSpec.assignedPartitions();
for (String memberId : groupSpec.memberIds()) {
Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId);
Map<Uuid, Set<Integer>> newAssignment = null;
// The assignor expects to receive the assignment as an immutable map. It leverages

View File

@ -81,7 +81,7 @@ public class RangeAssignor implements ConsumerGroupPartitionAssignor {
* Returns a map of topic Ids to a list of members subscribed to them,
* based on the given assignment specification and metadata.
*
* @param groupSpec The specification for member assignments.
* @param groupSpec The specification required for group assignments.
* @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters.
* @return A map of topic Ids to a list of member Ids subscribed to them.
*
@ -92,11 +92,11 @@ public class RangeAssignor implements ConsumerGroupPartitionAssignor {
final SubscribedTopicDescriber subscribedTopicDescriber
) {
Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
Map<String, AssignmentMemberSpec> membersData = groupSpec.members();
if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
Set<String> allMembers = membersData.keySet();
Collection<Uuid> topics = membersData.values().iterator().next().subscribedTopicIds();
Collection<String> allMembers = groupSpec.memberIds();
Collection<Uuid> topics = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
.subscribedTopicIds();
for (Uuid topicId : topics) {
if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
@ -105,8 +105,8 @@ public class RangeAssignor implements ConsumerGroupPartitionAssignor {
membersPerTopic.put(topicId, allMembers);
}
} else {
membersData.forEach((memberId, memberMetadata) -> {
Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
groupSpec.memberIds().forEach(memberId -> {
Collection<Uuid> topics = groupSpec.memberSubscription(memberId).subscribedTopicIds();
for (Uuid topicId : topics) {
if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
@ -162,8 +162,8 @@ public class RangeAssignor implements ConsumerGroupPartitionAssignor {
List<MemberWithRemainingAssignments> potentiallyUnfilledMembers = new ArrayList<>();
for (String memberId : membersForTopic) {
Set<Integer> assignedPartitionsForTopic = groupSpec.members().get(memberId)
.assignedPartitions().getOrDefault(topicId, Collections.emptySet());
Set<Integer> assignedPartitionsForTopic = groupSpec.memberAssignment(memberId)
.getOrDefault(topicId, Collections.emptySet());
int currentAssignmentSize = assignedPartitionsForTopic.size();
List<Integer> currentAssignmentListForTopic = new ArrayList<>(assignedPartitionsForTopic);

View File

@ -66,7 +66,7 @@ public class UniformAssignor implements ConsumerGroupPartitionAssignor {
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {
if (groupSpec.members().isEmpty())
if (groupSpec.memberIds().isEmpty())
return new GroupAssignment(Collections.emptyMap());
if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {

View File

@ -18,8 +18,8 @@ package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.CoordinatorRecord;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
@ -293,14 +293,16 @@ public class TargetAssignmentBuilder {
* @throws PartitionAssignorException if the target assignment cannot be computed.
*/
public TargetAssignmentResult build() throws PartitionAssignorException {
Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
Map<String, MemberSubscriptionSpecImpl> memberSpecs = new HashMap<>();
// Prepare the member spec for all members.
members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec(
members.forEach((memberId, member) ->
memberSpecs.put(memberId, createMemberSubscriptionSpecImpl(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
topicsImage
)));
))
);
// Update the member spec if updated or deleted members.
updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
@ -317,7 +319,7 @@ public class TargetAssignmentBuilder {
}
}
memberSpecs.put(memberId, createAssignmentMemberSpec(
memberSpecs.put(memberId, createMemberSubscriptionSpecImpl(
updatedMemberOrNull,
assignment,
topicsImage
@ -381,16 +383,16 @@ public class TargetAssignmentBuilder {
}
}
static AssignmentMemberSpec createAssignmentMemberSpec(
// private for testing
static MemberSubscriptionSpecImpl createMemberSubscriptionSpecImpl(
ConsumerGroupMember member,
Assignment targetAssignment,
Assignment memberAssignment,
TopicsImage topicsImage
) {
return new AssignmentMemberSpec(
Optional.ofNullable(member.instanceId()),
return new MemberSubscriptionSpecImpl(
Optional.ofNullable(member.rackId()),
new TopicIds(member.subscribedTopicNames(), topicsImage),
targetAssignment.partitions()
memberAssignment
);
}
}

View File

@ -173,4 +173,11 @@ public class TopicIds implements Set<Uuid> {
result = 31 * result + image.hashCode();
return result;
}
@Override
public String toString() {
return "TopicIds(topicNames=" + topicNames +
", image=" + image +
')';
}
}

View File

@ -17,8 +17,8 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl;
import java.util.AbstractMap;
import java.util.Arrays;
@ -86,21 +86,20 @@ public class AssignmentTestUtil {
}
/**
* Generate a reverse look up map of partition to member target assignments from the given member spec.
* Generate a reverse look up map of partition to member target assignments from the given metadata.
*
* @param memberSpec A map where the key is the member Id and the value is an
* AssignmentMemberSpec object containing the member's partition assignments.
* @param members The member subscription specs.
* @return Map of topic partition to member assignments.
*/
public static Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
Map<String, AssignmentMemberSpec> memberSpec
Map<String, MemberSubscriptionSpecImpl> members
) {
Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();
for (Map.Entry<String, AssignmentMemberSpec> memberEntry : memberSpec.entrySet()) {
for (Map.Entry<String, MemberSubscriptionSpecImpl> memberEntry : members.entrySet()) {
String memberId = memberEntry.getKey();
Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().assignedPartitions();
Map<Uuid, Set<Integer>> memberAssignment = memberEntry.getValue().memberAssignment();
for (Map.Entry<Uuid, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) {
for (Map.Entry<Uuid, Set<Integer>> topicEntry : memberAssignment.entrySet()) {
Uuid topicId = topicEntry.getKey();
Set<Integer> partitions = topicEntry.getValue();

View File

@ -22,7 +22,6 @@ import org.apache.kafka.coordinator.group.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
import java.util.Map;
import java.util.stream.Collectors;
public class NoOpPartitionAssignor implements ConsumerGroupPartitionAssignor {
@ -35,11 +34,11 @@ public class NoOpPartitionAssignor implements ConsumerGroupPartitionAssignor {
@Override
public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
return new GroupAssignment(groupSpec.members().entrySet()
return new GroupAssignment(groupSpec.memberIds()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> new MemberAssignment(entry.getValue().assignedPartitions())
memberId -> memberId,
memberId -> new MemberAssignment(groupSpec.memberAssignment(memberId))
)));
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.junit.jupiter.api.Test;
@ -66,18 +67,16 @@ public class GeneralUniformAssignmentBuilderTest {
)
);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.emptySet(),
Collections.emptyMap()
Assignment.EMPTY
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.emptySet(),
Collections.emptyMap()
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
@ -108,18 +107,16 @@ public class GeneralUniformAssignmentBuilderTest {
)
);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic3Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic2Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
@ -148,18 +145,18 @@ public class GeneralUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(6)
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic3Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
@ -202,24 +199,24 @@ public class GeneralUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(2)
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic3Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic3Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
members.put(memberC, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberC, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic1Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
@ -271,45 +268,33 @@ public class GeneralUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(4)
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2)
)
);
members.put(memberA, new AssignmentMemberSpec(
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
Optional.of("rack0"),
Collections.singleton(topic1Uuid),
currentAssignmentForA
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2)
))
));
Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(
mkAssignment(
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 3),
mkTopicAssignment(topic2Uuid, 0)
)
);
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.of("rack1"),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
))
));
Map<Uuid, Set<Integer>> currentAssignmentForC = new TreeMap<>(
mkAssignment(
members.put(memberC, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid, topic3Uuid),
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 4, 5),
mkTopicAssignment(topic2Uuid, 1, 2, 3),
mkTopicAssignment(topic3Uuid, 0, 1, 2, 3)
)
);
members.put(memberC, new AssignmentMemberSpec(
Optional.empty(),
Optional.of("rack2"),
mkSet(topic1Uuid, topic2Uuid, topic3Uuid),
currentAssignmentForC
))
));
GroupSpec groupSpec = new GroupSpecImpl(
@ -368,32 +353,24 @@ public class GeneralUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(3)
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2, 3),
mkTopicAssignment(topic3Uuid, 0, 1)
)
);
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
currentAssignmentForA
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2, 3),
mkTopicAssignment(topic3Uuid, 0, 1)
))
));
Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(
mkAssignment(
mkTopicAssignment(topic2Uuid, 0, 1, 2),
mkTopicAssignment(topic4Uuid, 0, 1, 2)
)
);
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid, topic3Uuid, topic4Uuid),
currentAssignmentForB
new Assignment(mkAssignment(
mkTopicAssignment(topic2Uuid, 0, 1, 2),
mkTopicAssignment(topic4Uuid, 0, 1, 2)
))
));
GroupSpec groupSpec = new GroupSpecImpl(
@ -438,36 +415,31 @@ public class GeneralUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(7)
));
Map<String, AssignmentMemberSpec> members = new HashMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 0)
));
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic1Uuid),
currentAssignmentForA
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 0)
))
));
Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1, 2)
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1, 2)
))
));
// Add a new member to trigger a re-assignment.
members.put(memberC, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberC, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
@ -519,27 +491,23 @@ public class GeneralUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(3)
));
Map<String, AssignmentMemberSpec> members = new HashMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2),
mkTopicAssignment(topic3Uuid, 0, 1)
);
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
currentAssignmentForA
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2),
mkTopicAssignment(topic3Uuid, 0, 1)
))
));
Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic2Uuid, 3, 4, 5, 6)
);
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic2Uuid),
currentAssignmentForB
new Assignment(mkAssignment(
mkTopicAssignment(topic2Uuid, 3, 4, 5, 6)
))
));
// Member C was removed
@ -585,28 +553,24 @@ public class GeneralUniformAssignmentBuilderTest {
));
// Initial subscriptions were [T1, T2]
Map<String, AssignmentMemberSpec> members = new HashMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 1, 3)
);
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic1Uuid),
currentAssignmentForA
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 1, 3)
))
));
Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 0, 2, 4)
);
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 0, 2, 4)
))
));
GroupSpec groupSpec = new GroupSpecImpl(

View File

@ -17,23 +17,26 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GroupSpecImplTest {
private Map<String, AssignmentMemberSpec> members;
private static final String TEST_MEMBER = "test-member";
private Map<String, MemberSubscriptionSpecImpl> members;
private SubscriptionType subscriptionType;
private Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
private GroupSpecImpl groupSpec;
@ -42,17 +45,15 @@ public class GroupSpecImplTest {
@BeforeEach
void setUp() {
members = new HashMap<>();
subscriptionType = SubscriptionType.HOMOGENEOUS;
invertedTargetAssignment = new HashMap<>();
topicId = Uuid.randomUuid();
members.put("test-member", new AssignmentMemberSpec(
members.put(TEST_MEMBER, new MemberSubscriptionSpecImpl(
Optional.empty(),
Optional.empty(),
new HashSet<>(Collections.singletonList(topicId)),
Collections.emptyMap())
);
mkSet(topicId),
Assignment.EMPTY
));
groupSpec = new GroupSpecImpl(
members,
@ -62,8 +63,8 @@ public class GroupSpecImplTest {
}
@Test
void testMembers() {
assertEquals(members, groupSpec.members());
void testMemberIds() {
assertEquals(members.keySet(), groupSpec.memberIds());
}
@Test
@ -81,4 +82,27 @@ public class GroupSpecImplTest {
assertFalse(groupSpec.isPartitionAssigned(topicId, 2));
assertFalse(groupSpec.isPartitionAssigned(Uuid.randomUuid(), 2));
}
@Test
void testMemberSubscription() {
assertEquals(members.get(TEST_MEMBER), groupSpec.memberSubscription(TEST_MEMBER));
assertThrows(IllegalArgumentException.class, () -> groupSpec.memberSubscription("unknown-member"));
}
@Test
void testMemberAssignment() {
Map<Uuid, Set<Integer>> topicPartitions = new HashMap<>();
topicPartitions.put(
topicId,
mkSet(0, 1)
);
members.put(TEST_MEMBER, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topicId),
new Assignment(topicPartitions)
));
assertEquals(topicPartitions, groupSpec.memberAssignment(TEST_MEMBER));
assertEquals(Collections.emptyMap(), groupSpec.memberAssignment("unknown-member"));
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.junit.jupiter.api.Test;
@ -69,13 +70,12 @@ public class OptimizedUniformAssignmentBuilderTest {
)
);
Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
Map<String, MemberSubscriptionSpecImpl> members = Collections.singletonMap(
memberA,
new AssignmentMemberSpec(
Optional.empty(),
new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.emptySet(),
Collections.emptyMap()
Assignment.EMPTY
)
);
@ -107,13 +107,12 @@ public class OptimizedUniformAssignmentBuilderTest {
)
);
Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
Map<String, MemberSubscriptionSpecImpl> members = Collections.singletonMap(
memberA,
new AssignmentMemberSpec(
Optional.empty(),
new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic2Uuid),
Collections.emptyMap()
Assignment.EMPTY
)
);
@ -143,18 +142,18 @@ public class OptimizedUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(2)
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@ -192,24 +191,24 @@ public class OptimizedUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(2)
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic3Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic3Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
members.put(memberC, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberC, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic3Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
// Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment.
@ -253,13 +252,12 @@ public class OptimizedUniformAssignmentBuilderTest {
));
}
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
for (int i = 1; i < 50; i++) {
members.put("member" + i, new AssignmentMemberSpec(
Optional.empty(),
members.put("member" + i, new MemberSubscriptionSpecImpl(
Optional.empty(),
topicMetadata.keySet(),
Collections.emptyMap()
Assignment.EMPTY
));
}
@ -294,26 +292,24 @@ public class OptimizedUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(3)
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
mkOrderedAssignment(
new Assignment(mkOrderedAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
)
))
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
mkOrderedAssignment(
new Assignment(mkOrderedAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
)
))
));
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@ -359,26 +355,24 @@ public class OptimizedUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(5)
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
mkOrderedAssignment(
new Assignment(mkOrderedAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 0)
)
))
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
mkOrderedAssignment(
new Assignment(mkOrderedAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1, 2)
)
))
));
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@ -423,34 +417,31 @@ public class OptimizedUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(3)
));
Map<String, AssignmentMemberSpec> members = new HashMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
mkOrderedAssignment(
new Assignment(mkOrderedAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 0)
)
))
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
mkOrderedAssignment(
new Assignment(mkOrderedAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1, 2)
)
))
));
// Add a new member to trigger a re-assignment.
members.put(memberC, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberC, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@ -497,26 +488,24 @@ public class OptimizedUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(3)
));
Map<String, AssignmentMemberSpec> members = new HashMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
mkAssignment(
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
)
))
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
mkAssignment(
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
)
))
));
// Member C was removed
@ -564,26 +553,24 @@ public class OptimizedUniformAssignmentBuilderTest {
));
// Initial subscriptions were [T1, T2]
Map<String, AssignmentMemberSpec> members = new HashMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic2Uuid),
mkAssignment(
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
)
))
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.singleton(topic2Uuid),
mkAssignment(
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
)
))
));
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@ -616,14 +603,14 @@ public class OptimizedUniformAssignmentBuilderTest {
* - each member is subscribed to topics of all partitions assigned to it, and
* - each partition is assigned to no more than one member.
* Balance requirements:
* - the assignment is fully balanced (the numbers of topic partitions assigned to members differ by at most one), or
* - the assignment is fully balanced (the numbers of topic partitions assigned to memberSubscriptionSpec differ by at most one), or
* - there is no topic partition that can be moved from one member to another with 2+ fewer topic partitions.
*
* @param members Members data structure from the assignment Spec.
* @param memberSubscriptionSpec Members subscription metadata structure from the group Spec.
* @param computedGroupAssignment Assignment computed by the uniform assignor.
*/
private void checkValidityAndBalance(
Map<String, AssignmentMemberSpec> members,
Map<String, MemberSubscriptionSpecImpl> memberSubscriptionSpec,
GroupAssignment computedGroupAssignment
) {
List<String> membersList = new ArrayList<>(computedGroupAssignment.members().keySet());
@ -643,7 +630,7 @@ public class OptimizedUniformAssignmentBuilderTest {
// Each member is subscribed to topics of all the partitions assigned to it.
computedAssignmentForMember.keySet().forEach(topicId -> {
// Check if the topic exists in the subscription.
assertTrue(members.get(memberId).subscribedTopicIds().contains(topicId),
assertTrue(memberSubscriptionSpec.get(memberId).subscribedTopicIds().contains(topicId),
"Error: Partitions for topic " + topicId + " are assigned to member " + memberId +
" but it is not part of the members subscription ");
});

View File

@ -17,7 +17,7 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.junit.jupiter.api.Test;
@ -47,9 +47,9 @@ public class RangeAssignorTest {
private final String topic2Name = "topic2";
private final Uuid topic3Uuid = Uuid.randomUuid();
private final String topic3Name = "topic3";
private final String consumerA = "A";
private final String consumerB = "B";
private final String consumerC = "C";
private final String memberA = "A";
private final String memberB = "B";
private final String memberC = "C";
@Test
public void testOneConsumerNoTopic() {
@ -60,18 +60,17 @@ public class RangeAssignorTest {
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
Collections.emptyMap()
)
)
);
Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
consumerA,
new AssignmentMemberSpec(
Optional.empty(),
Map<String, MemberSubscriptionSpecImpl> members = Collections.singletonMap(
memberA,
new MemberSubscriptionSpecImpl(
Optional.empty(),
Collections.emptySet(),
Collections.emptyMap()
Assignment.EMPTY
)
);
@ -98,18 +97,17 @@ public class RangeAssignorTest {
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
Collections.emptyMap()
)
)
);
Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
consumerA,
new AssignmentMemberSpec(
Map<String, MemberSubscriptionSpecImpl> members = Collections.singletonMap(
memberA,
new MemberSubscriptionSpecImpl(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic2Uuid),
Collections.emptyMap()
mkSet(topic2Uuid),
Assignment.EMPTY
)
);
@ -130,35 +128,33 @@ public class RangeAssignorTest {
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
Collections.emptyMap()
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
createPartitionRacks(2)
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Collections.emptyMap()
));
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
@ -168,11 +164,11 @@ public class RangeAssignorTest {
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerA, mkAssignment(
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic3Uuid, 0)
));
expectedAssignment.put(consumerB, mkAssignment(
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic3Uuid, 1)
));
@ -187,48 +183,45 @@ public class RangeAssignorTest {
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
Collections.emptyMap()
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
createPartitionRacks(3)
Collections.emptyMap()
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
createPartitionRacks(2)
Collections.emptyMap()
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
members.put(consumerB, new AssignmentMemberSpec(
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic3Uuid),
Collections.emptyMap()
mkSet(topic3Uuid),
Assignment.EMPTY
));
members.put(consumerC, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberC, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic2Uuid, topic3Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
Collections.emptyMap()
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
@ -238,14 +231,14 @@ public class RangeAssignorTest {
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerA, mkAssignment(
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2),
mkTopicAssignment(topic2Uuid, 0, 1)
));
expectedAssignment.put(consumerB, mkAssignment(
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic3Uuid, 0)
));
expectedAssignment.put(consumerC, mkAssignment(
expectedAssignment.put(memberC, mkAssignment(
mkTopicAssignment(topic2Uuid, 2),
mkTopicAssignment(topic3Uuid, 1)
));
@ -260,42 +253,39 @@ public class RangeAssignorTest {
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
Collections.emptyMap()
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
createPartitionRacks(2)
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Collections.emptyMap()
));
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
members.put(consumerC, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
members.put(memberC, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
@ -306,15 +296,15 @@ public class RangeAssignorTest {
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
// Topic 3 has 2 partitions but three consumers subscribed to it - one of them will not get a partition.
expectedAssignment.put(consumerA, mkAssignment(
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic3Uuid, 0)
));
expectedAssignment.put(consumerB, mkAssignment(
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic3Uuid, 1)
));
expectedAssignment.put(consumerC, mkAssignment(
expectedAssignment.put(memberC, mkAssignment(
mkTopicAssignment(topic1Uuid, 2)
));
@ -328,45 +318,40 @@ public class RangeAssignorTest {
topic1Uuid,
topic1Name,
2,
createPartitionRacks(2)
Collections.emptyMap()
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
2,
createPartitionRacks(2)
Collections.emptyMap()
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
);
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForA
))
));
Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
);
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
))
));
// Add a new consumer to trigger a re-assignment
members.put(consumerC, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberC, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
@ -382,17 +367,17 @@ public class RangeAssignorTest {
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerA, mkAssignment(
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
));
expectedAssignment.put(consumerB, mkAssignment(
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
));
// Consumer C shouldn't get any assignment, due to stickiness A, B retain their assignments
assertNull(computedAssignment.members().get(consumerC));
assertNull(computedAssignment.members().get(memberC));
assertAssignment(expectedAssignment, computedAssignment);
}
@ -404,37 +389,33 @@ public class RangeAssignorTest {
topic1Uuid,
topic1Name,
4,
createPartitionRacks(4)
Collections.emptyMap()
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
4,
createPartitionRacks(4)
Collections.emptyMap()
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
);
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForA
))
));
Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
);
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
))
));
GroupSpec groupSpec = new GroupSpecImpl(
@ -450,11 +431,11 @@ public class RangeAssignorTest {
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerA, mkAssignment(
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
));
expectedAssignment.put(consumerB, mkAssignment(
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2, 3),
mkTopicAssignment(topic2Uuid, 2, 3)
));
@ -469,45 +450,40 @@ public class RangeAssignorTest {
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
Collections.emptyMap()
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
createPartitionRacks(3)
Collections.emptyMap()
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
);
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForA
))
));
Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
);
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
))
));
// Add a new consumer to trigger a re-assignment
members.put(consumerC, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberC, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
Collections.emptyMap()
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
@ -523,15 +499,15 @@ public class RangeAssignorTest {
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerA, mkAssignment(
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
));
expectedAssignment.put(consumerB, mkAssignment(
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
));
expectedAssignment.put(consumerC, mkAssignment(
expectedAssignment.put(memberC, mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
));
@ -547,45 +523,40 @@ public class RangeAssignorTest {
topic1Uuid,
topic1Name,
4,
createPartitionRacks(4)
Collections.emptyMap()
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
createPartitionRacks(3)
Collections.emptyMap()
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
);
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForA
))
));
Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
);
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
))
));
// Add a new consumer to trigger a re-assignment
members.put(consumerC, new AssignmentMemberSpec(
members.put(memberC, new MemberSubscriptionSpecImpl(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic1Uuid),
Collections.emptyMap()
mkSet(topic1Uuid),
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
@ -601,15 +572,15 @@ public class RangeAssignorTest {
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerA, mkAssignment(
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
));
expectedAssignment.put(consumerB, mkAssignment(
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
));
expectedAssignment.put(consumerC, mkAssignment(
expectedAssignment.put(memberC, mkAssignment(
mkTopicAssignment(topic1Uuid, 3)
));
@ -623,27 +594,26 @@ public class RangeAssignorTest {
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
Collections.emptyMap()
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
createPartitionRacks(3)
Collections.emptyMap()
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
// Consumer A was removed
Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
);
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
))
));
GroupSpec groupSpec = new GroupSpecImpl(
@ -659,7 +629,7 @@ public class RangeAssignorTest {
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerB, mkAssignment(
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2),
mkTopicAssignment(topic2Uuid, 0, 1, 2)
));
@ -674,55 +644,49 @@ public class RangeAssignorTest {
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
Collections.emptyMap()
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
createPartitionRacks(3)
Collections.emptyMap()
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
createPartitionRacks(2)
Collections.emptyMap()
));
// Let initial subscriptions be A -> T1, T2 // B -> T2 // C -> T2, T3
// Change the subscriptions to A -> T1 // B -> T1, T2, T3 // C -> T2
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
members.put(memberA, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid),
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2),
mkTopicAssignment(topic2Uuid, 0)
);
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic1Uuid),
currentAssignmentForA
))
));
Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic2Uuid, 1)
);
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberB, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid, topic3Uuid),
currentAssignmentForB
new Assignment(mkAssignment(
mkTopicAssignment(topic2Uuid, 1)
))
));
Map<Uuid, Set<Integer>> currentAssignmentForC = mkAssignment(
members.put(memberC, new MemberSubscriptionSpecImpl(
Optional.empty(),
mkSet(topic2Uuid),
new Assignment(mkAssignment(
mkTopicAssignment(topic2Uuid, 2),
mkTopicAssignment(topic3Uuid, 0, 1)
);
members.put(consumerC, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic2Uuid),
currentAssignmentForC
))
));
GroupSpec groupSpec = new GroupSpecImpl(
@ -738,15 +702,15 @@ public class RangeAssignorTest {
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerA, mkAssignment(
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1)
));
expectedAssignment.put(consumerB, mkAssignment(
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 0, 1),
mkTopicAssignment(topic3Uuid, 0, 1)
));
expectedAssignment.put(consumerC, mkAssignment(
expectedAssignment.put(memberC, mkAssignment(
mkTopicAssignment(topic2Uuid, 2)
));
@ -763,14 +727,4 @@ public class RangeAssignorTest {
assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember);
}
}
// When rack awareness is enabled for this assignor, rack information can be updated in this method.
private static Map<Integer, Set<String>> createPartitionRacks(int numPartitions) {
Map<Integer, Set<String>> partitionRacks = new HashMap<>(numPartitions);
Set<String> emptySet = Collections.emptySet();
for (int i = 0; i < numPartitions; i++) {
partitionRacks.put(i, emptySet);
}
return partitionRacks;
}
}

View File

@ -19,8 +19,9 @@ package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.AssignmentTestUtil;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpec;
import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
@ -44,7 +45,7 @@ import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTar
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec;
import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createMemberSubscriptionSpecImpl;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@ -164,22 +165,22 @@ public class TargetAssignmentBuilderTest {
public TargetAssignmentBuilder.TargetAssignmentResult build() {
TopicsImage topicsImage = topicsImageBuilder.build().topics();
// Prepare expected member specs.
Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
Map<String, MemberSubscriptionSpecImpl> memberSubscriptions = new HashMap<>();
// All the existing members are prepared.
members.forEach((memberId, member) ->
memberSpecs.put(memberId, createAssignmentMemberSpec(
memberSubscriptions.put(memberId, createMemberSubscriptionSpecImpl(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
topicsImage
)
));
))
);
// All the updated are added and all the deleted
// members are removed.
updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
if (updatedMemberOrNull == null) {
memberSpecs.remove(memberId);
memberSubscriptions.remove(memberId);
} else {
Assignment assignment = targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
@ -191,7 +192,7 @@ public class TargetAssignmentBuilderTest {
}
}
memberSpecs.put(memberId, createAssignmentMemberSpec(
memberSubscriptions.put(memberId, createMemberSubscriptionSpecImpl(
updatedMemberOrNull,
assignment,
topicsImage
@ -209,10 +210,15 @@ public class TargetAssignmentBuilderTest {
SubscriptionType subscriptionType = HOMOGENEOUS;
// Prepare the member assignments per topic partition.
Map<Uuid, Map<Integer, String>> invertedTargetAssignment = AssignmentTestUtil.invertedTargetAssignment(memberSpecs);
Map<Uuid, Map<Integer, String>> invertedTargetAssignment = AssignmentTestUtil
.invertedTargetAssignment(memberSubscriptions);
// Prepare the expected assignment spec.
GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, subscriptionType, invertedTargetAssignment);
GroupSpecImpl groupSpec = new GroupSpecImpl(
memberSubscriptions,
subscriptionType,
invertedTargetAssignment
);
// We use `any` here to always return an assignment but use `verify` later on
// to ensure that the input was correct.
@ -251,7 +257,7 @@ public class TargetAssignmentBuilderTest {
}
@Test
public void testCreateAssignmentMemberSpec() {
public void testCreateMemberSubscriptionSpecImpl() {
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
TopicsImage topicsImage = new MetadataImageBuilder()
@ -271,18 +277,17 @@ public class TargetAssignmentBuilderTest {
mkTopicAssignment(barTopicId, 1, 2, 3)
));
AssignmentMemberSpec assignmentMemberSpec = createAssignmentMemberSpec(
MemberSubscriptionSpec subscriptionSpec = createMemberSubscriptionSpecImpl(
member,
assignment,
topicsImage
);
assertEquals(new AssignmentMemberSpec(
Optional.of("instanceId"),
assertEquals(new MemberSubscriptionSpecImpl(
Optional.of("rackId"),
new TopicIds(mkSet("bar", "foo", "zar"), topicsImage),
assignment.partitions()
), assignmentMemberSpec);
assignment
), subscriptionSpec);
}
@Test

View File

@ -17,14 +17,15 @@
package org.apache.kafka.jmh.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.TopicIds;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
@ -134,7 +135,7 @@ public class ServerSideAssignorBenchmark {
Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
createAssignmentSpec();
createGroupSpec();
partitionAssignor = assignorType.assignor();
@ -175,8 +176,8 @@ public class ServerSideAssignorBenchmark {
return topicMetadata;
}
private void createAssignmentSpec() {
Map<String, AssignmentMemberSpec> members = new HashMap<>();
private void createGroupSpec() {
Map<String, MemberSubscriptionSpecImpl> members = new HashMap<>();
// In the rebalance case, we will add the last member as a trigger.
// This is done to keep the total members count consistent with the input.
@ -215,7 +216,11 @@ public class ServerSideAssignorBenchmark {
}
}
this.groupSpec = new GroupSpecImpl(members, subscriptionType, Collections.emptyMap());
this.groupSpec = new GroupSpecImpl(
members,
subscriptionType,
Collections.emptyMap()
);
}
private Optional<String> rackId(int memberIndex) {
@ -223,18 +228,17 @@ public class ServerSideAssignorBenchmark {
}
private void addMemberSpec(
Map<String, AssignmentMemberSpec> members,
Map<String, MemberSubscriptionSpecImpl> members,
int memberIndex,
Set<Uuid> subscribedTopicIds
) {
String memberId = "member" + memberIndex;
Optional<String> rackId = rackId(memberIndex);
members.put(memberId, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberId, new MemberSubscriptionSpecImpl(
rackId,
subscribedTopicIds,
Collections.emptyMap()
Assignment.EMPTY
));
}
@ -256,38 +260,40 @@ public class ServerSideAssignorBenchmark {
Map<Uuid, Map<Integer, String>> invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(initialAssignment);
Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>();
Map<String, MemberSubscriptionSpecImpl> updatedMemberSpec = new HashMap<>();
groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
for (String memberId : groupSpec.memberIds()) {
MemberAssignment memberAssignment = members.getOrDefault(
memberId,
new MemberAssignment(Collections.emptyMap())
);
updatedMembers.put(memberId, new AssignmentMemberSpec(
assignmentMemberSpec.instanceId(),
assignmentMemberSpec.rackId(),
assignmentMemberSpec.subscribedTopicIds(),
Collections.unmodifiableMap(memberAssignment.targetPartitions())
updatedMemberSpec.put(memberId, new MemberSubscriptionSpecImpl(
groupSpec.memberSubscription(memberId).rackId(),
groupSpec.memberSubscription(memberId).subscribedTopicIds(),
new Assignment(Collections.unmodifiableMap(memberAssignment.targetPartitions()))
));
});
}
Set<Uuid> subscribedTopicIdsForNewMember;
if (subscriptionType == HETEROGENEOUS) {
subscribedTopicIdsForNewMember = updatedMembers.get("member" + (memberCount - 2)).subscribedTopicIds();
subscribedTopicIdsForNewMember = updatedMemberSpec.get("member" + (memberCount - 2)).subscribedTopicIds();
} else {
subscribedTopicIdsForNewMember = new TopicIds(new HashSet<>(allTopicNames), topicsImage);
}
Optional<String> rackId = rackId(memberCount - 1);
updatedMembers.put("newMember", new AssignmentMemberSpec(
Optional.empty(),
updatedMemberSpec.put("newMember", new MemberSubscriptionSpecImpl(
rackId,
subscribedTopicIdsForNewMember,
Collections.emptyMap()
Assignment.EMPTY
));
groupSpec = new GroupSpecImpl(updatedMembers, subscriptionType, invertedTargetAssignment);
groupSpec = new GroupSpecImpl(
updatedMemberSpec,
subscriptionType,
invertedTargetAssignment
);
}
@Benchmark

View File

@ -17,10 +17,10 @@
package org.apache.kafka.jmh.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.consumer.TopicIds;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
@ -191,19 +191,22 @@ public class TargetAssignmentBuilderBenchmark {
}
private void createAssignmentSpec() {
Map<String, AssignmentMemberSpec> members = new HashMap<>();
Map<String, MemberSubscriptionSpecImpl> members = new HashMap<>();
for (int i = 0; i < memberCount - 1; i++) {
String memberId = "member" + i;
members.put(memberId, new AssignmentMemberSpec(
Optional.empty(),
members.put(memberId, new MemberSubscriptionSpecImpl(
Optional.empty(),
new TopicIds(new HashSet<>(allTopicNames), topicsImage),
Collections.emptyMap()
Assignment.EMPTY
));
}
groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap());
groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
}
@Benchmark