KAFKA-16625; Reverse lookup map from topic partitions to members (#15974)

This patch speeds up the computation of the unassigned partitions by exposing the inverted target assignment. It allows the assignor to check whether a partition is assigned or not.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
This commit is contained in:
Ritika Reddy 2024-05-25 09:06:15 -07:00 committed by GitHub
parent d585a494a4
commit a8d166c00e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 890 additions and 239 deletions

View File

@ -1901,6 +1901,7 @@ public class GroupMetadataManager {
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(group.targetAssignment())
.withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;

View File

@ -107,8 +107,8 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
*/
private final PartitionMovements partitionMovements;
public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.members = assignmentSpec.members();
public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.members = groupSpec.members();
this.subscribedTopicDescriber = subscribedTopicDescriber;
this.subscribedTopicIds = new HashSet<>();
this.membersPerTopic = new HashMap<>();

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import java.util.Map;
/**
* The group metadata specifications required to compute the target assignment.
*/
public interface GroupSpec {
/**
* @return Member metadata keyed by member Id.
*/
Map<String, AssignmentMemberSpec> members();
/**
* @return The group's subscription type.
*/
SubscriptionType subscriptionType();
/**
* @return True, if the partition is currently assigned to a member.
* False, otherwise.
*/
boolean isPartitionAssigned(Uuid topicId, int partitionId);
}

View File

@ -16,13 +16,15 @@
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import java.util.Map;
import java.util.Objects;
/**
* The assignment specification for a consumer group.
*/
public class AssignmentSpec {
public class GroupSpecImpl implements GroupSpec {
/**
* The member metadata keyed by member Id.
*/
@ -33,44 +35,76 @@ public class AssignmentSpec {
*/
private final SubscriptionType subscriptionType;
public AssignmentSpec(
/**
* Reverse lookup map representing topic partitions with
* their current member assignments.
*/
private final Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
public GroupSpecImpl(
Map<String, AssignmentMemberSpec> members,
SubscriptionType subscriptionType
SubscriptionType subscriptionType,
Map<Uuid, Map<Integer, String>> invertedTargetAssignment
) {
Objects.requireNonNull(members);
Objects.requireNonNull(subscriptionType);
Objects.requireNonNull(invertedTargetAssignment);
this.members = members;
this.subscriptionType = subscriptionType;
this.invertedTargetAssignment = invertedTargetAssignment;
}
/**
* @return Member metadata keyed by member Id.
* {@inheritDoc}
*/
@Override
public Map<String, AssignmentMemberSpec> members() {
return members;
}
/**
* @return The group's subscription type.
* {@inheritDoc}
*/
@Override
public SubscriptionType subscriptionType() {
return subscriptionType;
}
/**
* {@inheritDoc}
*/
@Override
public boolean isPartitionAssigned(Uuid topicId, int partitionId) {
Map<Integer, String> partitionMap = invertedTargetAssignment.get(topicId);
if (partitionMap == null) {
return false;
}
return partitionMap.containsKey(partitionId);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AssignmentSpec that = (AssignmentSpec) o;
GroupSpecImpl that = (GroupSpecImpl) o;
return subscriptionType == that.subscriptionType &&
members.equals(that.members);
members.equals(that.members) &&
invertedTargetAssignment.equals(that.invertedTargetAssignment);
}
@Override
public int hashCode() {
return Objects.hash(members, subscriptionType);
int result = members.hashCode();
result = 31 * result + subscriptionType.hashCode();
result = 31 * result + invertedTargetAssignment.hashCode();
return result;
}
@Override
public String toString() {
return "AssignmentSpec(members=" + members + ", subscriptionType=" + subscriptionType.toString() + ')';
return "GroupSpecImpl(members=" + members +
", subscriptionType=" + subscriptionType +
", invertedTargetAssignment=" + invertedTargetAssignment +
')';
}
}

View File

@ -59,7 +59,7 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
/**
* The assignment specification which includes member metadata.
*/
private final AssignmentSpec assignmentSpec;
private final GroupSpec groupSpec;
/**
* The topic and partition metadata describer.
@ -89,18 +89,19 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
* The partitions that still need to be assigned.
* Initially this contains all the subscribed topics' partitions.
*/
private Set<TopicIdPartition> unassignedPartitions;
private final Set<TopicIdPartition> unassignedPartitions;
/**
* The target assignment.
*/
private final Map<String, MemberAssignment> targetAssignment;
OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.assignmentSpec = assignmentSpec;
OptimizedUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.groupSpec = groupSpec;
this.subscribedTopicDescriber = subscribedTopicDescriber;
this.subscribedTopicIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
this.subscribedTopicIds = new HashSet<>(groupSpec.members().values().iterator().next().subscribedTopicIds());
this.potentiallyUnfilledMembers = new HashMap<>();
this.unassignedPartitions = new HashSet<>();
this.targetAssignment = new HashMap<>();
}
@ -108,9 +109,9 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
* Here's the step-by-step breakdown of the assignment process:
*
* <li> Compute the quotas of partitions for each member based on the total partitions and member count.</li>
* <li> Initialize unassigned partitions to all the topic partitions and
* remove partitions from the list as and when they are assigned.</li>
* <li> For existing assignments, retain partitions based on the determined quota.</li>
* <li> Initialize unassigned partitions with all the topic partitions that aren't present in the
* current target assignment.</li>
* <li> For existing assignments, retain partitions based on the determined quota. Add extras to unassigned partitions.</li>
* <li> Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.</li>
* <li> Proceed with a round-robin assignment according to quotas.
* For each unassigned partition, locate the first compatible member from the potentially unfilled list.</li>
@ -124,6 +125,9 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
return new GroupAssignment(Collections.emptyMap());
}
// Check if the subscribed topicId is still valid.
// Update unassigned partitions based on the current target assignment
// and topic metadata.
for (Uuid topicId : subscribedTopicIds) {
int partitionCount = subscribedTopicDescriber.numPartitions(topicId);
if (partitionCount == -1) {
@ -131,21 +135,25 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata."
);
} else {
for (int i = 0; i < partitionCount; i++) {
if (!groupSpec.isPartitionAssigned(topicId, i)) {
unassignedPartitions.add(new TopicIdPartition(topicId, i));
}
}
totalPartitionsCount += partitionCount;
}
}
// The minimum required quota that each member needs to meet for a balanced assignment.
// This is the same for all members.
final int numberOfMembers = assignmentSpec.members().size();
final int numberOfMembers = groupSpec.members().size();
final int minQuota = totalPartitionsCount / numberOfMembers;
remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;
assignmentSpec.members().keySet().forEach(memberId ->
groupSpec.members().keySet().forEach(memberId ->
targetAssignment.put(memberId, new MemberAssignment(new HashMap<>())
));
unassignedPartitions = topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber);
potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
unassignedPartitionsRoundRobinAssignment();
@ -179,7 +187,7 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
private Map<String, Integer> assignStickyPartitions(int minQuota) {
Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();
assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> {
groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
List<TopicIdPartition> validCurrentMemberAssignment = validCurrentMemberAssignment(
assignmentMemberSpec.assignedPartitions()
);
@ -198,21 +206,29 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
topicIdPartition.topicId(),
topicIdPartition.partitionId()
);
unassignedPartitions.remove(topicIdPartition);
});
if (remaining < 0) {
// The extra partition is located at the last index from the previous step.
if (remaining < 0 && remainingMembersToGetAnExtraPartition > 0) {
TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount);
if (remainingMembersToGetAnExtraPartition > 0) {
TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++);
addPartitionToAssignment(
targetAssignment,
memberId,
topicIdPartition.topicId(),
topicIdPartition.partitionId()
);
unassignedPartitions.remove(topicIdPartition);
remainingMembersToGetAnExtraPartition--;
}
// Any previously owned partitions that weren't retained due to the quotas
// are added to the unassigned partitions set.
if (retainedPartitionsCount < currentAssignmentSize) {
unassignedPartitions.addAll(validCurrentMemberAssignment.subList(
retainedPartitionsCount,
currentAssignmentSize
));
}
}
}
if (remaining >= 0) {

View File

@ -34,12 +34,12 @@ public interface PartitionAssignor {
/**
* Assigns partitions to group members based on the given assignment specification and topic metadata.
*
* @param assignmentSpec The assignment spec which includes member metadata.
* @param groupSpec The assignment spec which includes member metadata.
* @param subscribedTopicDescriber The topic and partition metadata describer.
* @return The new assignment for the group.
*/
GroupAssignment assign(
AssignmentSpec assignmentSpec,
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException;
}

View File

@ -19,7 +19,7 @@ package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.errors.ApiException;
/**
* Exception thrown by {@link PartitionAssignor#assign(AssignmentSpec)}. The exception
* Exception thrown by {@link PartitionAssignor#assign(GroupSpec, SubscribedTopicDescriber)}}. The exception
* is only used internally.
*/
public class PartitionAssignorException extends ApiException {

View File

@ -81,20 +81,20 @@ public class RangeAssignor implements PartitionAssignor {
* Returns a map of topic Ids to a list of members subscribed to them,
* based on the given assignment specification and metadata.
*
* @param assignmentSpec The specification for member assignments.
* @param groupSpec The specification for member assignments.
* @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters.
* @return A map of topic Ids to a list of member Ids subscribed to them.
*
* @throws PartitionAssignorException If a member is subscribed to a non-existent topic.
*/
private Map<Uuid, Collection<String>> membersPerTopic(
final AssignmentSpec assignmentSpec,
final GroupSpec groupSpec,
final SubscribedTopicDescriber subscribedTopicDescriber
) {
Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
Map<String, AssignmentMemberSpec> membersData = groupSpec.members();
if (assignmentSpec.subscriptionType().equals(HOMOGENEOUS)) {
if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
Set<String> allMembers = membersData.keySet();
Collection<Uuid> topics = membersData.values().iterator().next().subscribedTopicIds();
@ -139,7 +139,7 @@ public class RangeAssignor implements PartitionAssignor {
*/
@Override
public GroupAssignment assign(
final AssignmentSpec assignmentSpec,
final GroupSpec groupSpec,
final SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {
@ -147,7 +147,7 @@ public class RangeAssignor implements PartitionAssignor {
// Step 1
Map<Uuid, Collection<String>> membersPerTopic = membersPerTopic(
assignmentSpec,
groupSpec,
subscribedTopicDescriber
);
@ -162,7 +162,7 @@ public class RangeAssignor implements PartitionAssignor {
List<MemberWithRemainingAssignments> potentiallyUnfilledMembers = new ArrayList<>();
for (String memberId : membersForTopic) {
Set<Integer> assignedPartitionsForTopic = assignmentSpec.members().get(memberId)
Set<Integer> assignedPartitionsForTopic = groupSpec.members().get(memberId)
.assignedPartitions().getOrDefault(topicId, Collections.emptySet());
int currentAssignmentSize = assignedPartitionsForTopic.size();

View File

@ -57,28 +57,28 @@ public class UniformAssignor implements PartitionAssignor {
* Perform the group assignment given the current members and
* topics metadata.
*
* @param assignmentSpec The assignment specification that included member metadata.
* @param groupSpec The assignment specification that included member metadata.
* @param subscribedTopicDescriber The topic and cluster metadata describer {@link SubscribedTopicDescriber}.
* @return The new target assignment for the group.
*/
@Override
public GroupAssignment assign(
AssignmentSpec assignmentSpec,
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {
AbstractUniformAssignmentBuilder assignmentBuilder;
if (assignmentSpec.members().isEmpty())
if (groupSpec.members().isEmpty())
return new GroupAssignment(Collections.emptyMap());
if (assignmentSpec.subscriptionType().equals(HOMOGENEOUS)) {
if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the "
+ "optimized assignment algorithm");
assignmentBuilder = new OptimizedUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber);
assignmentBuilder = new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
} else {
LOG.debug("Detected that the members are subscribed to different sets of topics, invoking the "
+ "general assignment algorithm");
assignmentBuilder = new GeneralUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber);
assignmentBuilder = new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
}
return assignmentBuilder.buildAssignment();

View File

@ -170,6 +170,12 @@ public class ConsumerGroup implements Group {
*/
private final TimelineHashMap<String, Assignment> targetAssignment;
/**
* Reverse lookup map representing topic partitions with
* their current member assignments.
*/
private final TimelineHashMap<Uuid, TimelineHashMap<Integer, String>> invertedTargetAssignment;
/**
* The current partition epoch maps each topic-partitions to their current epoch where
* the epoch is the epoch of their owners. When a member revokes a partition, it removes
@ -221,6 +227,7 @@ public class ConsumerGroup implements Group {
this.subscriptionType = new TimelineObject<>(snapshotRegistry, HOMOGENEOUS);
this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
this.invertedTargetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
this.metrics = Objects.requireNonNull(metrics);
this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry);
@ -517,21 +524,89 @@ public class ConsumerGroup implements Group {
}
/**
* Updates target assignment of a member.
* @return An immutable map containing all the topic partitions
* with their current member assignments.
*/
public Map<Uuid, Map<Integer, String>> invertedTargetAssignment() {
return Collections.unmodifiableMap(invertedTargetAssignment);
}
/**
* Updates the target assignment of a member.
*
* @param memberId The member id.
* @param newTargetAssignment The new target assignment.
*/
public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) {
updateInvertedTargetAssignment(
memberId,
targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())),
newTargetAssignment
);
targetAssignment.put(memberId, newTargetAssignment);
}
/**
* Updates the reverse lookup map of the target assignment.
*
* @param memberId The member Id.
* @param oldTargetAssignment The old target assignment.
* @param newTargetAssignment The new target assignment.
*/
private void updateInvertedTargetAssignment(
String memberId,
Assignment oldTargetAssignment,
Assignment newTargetAssignment
) {
// Combine keys from both old and new assignments.
Set<Uuid> allTopicIds = new HashSet<>();
allTopicIds.addAll(oldTargetAssignment.partitions().keySet());
allTopicIds.addAll(newTargetAssignment.partitions().keySet());
for (Uuid topicId : allTopicIds) {
Set<Integer> oldPartitions = oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
Set<Integer> newPartitions = newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
TimelineHashMap<Integer, String> topicPartitionAssignment = invertedTargetAssignment.computeIfAbsent(
topicId, k -> new TimelineHashMap<>(snapshotRegistry, Math.max(oldPartitions.size(), newPartitions.size()))
);
// Remove partitions that aren't present in the new assignment only if the partition is currently
// still assigned to the member in question.
// If p0 was moved from A to B, and the target assignment map was updated for B first, we don't want to
// remove the key p0 from the inverted map and undo the action when A eventually tries to update its assignment.
for (Integer partition : oldPartitions) {
if (!newPartitions.contains(partition) && memberId.equals(topicPartitionAssignment.get(partition))) {
topicPartitionAssignment.remove(partition);
}
}
// Add partitions that are in the new assignment but not in the old assignment.
for (Integer partition : newPartitions) {
if (!oldPartitions.contains(partition)) {
topicPartitionAssignment.put(partition, memberId);
}
}
if (topicPartitionAssignment.isEmpty()) {
invertedTargetAssignment.remove(topicId);
} else {
invertedTargetAssignment.put(topicId, topicPartitionAssignment);
}
}
}
/**
* Removes the target assignment of a member.
*
* @param memberId The member id.
*/
public void removeTargetAssignment(String memberId) {
updateInvertedTargetAssignment(
memberId,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
Assignment.EMPTY
);
targetAssignment.remove(memberId);
}

View File

@ -19,7 +19,7 @@ package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.CoordinatorRecord;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
@ -126,6 +126,12 @@ public class TargetAssignmentBuilder {
*/
private Map<String, Assignment> targetAssignment = Collections.emptyMap();
/**
* Reverse lookup map representing topic partitions with
* their current member assignments.
*/
private Map<Uuid, Map<Integer, String>> invertedTargetAssignment = Collections.emptyMap();
/**
* The topics image.
*/
@ -224,6 +230,19 @@ public class TargetAssignmentBuilder {
return this;
}
/**
* Adds the existing topic partition assignments.
*
* @param invertedTargetAssignment The reverse lookup map of the current target assignment.
* @return This object.
*/
public TargetAssignmentBuilder withInvertedTargetAssignment(
Map<Uuid, Map<Integer, String>> invertedTargetAssignment
) {
this.invertedTargetAssignment = invertedTargetAssignment;
return this;
}
/**
* Adds the topics image.
*
@ -317,7 +336,11 @@ public class TargetAssignmentBuilder {
// Compute the assignment.
GroupAssignment newGroupAssignment = assignor.assign(
new AssignmentSpec(Collections.unmodifiableMap(memberSpecs), subscriptionType),
new GroupSpecImpl(
Collections.unmodifiableMap(memberSpecs),
subscriptionType,
invertedTargetAssignment
),
new SubscribedTopicMetadata(topicMetadataMap)
);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import java.util.AbstractMap;
@ -82,4 +83,33 @@ public class AssignmentTestUtil {
assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember);
});
}
/**
* Generate a reverse look up map of partition to member target assignments from the given member spec.
*
* @param memberSpec A map where the key is the member Id and the value is an
* AssignmentMemberSpec object containing the member's partition assignments.
* @return Map of topic partition to member assignments.
*/
public static Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
Map<String, AssignmentMemberSpec> memberSpec
) {
Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();
for (Map.Entry<String, AssignmentMemberSpec> memberEntry : memberSpec.entrySet()) {
String memberId = memberEntry.getKey();
Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().assignedPartitions();
for (Map.Entry<Uuid, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) {
Uuid topicId = topicEntry.getKey();
Set<Integer> partitions = topicEntry.getValue();
Map<Integer, String> partitionMap = invertedTargetAssignment.computeIfAbsent(topicId, k -> new HashMap<>());
for (Integer partitionId : partitions) {
partitionMap.put(partitionId, memberId);
}
}
}
return invertedTargetAssignment;
}
}

View File

@ -17,7 +17,7 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
@ -45,7 +45,7 @@ public class MockPartitionAssignor implements PartitionAssignor {
}
@Override
public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
return prepareGroupAssignment;
}

View File

@ -16,7 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
@ -34,8 +34,8 @@ public class NoOpPartitionAssignor implements PartitionAssignor {
}
@Override
public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
return new GroupAssignment(assignmentSpec.members().entrySet()
public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
return new GroupAssignment(groupSpec.members().entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,

View File

@ -32,6 +32,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -79,8 +80,16 @@ public class GeneralUniformAssignmentBuilderTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
Collections.emptyMap()
);
GroupAssignment groupAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertEquals(Collections.emptyMap(), groupAssignment.members());
}
@ -113,10 +122,14 @@ public class GeneralUniformAssignmentBuilderTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
Collections.emptyMap()
);
assertThrows(PartitionAssignorException.class,
() -> assignor.assign(assignmentSpec, subscribedTopicMetadata));
() -> assignor.assign(groupSpec, subscribedTopicMetadata));
}
@Test
@ -149,10 +162,17 @@ public class GeneralUniformAssignmentBuilderTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
@ -202,10 +222,17 @@ public class GeneralUniformAssignmentBuilderTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
// Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@ -285,10 +312,17 @@ public class GeneralUniformAssignmentBuilderTest {
currentAssignmentForC
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
@ -362,10 +396,17 @@ public class GeneralUniformAssignmentBuilderTest {
currentAssignmentForB
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
@ -429,10 +470,17 @@ public class GeneralUniformAssignmentBuilderTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
@ -496,10 +544,17 @@ public class GeneralUniformAssignmentBuilderTest {
// Member C was removed
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
@ -554,10 +609,17 @@ public class GeneralUniformAssignmentBuilderTest {
currentAssignmentForB
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GroupSpecImplTest {
private Map<String, AssignmentMemberSpec> members;
private SubscriptionType subscriptionType;
private Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
private GroupSpecImpl groupSpec;
private Uuid topicId;
@BeforeEach
void setUp() {
members = new HashMap<>();
subscriptionType = SubscriptionType.HOMOGENEOUS;
invertedTargetAssignment = new HashMap<>();
topicId = Uuid.randomUuid();
members.put("test-member", new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
new HashSet<>(Collections.singletonList(topicId)),
Collections.emptyMap())
);
groupSpec = new GroupSpecImpl(
members,
subscriptionType,
invertedTargetAssignment
);
}
@Test
void testMembers() {
assertEquals(members, groupSpec.members());
}
@Test
void testSubscriptionType() {
assertEquals(subscriptionType, groupSpec.subscriptionType());
}
@Test
void testIsPartitionAssigned() {
Map<Integer, String> partitionMap = new HashMap<>();
partitionMap.put(1, "test-member");
invertedTargetAssignment.put(topicId, partitionMap);
assertTrue(groupSpec.isPartitionAssigned(topicId, 1));
assertFalse(groupSpec.isPartitionAssigned(topicId, 2));
assertFalse(groupSpec.isPartitionAssigned(Uuid.randomUuid(), 2));
}
}

View File

@ -35,6 +35,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -77,8 +78,16 @@ public class OptimizedUniformAssignmentBuilderTest {
)
);
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
GroupAssignment groupAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertEquals(Collections.emptyMap(), groupAssignment.members());
}
@ -107,10 +116,14 @@ public class OptimizedUniformAssignmentBuilderTest {
)
);
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
assertThrows(PartitionAssignorException.class,
() -> assignor.assign(assignmentSpec, subscribedTopicMetadata));
() -> assignor.assign(groupSpec, subscribedTopicMetadata));
}
@Test
@ -143,11 +156,6 @@ public class OptimizedUniformAssignmentBuilderTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
@ -158,6 +166,18 @@ public class OptimizedUniformAssignmentBuilderTest {
mkTopicAssignment(topic3Uuid, 0)
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@ -192,11 +212,6 @@ public class OptimizedUniformAssignmentBuilderTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
// Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
@ -209,6 +224,18 @@ public class OptimizedUniformAssignmentBuilderTest {
Collections.emptyMap()
);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@ -236,10 +263,17 @@ public class OptimizedUniformAssignmentBuilderTest {
));
}
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
checkValidityAndBalance(members, computedAssignment);
}
@ -261,7 +295,6 @@ public class OptimizedUniformAssignmentBuilderTest {
));
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
@ -288,11 +321,6 @@ public class OptimizedUniformAssignmentBuilderTest {
currentAssignmentForB
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
@ -303,6 +331,18 @@ public class OptimizedUniformAssignmentBuilderTest {
mkTopicAssignment(topic2Uuid, 1, 2)
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@ -352,11 +392,6 @@ public class OptimizedUniformAssignmentBuilderTest {
currentAssignmentForB
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2, 3, 5),
@ -367,6 +402,18 @@ public class OptimizedUniformAssignmentBuilderTest {
mkTopicAssignment(topic2Uuid, 1, 2, 3)
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@ -419,11 +466,6 @@ public class OptimizedUniformAssignmentBuilderTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2)
@ -436,6 +478,18 @@ public class OptimizedUniformAssignmentBuilderTest {
mkTopicAssignment(topic2Uuid, 0, 2)
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@ -482,11 +536,6 @@ public class OptimizedUniformAssignmentBuilderTest {
// Member C was removed
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
@ -497,6 +546,18 @@ public class OptimizedUniformAssignmentBuilderTest {
mkTopicAssignment(topic2Uuid, 1, 2)
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
@ -542,11 +603,6 @@ public class OptimizedUniformAssignmentBuilderTest {
currentAssignmentForB
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic2Uuid, 0)
@ -555,6 +611,18 @@ public class OptimizedUniformAssignmentBuilderTest {
mkTopicAssignment(topic2Uuid, 1)
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}

View File

@ -32,6 +32,7 @@ import java.util.TreeMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -74,8 +75,16 @@ public class RangeAssignorTest {
)
);
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
GroupAssignment groupAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertEquals(Collections.emptyMap(), groupAssignment.members());
}
@ -104,10 +113,14 @@ public class RangeAssignorTest {
)
);
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
assertThrows(PartitionAssignorException.class,
() -> assignor.assign(assignmentSpec, subscribedTopicMetadata));
() -> assignor.assign(groupSpec, subscribedTopicMetadata));
}
@Test
@ -126,8 +139,6 @@ public class RangeAssignorTest {
createPartitionRacks(2)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(consumerA, new AssignmentMemberSpec(
@ -144,16 +155,23 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic3Uuid, 0)
));
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic3Uuid, 1)
@ -184,8 +202,6 @@ public class RangeAssignorTest {
createPartitionRacks(2)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(consumerA, new AssignmentMemberSpec(
@ -209,20 +225,26 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2),
mkTopicAssignment(topic2Uuid, 0, 1)
));
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic3Uuid, 0)
));
expectedAssignment.put(consumerC, mkAssignment(
mkTopicAssignment(topic2Uuid, 2),
mkTopicAssignment(topic3Uuid, 1)
@ -247,8 +269,6 @@ public class RangeAssignorTest {
createPartitionRacks(2)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(consumerA, new AssignmentMemberSpec(
@ -272,8 +292,17 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
// Topic 3 has 2 partitions but three consumers subscribed to it - one of them will not get a partition.
@ -281,12 +310,10 @@ public class RangeAssignorTest {
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic3Uuid, 0)
));
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic3Uuid, 1)
));
expectedAssignment.put(consumerC, mkAssignment(
mkTopicAssignment(topic1Uuid, 2)
));
@ -310,15 +337,12 @@ public class RangeAssignorTest {
createPartitionRacks(2)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
);
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@ -330,7 +354,6 @@ public class RangeAssignorTest {
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
);
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@ -346,16 +369,23 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
));
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
@ -383,15 +413,12 @@ public class RangeAssignorTest {
createPartitionRacks(4)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
);
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@ -403,7 +430,6 @@ public class RangeAssignorTest {
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
);
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@ -411,16 +437,23 @@ public class RangeAssignorTest {
currentAssignmentForB
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
));
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2, 3),
mkTopicAssignment(topic2Uuid, 2, 3)
@ -445,15 +478,12 @@ public class RangeAssignorTest {
createPartitionRacks(3)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
);
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@ -465,7 +495,6 @@ public class RangeAssignorTest {
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
);
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@ -481,21 +510,27 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
));
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
));
expectedAssignment.put(consumerC, mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
@ -521,15 +556,12 @@ public class RangeAssignorTest {
createPartitionRacks(3)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
);
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@ -541,7 +573,6 @@ public class RangeAssignorTest {
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
);
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@ -557,21 +588,27 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
));
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
));
expectedAssignment.put(consumerC, mkAssignment(
mkTopicAssignment(topic1Uuid, 3)
));
@ -595,8 +632,6 @@ public class RangeAssignorTest {
createPartitionRacks(3)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
// Consumer A was removed
@ -604,7 +639,6 @@ public class RangeAssignorTest {
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
);
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@ -612,11 +646,19 @@ public class RangeAssignorTest {
currentAssignmentForB
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2),
mkTopicAssignment(topic2Uuid, 0, 1, 2)
@ -647,18 +689,14 @@ public class RangeAssignorTest {
createPartitionRacks(2)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
// Let initial subscriptions be A -> T1, T2 // B -> T2 // C -> T2, T3
// Change the subscriptions to A -> T1 // B -> T1, T2, T3 // C -> T2
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1, 2),
mkTopicAssignment(topic2Uuid, 0)
);
members.put(consumerA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@ -669,7 +707,6 @@ public class RangeAssignorTest {
Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic2Uuid, 1)
);
members.put(consumerB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@ -681,7 +718,6 @@ public class RangeAssignorTest {
mkTopicAssignment(topic2Uuid, 2),
mkTopicAssignment(topic3Uuid, 0, 1)
);
members.put(consumerC, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
@ -689,21 +725,27 @@ public class RangeAssignorTest {
currentAssignmentForC
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(consumerA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1)
));
expectedAssignment.put(consumerB, mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 0, 1),
mkTopicAssignment(topic3Uuid, 0, 1)
));
expectedAssignment.put(consumerC, mkAssignment(
mkTopicAssignment(topic2Uuid, 2)
));
@ -711,7 +753,10 @@ public class RangeAssignorTest {
assertAssignment(expectedAssignment, computedAssignment);
}
private void assertAssignment(Map<String, Map<Uuid, Set<Integer>>> expectedAssignment, GroupAssignment computedGroupAssignment) {
private void assertAssignment(
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment,
GroupAssignment computedGroupAssignment
) {
assertEquals(expectedAssignment.size(), computedGroupAssignment.members().size());
for (String memberId : computedGroupAssignment.members().keySet()) {
Map<Uuid, Set<Integer>> computedAssignmentForMember = computedGroupAssignment.members().get(memberId).targetPartitions();

View File

@ -834,6 +834,90 @@ public class ConsumerGroupTest {
);
}
@Test
public void testUpdateInvertedAssignment() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group", metricsShard);
Uuid topicId = Uuid.randomUuid();
String memberId1 = "member1";
String memberId2 = "member2";
// Initial assignment for member1
Assignment initialAssignment = new Assignment(Collections.singletonMap(
topicId,
new HashSet<>(Collections.singletonList(0))
));
consumerGroup.updateTargetAssignment(memberId1, initialAssignment);
// Verify that partition 0 is assigned to member1.
assertEquals(
mkMap(
mkEntry(topicId, mkMap(mkEntry(0, memberId1)))
),
consumerGroup.invertedTargetAssignment()
);
// New assignment for member1
Assignment newAssignment = new Assignment(Collections.singletonMap(
topicId,
new HashSet<>(Collections.singletonList(1))
));
consumerGroup.updateTargetAssignment(memberId1, newAssignment);
// Verify that partition 0 is no longer assigned and partition 1 is assigned to member1
assertEquals(
mkMap(
mkEntry(topicId, mkMap(mkEntry(1, memberId1)))
),
consumerGroup.invertedTargetAssignment()
);
// New assignment for member2 to add partition 1
Assignment newAssignment2 = new Assignment(Collections.singletonMap(
topicId,
new HashSet<>(Collections.singletonList(1))
));
consumerGroup.updateTargetAssignment(memberId2, newAssignment2);
// Verify that partition 1 is assigned to member2
assertEquals(
mkMap(
mkEntry(topicId, mkMap(mkEntry(1, memberId2)))
),
consumerGroup.invertedTargetAssignment()
);
// New assignment for member1 to revoke partition 1 and assign partition 0
Assignment newAssignment1 = new Assignment(Collections.singletonMap(
topicId,
new HashSet<>(Collections.singletonList(0))
));
consumerGroup.updateTargetAssignment(memberId1, newAssignment1);
// Verify that partition 1 is still assigned to member2 and partition 0 is assigned to member1
assertEquals(
mkMap(
mkEntry(topicId, mkMap(
mkEntry(0, memberId1),
mkEntry(1, memberId2)
))
),
consumerGroup.invertedTargetAssignment()
);
// Test remove target assignment for member1
consumerGroup.removeTargetAssignment(memberId1);
// Verify that partition 0 is no longer assigned and partition 1 is still assigned to member2
assertEquals(
mkMap(
mkEntry(topicId, mkMap(mkEntry(1, memberId2)))
),
consumerGroup.invertedTargetAssignment()
);
}
@Test
public void testMetadataRefreshDeadline() {
MockTime time = new MockTime();

View File

@ -17,9 +17,10 @@
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.AssignmentTestUtil;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
@ -207,8 +208,11 @@ public class TargetAssignmentBuilderTest {
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap);
SubscriptionType subscriptionType = HOMOGENEOUS;
// Prepare the member assignments per topic partition.
Map<Uuid, Map<Integer, String>> invertedTargetAssignment = AssignmentTestUtil.invertedTargetAssignment(memberSpecs);
// Prepare the expected assignment spec.
AssignmentSpec assignmentSpec = new AssignmentSpec(memberSpecs, subscriptionType);
GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, subscriptionType, invertedTargetAssignment);
// We use `any` here to always return an assignment but use `verify` later on
// to ensure that the input was correct.
@ -222,6 +226,7 @@ public class TargetAssignmentBuilderTest {
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(targetAssignment)
.withInvertedTargetAssignment(invertedTargetAssignment)
.withTopicsImage(topicsImage);
// Add the updated members or delete the deleted members.
@ -239,7 +244,7 @@ public class TargetAssignmentBuilderTest {
// Verify that the assignor was called once with the expected
// assignment spec.
verify(assignor, times(1))
.assign(assignmentSpec, subscribedTopicMetadata);
.assign(groupSpec, subscribedTopicMetadata);
return result;
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.image.MetadataDelta;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class AssignorBenchmarkUtils {
/**
* Generate a reverse look up map of partition to member target assignments from the given member spec.
*
* @param groupAssignment The group assignment.
* @return Map of topic partition to member assignments.
*/
public static Map<Uuid, Map<Integer, String>> computeInvertedTargetAssignment(
GroupAssignment groupAssignment
) {
Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();
for (Map.Entry<String, MemberAssignment> memberEntry : groupAssignment.members().entrySet()) {
String memberId = memberEntry.getKey();
Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().targetPartitions();
for (Map.Entry<Uuid, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) {
Uuid topicId = topicEntry.getKey();
Set<Integer> partitions = topicEntry.getValue();
Map<Integer, String> partitionMap = invertedTargetAssignment.computeIfAbsent(topicId, k -> new HashMap<>());
for (Integer partitionId : partitions) {
partitionMap.put(partitionId, memberId);
}
}
}
return invertedTargetAssignment;
}
public static void addTopic(
MetadataDelta delta,
Uuid topicId,
String topicName,
int numPartitions
) {
// For testing purposes, the following criteria are used:
// - Number of replicas for each partition: 2
// - Number of brokers available in the cluster: 4
delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName));
for (int i = 0; i < numPartitions; i++) {
delta.replay(new PartitionRecord()
.setTopicId(topicId)
.setPartitionId(i)
.setReplicas(Arrays.asList(i % 4, (i + 1) % 4)));
}
}
}

View File

@ -18,7 +18,7 @@ package org.apache.kafka.jmh.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
@ -29,10 +29,12 @@ import org.apache.kafka.coordinator.group.consumer.TopicIds;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.TopicsImage;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@ -119,7 +121,7 @@ public class ServerSideAssignorBenchmark {
private static final int MAX_BUCKET_COUNT = 5;
private AssignmentSpec assignmentSpec;
private GroupSpecImpl groupSpec;
private SubscribedTopicDescriber subscribedTopicDescriber;
@ -160,7 +162,13 @@ public class ServerSideAssignorBenchmark {
partitionsPerTopicCount,
partitionRacks
));
TargetAssignmentBuilderBenchmark.addTopic(delta, topicUuid, topicName, partitionsPerTopicCount);
AssignorBenchmarkUtils.addTopic(
delta,
topicUuid,
topicName,
partitionsPerTopicCount
);
}
topicsImage = delta.apply(MetadataProvenance.EMPTY).topics();
@ -207,7 +215,7 @@ public class ServerSideAssignorBenchmark {
}
}
this.assignmentSpec = new AssignmentSpec(members, subscriptionType);
this.groupSpec = new GroupSpecImpl(members, subscriptionType, Collections.emptyMap());
}
private Optional<String> rackId(int memberIndex) {
@ -243,16 +251,23 @@ public class ServerSideAssignorBenchmark {
}
private void simulateIncrementalRebalance() {
GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
GroupAssignment initialAssignment = partitionAssignor.assign(groupSpec, subscribedTopicDescriber);
Map<String, MemberAssignment> members = initialAssignment.members();
Map<Uuid, Map<Integer, String>> invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(initialAssignment);
Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>();
members.forEach((memberId, memberAssignment) -> {
AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId);
groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
MemberAssignment memberAssignment = members.getOrDefault(
memberId,
new MemberAssignment(Collections.emptyMap())
);
updatedMembers.put(memberId, new AssignmentMemberSpec(
memberSpec.instanceId(),
memberSpec.rackId(),
memberSpec.subscribedTopicIds(),
assignmentMemberSpec.instanceId(),
assignmentMemberSpec.rackId(),
assignmentMemberSpec.subscribedTopicIds(),
memberAssignment.targetPartitions()
));
});
@ -272,13 +287,13 @@ public class ServerSideAssignorBenchmark {
Collections.emptyMap()
));
assignmentSpec = new AssignmentSpec(updatedMembers, subscriptionType);
groupSpec = new GroupSpecImpl(updatedMembers, subscriptionType, invertedTargetAssignment);
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void doAssignment() {
partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
partitionAssignor.assign(groupSpec, subscribedTopicDescriber);
}
}

View File

@ -17,10 +17,8 @@
package org.apache.kafka.jmh.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
@ -36,6 +34,7 @@ import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.TopicsImage;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@ -51,7 +50,6 @@ import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -90,7 +88,9 @@ public class TargetAssignmentBuilderBenchmark {
private TargetAssignmentBuilder targetAssignmentBuilder;
private AssignmentSpec assignmentSpec;
private GroupSpecImpl groupSpec;
private Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
private final List<String> allTopicNames = new ArrayList<>();
@ -104,7 +104,7 @@ public class TargetAssignmentBuilderBenchmark {
subscriptionMetadata = generateMockSubscriptionMetadata();
Map<String, ConsumerGroupMember> members = generateMockMembers();
Map<String, Assignment> existingTargetAssignment = generateMockInitialTargetAssignment();
Map<String, Assignment> existingTargetAssignment = generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment();
ConsumerGroupMember newMember = new ConsumerGroupMember.Builder("newMember")
.setSubscribedTopicNames(allTopicNames)
@ -113,8 +113,9 @@ public class TargetAssignmentBuilderBenchmark {
targetAssignmentBuilder = new TargetAssignmentBuilder(GROUP_ID, GROUP_EPOCH, partitionAssignor)
.withMembers(members)
.withSubscriptionMetadata(subscriptionMetadata)
.withTargetAssignment(existingTargetAssignment)
.withSubscriptionType(HOMOGENEOUS)
.withTargetAssignment(existingTargetAssignment)
.withInvertedTargetAssignment(invertedTargetAssignment)
.withTopicsImage(topicsImage)
.addOrUpdateMember(newMember.memberId(), newMember);
}
@ -148,14 +149,20 @@ public class TargetAssignmentBuilderBenchmark {
Collections.emptyMap()
);
subscriptionMetadata.put(topicName, metadata);
addTopic(delta, topicId, topicName, partitionsPerTopicCount);
AssignorBenchmarkUtils.addTopic(
delta,
topicId,
topicName,
partitionsPerTopicCount
);
}
topicsImage = delta.apply(MetadataProvenance.EMPTY).topics();
return subscriptionMetadata;
}
private Map<String, Assignment> generateMockInitialTargetAssignment() {
private Map<String, Assignment> generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment() {
Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>(topicCount);
subscriptionMetadata.forEach((topicName, topicMetadata) ->
topicMetadataMap.put(
@ -167,9 +174,10 @@ public class TargetAssignmentBuilderBenchmark {
createAssignmentSpec();
GroupAssignment groupAssignment = partitionAssignor.assign(
assignmentSpec,
groupSpec,
new SubscribedTopicMetadata(topicMetadataMap)
);
invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(groupAssignment);
Map<String, Assignment> initialTargetAssignment = new HashMap<>(memberCount);
@ -198,25 +206,7 @@ public class TargetAssignmentBuilderBenchmark {
Collections.emptyMap()
));
}
assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
}
public static void addTopic(
MetadataDelta delta,
Uuid topicId,
String topicName,
int numPartitions
) {
// For testing purposes, the following criteria are used:
// - Number of replicas for each partition: 2
// - Number of brokers available in the cluster: 4
delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName));
for (int i = 0; i < numPartitions; i++) {
delta.replay(new PartitionRecord()
.setTopicId(topicId)
.setPartitionId(i)
.setReplicas(Arrays.asList(i % 4, (i + 1) % 4)));
}
groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap());
}
@Benchmark