KAFKA-19344: Replace desc.assignablePartitions with spec.isPartitionAssignable. (#19838)
CI / build (push) Waiting to run Details

- A new method `assignablePartitions` was added to the
`SubscribedTopicDescriber`in https://github.com/apache/kafka/pull/19026.
This method was required for computing assignments for share groups
(KIP-932).
- However, since the describer is a public interface and is used to
encapsulate methods which return all subscribed partitions (KIP-848),
`assignablePartitions` is deemed inconsistent with this interface.
- Hence, this PR extends the `GroupSpec` interface to add a method
`isPartitionAssignable` which will serve the same purpose. The
`assignablePartitions` has been removed from the describer.
- Tests have been updated for the assigners and spec and removed from
describer as required.

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot
 <djacot@confluent.io>
This commit is contained in:
Sushant Mahajan 2025-05-29 00:57:29 +05:30 committed by GitHub
parent 9dd4cff2d7
commit 383a9ff9df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 370 additions and 155 deletions

View File

@ -35,11 +35,27 @@ public interface GroupSpec {
SubscriptionType subscriptionType(); SubscriptionType subscriptionType();
/** /**
* Determine whether a topic id and partition have been assigned to
* a member. This method functions the same for all types of groups.
*
* @param topicId Uuid corresponding to the partition's topic.
* @param partitionId Partition Id within topic.
* @return True, if the partition is currently assigned to a member. * @return True, if the partition is currently assigned to a member.
* False, otherwise. * False, otherwise.
*/ */
boolean isPartitionAssigned(Uuid topicId, int partitionId); boolean isPartitionAssigned(Uuid topicId, int partitionId);
/**
* For share groups, a partition can only be assigned once its initialization is complete.
* For other group types, this initialization is not required and all partitions returned
* by the SubscribedTopicDescriber are always assignable.
*
* @param topicId Uuid corresponding to the partition's topic.
* @param partitionId Partition Id within topic.
* @return True, if the partition is assignable.
*/
boolean isPartitionAssignable(Uuid topicId, int partitionId);
/** /**
* Gets the member subscription specification for a member. * Gets the member subscription specification for a member.
* *

View File

@ -43,14 +43,4 @@ public interface SubscribedTopicDescriber {
* If the topic Id does not exist, an empty set is returned. * If the topic Id does not exist, an empty set is returned.
*/ */
Set<String> racksForPartition(Uuid topicId, int partition); Set<String> racksForPartition(Uuid topicId, int partition);
/**
* Returns a set of partitions corresponding to a topic id which
* are allowlisted based on some criteria. For example, for share groups
* only partitions which have been initialized are returned.
*
* @param topicId The uuid of the topic
* @return The set of integers representing assignable partitions. Could be empty, or contain all partitions.
*/
Set<Integer> assignablePartitions(Uuid topicId);
} }

View File

@ -90,11 +90,11 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
// Subscribed topic partitions for the share group. // Subscribed topic partitions for the share group.
List<TopicIdPartition> targetPartitions = computeTargetPartitions( List<TopicIdPartition> targetPartitions = computeTargetPartitions(
subscribedTopicIds, subscribedTopicDescriber); groupSpec, subscribedTopicIds, subscribedTopicDescriber);
// The current assignment from topic partition to members. // The current assignment from topic partition to members.
Map<TopicIdPartition, List<String>> currentAssignment = currentAssignment(groupSpec); Map<TopicIdPartition, List<String>> currentAssignment = currentAssignment(groupSpec);
return newAssignmentHomogeneous(groupSpec, subscribedTopicIds, targetPartitions, currentAssignment, subscribedTopicDescriber); return newAssignmentHomogeneous(groupSpec, subscribedTopicIds, targetPartitions, currentAssignment);
} }
private GroupAssignment assignHeterogeneous( private GroupAssignment assignHeterogeneous(
@ -109,13 +109,13 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
// Subscribed topic partitions for the share group member. // Subscribed topic partitions for the share group member.
List<TopicIdPartition> targetPartitions = computeTargetPartitions( List<TopicIdPartition> targetPartitions = computeTargetPartitions(
spec.subscribedTopicIds(), subscribedTopicDescriber); groupSpec, spec.subscribedTopicIds(), subscribedTopicDescriber);
memberToPartitionsSubscription.put(memberId, targetPartitions); memberToPartitionsSubscription.put(memberId, targetPartitions);
} }
// The current assignment from topic partition to members. // The current assignment from topic partition to members.
Map<TopicIdPartition, List<String>> currentAssignment = currentAssignment(groupSpec); Map<TopicIdPartition, List<String>> currentAssignment = currentAssignment(groupSpec);
return newAssignmentHeterogeneous(groupSpec, memberToPartitionsSubscription, currentAssignment, subscribedTopicDescriber); return newAssignmentHeterogeneous(groupSpec, memberToPartitionsSubscription, currentAssignment);
} }
/** /**
@ -146,8 +146,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
GroupSpec groupSpec, GroupSpec groupSpec,
Set<Uuid> subscribedTopicIds, Set<Uuid> subscribedTopicIds,
List<TopicIdPartition> targetPartitions, List<TopicIdPartition> targetPartitions,
Map<TopicIdPartition, List<String>> currentAssignment, Map<TopicIdPartition, List<String>> currentAssignment
SubscribedTopicDescriber subscribedTopicDescriber
) { ) {
// For entirely balanced assignment, we would expect (numTargetPartitions / numGroupMembers) partitions per member, rounded upwards. // For entirely balanced assignment, we would expect (numTargetPartitions / numGroupMembers) partitions per member, rounded upwards.
// That can be expressed as Math.ceil(numTargetPartitions / (double) numGroupMembers) // That can be expressed as Math.ceil(numTargetPartitions / (double) numGroupMembers)
@ -196,7 +195,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
.filter(targetPartition -> !finalAssignmentByPartition.containsKey(targetPartition)) .filter(targetPartition -> !finalAssignmentByPartition.containsKey(targetPartition))
.toList(); .toList();
roundRobinAssignmentWithCount(groupSpec.memberIds(), unassignedPartitions, finalAssignment, desiredAssignmentCount, subscribedTopicDescriber); roundRobinAssignmentWithCount(groupSpec.memberIds(), unassignedPartitions, finalAssignment, desiredAssignmentCount);
return groupAssignment(finalAssignment, groupSpec.memberIds()); return groupAssignment(finalAssignment, groupSpec.memberIds());
} }
@ -211,8 +210,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
private GroupAssignment newAssignmentHeterogeneous( private GroupAssignment newAssignmentHeterogeneous(
GroupSpec groupSpec, GroupSpec groupSpec,
Map<String, List<TopicIdPartition>> memberToPartitionsSubscription, Map<String, List<TopicIdPartition>> memberToPartitionsSubscription,
Map<TopicIdPartition, List<String>> currentAssignment, Map<TopicIdPartition, List<String>> currentAssignment
SubscribedTopicDescriber subscribedTopicDescriber
) { ) {
int numGroupMembers = groupSpec.memberIds().size(); int numGroupMembers = groupSpec.memberIds().size();
@ -240,7 +238,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
}); });
unassignedPartitions.keySet().forEach(unassignedTopic -> unassignedPartitions.keySet().forEach(unassignedTopic ->
roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic), unassignedPartitions.get(unassignedTopic), newAssignment, subscribedTopicDescriber)); roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic), unassignedPartitions.get(unassignedTopic), newAssignment));
// Step 3: We combine current assignment and new assignment. // Step 3: We combine current assignment and new assignment.
Map<String, Set<TopicIdPartition>> finalAssignment = newHashMap(numGroupMembers); Map<String, Set<TopicIdPartition>> finalAssignment = newHashMap(numGroupMembers);
@ -288,14 +286,12 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
* @param partitionsToAssign The subscribed topic partitions which needs assignment. * @param partitionsToAssign The subscribed topic partitions which needs assignment.
* @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this * @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
* method can be called multiple times for heterogeneous assignment. * method can be called multiple times for heterogeneous assignment.
* @param subscribedTopicDescriber The topic describer to fetch assignable partitions from.
*/ */
// Visible for testing // Visible for testing
void roundRobinAssignment( void roundRobinAssignment(
Collection<String> memberIds, Collection<String> memberIds,
List<TopicIdPartition> partitionsToAssign, List<TopicIdPartition> partitionsToAssign,
Map<TopicIdPartition, List<String>> assignment, Map<TopicIdPartition, List<String>> assignment
SubscribedTopicDescriber subscribedTopicDescriber
) { ) {
// We iterate through the target partitions and assign a memberId to them. In case we run out of members (members < targetPartitions), // We iterate through the target partitions and assign a memberId to them. In case we run out of members (members < targetPartitions),
// we again start from the starting index of memberIds. // we again start from the starting index of memberIds.
@ -305,9 +301,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
memberIdIterator = memberIds.iterator(); memberIdIterator = memberIds.iterator();
} }
String memberId = memberIdIterator.next(); String memberId = memberIdIterator.next();
if (subscribedTopicDescriber.assignablePartitions(topicPartition.topicId()).contains(topicPartition.partitionId())) { assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId);
assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId);
}
} }
} }
@ -320,14 +314,12 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
* @param desiredAssignmentCount The number of partitions which can be assigned to each member to give even balance. * @param desiredAssignmentCount The number of partitions which can be assigned to each member to give even balance.
* Note that this number can be exceeded by one to allow for situations * Note that this number can be exceeded by one to allow for situations
* in which we have hashing collisions. * in which we have hashing collisions.
* @param subscribedTopicDescriber The topic describer to fetch assignable partitions from.
*/ */
void roundRobinAssignmentWithCount( void roundRobinAssignmentWithCount(
Collection<String> memberIds, Collection<String> memberIds,
List<TopicIdPartition> partitionsToAssign, List<TopicIdPartition> partitionsToAssign,
Map<String, Set<TopicIdPartition>> assignment, Map<String, Set<TopicIdPartition>> assignment,
int desiredAssignmentCount, int desiredAssignmentCount
SubscribedTopicDescriber subscribedTopicDescriber
) { ) {
Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds); Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds);
@ -337,9 +329,6 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
ListIterator<TopicIdPartition> partitionListIterator = partitionsToAssign.listIterator(); ListIterator<TopicIdPartition> partitionListIterator = partitionsToAssign.listIterator();
while (partitionListIterator.hasNext()) { while (partitionListIterator.hasNext()) {
TopicIdPartition partition = partitionListIterator.next(); TopicIdPartition partition = partitionListIterator.next();
if (!subscribedTopicDescriber.assignablePartitions(partition.topicId()).contains(partition.partitionId())) {
continue;
}
if (!memberIdIterator.hasNext()) { if (!memberIdIterator.hasNext()) {
memberIdIterator = memberIdsCopy.iterator(); memberIdIterator = memberIdsCopy.iterator();
if (memberIdsCopy.isEmpty()) { if (memberIdsCopy.isEmpty()) {
@ -360,6 +349,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
} }
private List<TopicIdPartition> computeTargetPartitions( private List<TopicIdPartition> computeTargetPartitions(
GroupSpec groupSpec,
Set<Uuid> subscribedTopicIds, Set<Uuid> subscribedTopicIds,
SubscribedTopicDescriber subscribedTopicDescriber SubscribedTopicDescriber subscribedTopicDescriber
) { ) {
@ -373,13 +363,11 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
); );
} }
// Since we are returning a list here, we can keep it sorted for (int partition = 0; partition < numPartitions; partition++) {
// to add determinism while testing and iterating. if (groupSpec.isPartitionAssignable(topicId, partition)) {
targetPartitions.addAll(subscribedTopicDescriber.assignablePartitions(topicId).stream() targetPartitions.add(new TopicIdPartition(topicId, partition));
.sorted() }
.map(partition -> new TopicIdPartition(topicId, partition)) }
.toList()
);
}); });
return targetPartitions; return targetPartitions;
} }

View File

@ -25,6 +25,8 @@ import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/** /**
* The assignment specification for a modern group. * The assignment specification for a modern group.
@ -46,14 +48,31 @@ public class GroupSpecImpl implements GroupSpec {
*/ */
private final Map<Uuid, Map<Integer, String>> invertedMemberAssignment; private final Map<Uuid, Map<Integer, String>> invertedMemberAssignment;
/**
* In case of share groups, this map will be queried to decide
* which partition is assignable. For non-share groups,
* this optional should be empty.
*/
private final Optional<Map<Uuid, Set<Integer>>> topicPartitionAllowedMap;
public GroupSpecImpl( public GroupSpecImpl(
Map<String, MemberSubscriptionAndAssignmentImpl> members, Map<String, MemberSubscriptionAndAssignmentImpl> members,
SubscriptionType subscriptionType, SubscriptionType subscriptionType,
Map<Uuid, Map<Integer, String>> invertedMemberAssignment Map<Uuid, Map<Integer, String>> invertedMemberAssignment
) {
this(members, subscriptionType, invertedMemberAssignment, Optional.empty());
}
public GroupSpecImpl(
Map<String, MemberSubscriptionAndAssignmentImpl> members,
SubscriptionType subscriptionType,
Map<Uuid, Map<Integer, String>> invertedMemberAssignment,
Optional<Map<Uuid, Set<Integer>>> topicPartitionAllowedMap
) { ) {
this.members = Objects.requireNonNull(members); this.members = Objects.requireNonNull(members);
this.subscriptionType = Objects.requireNonNull(subscriptionType); this.subscriptionType = Objects.requireNonNull(subscriptionType);
this.invertedMemberAssignment = Objects.requireNonNull(invertedMemberAssignment); this.invertedMemberAssignment = Objects.requireNonNull(invertedMemberAssignment);
this.topicPartitionAllowedMap = Objects.requireNonNull(topicPartitionAllowedMap);
} }
/** /**
@ -84,6 +103,14 @@ public class GroupSpecImpl implements GroupSpec {
return partitionMap.containsKey(partitionId); return partitionMap.containsKey(partitionId);
} }
/**
* {@inheritDoc}
*/
@Override
public boolean isPartitionAssignable(Uuid topicId, int partitionId) {
return topicPartitionAllowedMap.map(allowedMap -> allowedMap.containsKey(topicId) && allowedMap.get(topicId).contains(partitionId)).orElse(true);
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@ -111,26 +138,25 @@ public class GroupSpecImpl implements GroupSpec {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (!(o instanceof GroupSpecImpl groupSpec)) return false;
GroupSpecImpl that = (GroupSpecImpl) o; return Objects.equals(members, groupSpec.members) &&
return subscriptionType == that.subscriptionType && subscriptionType == groupSpec.subscriptionType &&
members.equals(that.members) && Objects.equals(invertedMemberAssignment, groupSpec.invertedMemberAssignment) &&
invertedMemberAssignment.equals(that.invertedMemberAssignment); Objects.equals(topicPartitionAllowedMap, groupSpec.topicPartitionAllowedMap);
} }
@Override @Override
public int hashCode() { public int hashCode() {
int result = members.hashCode(); return Objects.hash(members, subscriptionType, invertedMemberAssignment, topicPartitionAllowedMap);
result = 31 * result + subscriptionType.hashCode();
result = 31 * result + invertedMemberAssignment.hashCode();
return result;
} }
@Override @Override
public String toString() { public String toString() {
return "GroupSpecImpl(members=" + members + return "GroupSpecImpl(" +
"members=" + members +
", subscriptionType=" + subscriptionType + ", subscriptionType=" + subscriptionType +
", invertedMemberAssignment=" + invertedMemberAssignment + ", invertedMemberAssignment=" + invertedMemberAssignment +
", topicPartitionAllowedMap=" + topicPartitionAllowedMap +
')'; ')';
} }
} }

View File

@ -26,9 +26,7 @@ import org.apache.kafka.metadata.PartitionRegistration;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.Set; import java.util.Set;
/** /**
@ -36,27 +34,13 @@ import java.util.Set;
* topic and partition metadata for the topics that the modern group is subscribed to. * topic and partition metadata for the topics that the modern group is subscribed to.
*/ */
public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber { public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber {
/**
* The map of topic Ids to the set of allowed partitions for each topic.
* If this is empty, all partitions are allowed.
*/
private final Optional<Map<Uuid, Set<Integer>>> topicPartitionAllowedMap;
/** /**
* The metadata image that contains the latest metadata information. * The metadata image that contains the latest metadata information.
*/ */
private final MetadataImage metadataImage; private final MetadataImage metadataImage;
public SubscribedTopicDescriberImpl(MetadataImage metadataImage) { public SubscribedTopicDescriberImpl(MetadataImage metadataImage) {
this(metadataImage, Optional.empty());
}
public SubscribedTopicDescriberImpl(
MetadataImage metadataImage,
Optional<Map<Uuid, Set<Integer>>> topicPartitionAllowedMap
) {
this.metadataImage = Objects.requireNonNull(metadataImage); this.metadataImage = Objects.requireNonNull(metadataImage);
this.topicPartitionAllowedMap = Objects.requireNonNull(topicPartitionAllowedMap);
} }
/** /**
@ -100,50 +84,23 @@ public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber {
return Set.of(); return Set.of();
} }
/**
* Returns a set of assignable partitions from the metadata image.
* If the allowed partition map is Optional.empty(), all the partitions in the corresponding
* topic image are returned for the argument topic id. If allowed map is empty,
* empty set is returned.
*
* @param topicId The uuid of the topic
* @return Set of integers if assignable partitions available, empty otherwise.
*/
@Override
public Set<Integer> assignablePartitions(Uuid topicId) {
TopicImage topic = metadataImage.topics().getTopic(topicId);
if (topic == null) {
return Set.of();
}
if (topicPartitionAllowedMap.isEmpty()) {
return Collections.unmodifiableSet(topic.partitions().keySet());
}
return topicPartitionAllowedMap.get().getOrDefault(topicId, Set.of());
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
SubscribedTopicDescriberImpl that = (SubscribedTopicDescriberImpl) o; SubscribedTopicDescriberImpl that = (SubscribedTopicDescriberImpl) o;
if (!topicPartitionAllowedMap.equals(that.topicPartitionAllowedMap)) return false;
return metadataImage.equals(that.metadataImage); return metadataImage.equals(that.metadataImage);
} }
@Override @Override
public int hashCode() { public int hashCode() {
int result = metadataImage.hashCode(); return Objects.hashCode(metadataImage);
result = 31 * result + topicPartitionAllowedMap.hashCode();
return result;
} }
@Override @Override
public String toString() { public String toString() {
return "SubscribedTopicMetadata(" + return "SubscribedTopicMetadata(" +
"metadataImage=" + metadataImage + "metadataImage=" + metadataImage +
", topicPartitionAllowedMap=" + topicPartitionAllowedMap +
')'; ')';
} }
} }

View File

@ -466,9 +466,10 @@ public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U ext
new GroupSpecImpl( new GroupSpecImpl(
Collections.unmodifiableMap(memberSpecs), Collections.unmodifiableMap(memberSpecs),
subscriptionType, subscriptionType,
invertedTargetAssignment invertedTargetAssignment,
topicAssignablePartitionsMap
), ),
new SubscribedTopicDescriberImpl(metadataImage, topicAssignablePartitionsMap) new SubscribedTopicDescriberImpl(metadataImage)
); );
// Compute delta from previous to new target assignment and create the relevant records. // Compute delta from previous to new target assignment and create the relevant records.

View File

@ -109,4 +109,21 @@ public class GroupSpecImplTest {
assertEquals(topicPartitions, groupSpec.memberAssignment(TEST_MEMBER).partitions()); assertEquals(topicPartitions, groupSpec.memberAssignment(TEST_MEMBER).partitions());
assertEquals(Map.of(), groupSpec.memberAssignment("unknown-member").partitions()); assertEquals(Map.of(), groupSpec.memberAssignment("unknown-member").partitions());
} }
@Test
void testIsPartitionAssignable() {
// Empty allowed map.
assertTrue(groupSpec.isPartitionAssignable(topicId, 1));
// Allowed map with data.
groupSpec = new GroupSpecImpl(
members,
subscriptionType,
invertedTargetAssignment,
Optional.of(Map.of(topicId, Set.of(0)))
);
assertTrue(groupSpec.isPartitionAssignable(topicId, 0));
assertFalse(groupSpec.isPartitionAssignable(topicId, 1));
}
} }

View File

@ -22,7 +22,6 @@ import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl; import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl; import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
@ -47,9 +46,6 @@ import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class SimpleAssignorTest { public class SimpleAssignorTest {
@ -225,6 +221,130 @@ public class SimpleAssignorTest {
assertAssignment(expectedAssignment, computedAssignment); assertAssignment(expectedAssignment, computedAssignment);
} }
@Test
public void testAssignWithTwoMembersAndTwoTopicsHomogeneousWithAllowedMap() {
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 2)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
Set<Uuid> topicsSubscription = new LinkedHashSet<>();
topicsSubscription.add(TOPIC_1_UUID);
topicsSubscription.add(TOPIC_3_UUID);
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription,
Assignment.EMPTY
));
members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription,
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Map.of(),
Optional.of(
Map.of(
TOPIC_1_UUID, Set.of(0, 1, 2),
TOPIC_3_UUID, Set.of(0, 1)
)
)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
// Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
// Step 2 -> T1:2, T3:1 -> MEMBER_A and T3:0 -> MEMBER_B by round-robin assignment.
// Step 3 -> no new assignment gets added by current assignment since it is empty.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 2),
mkTopicAssignment(TOPIC_3_UUID, 1)
));
expectedAssignment.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 1),
mkTopicAssignment(TOPIC_3_UUID, 0)
));
// T1: 3 partitions + T3: 2 partitions = 5 partitions
assertEveryPartitionGetsAssignment(5, computedAssignment);
assertAssignment(expectedAssignment, computedAssignment);
}
@Test
public void testAssignWithTwoMembersAndTwoTopicsHomogeneousWithNonAssignableTopic() {
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 2)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
Set<Uuid> topicsSubscription = new LinkedHashSet<>();
topicsSubscription.add(TOPIC_1_UUID);
topicsSubscription.add(TOPIC_3_UUID);
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription,
Assignment.EMPTY
));
members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription,
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Map.of(),
Optional.of(
Map.of(TOPIC_1_UUID, Set.of(0, 1, 2))
)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 1, 2)
));
expectedAssignment.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0)
));
// T1: 3 partitions + T3(non-assignable): 2 partitions = 3 partitions
assertEveryPartitionGetsAssignment(3, computedAssignment);
assertAssignment(expectedAssignment, computedAssignment);
}
@Test @Test
public void testAssignWithThreeMembersThreeTopicsHeterogeneous() { public void testAssignWithThreeMembersThreeTopicsHeterogeneous() {
MetadataImage metadataImage = new MetadataImageBuilder() MetadataImage metadataImage = new MetadataImageBuilder()
@ -297,6 +417,159 @@ public class SimpleAssignorTest {
assertAssignment(expectedAssignment, computedAssignment); assertAssignment(expectedAssignment, computedAssignment);
} }
@Test
public void testAssignWithThreeMembersThreeTopicsHeterogeneousWithAllowedMap() {
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
.addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 3)
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 2)
.build();
Set<Uuid> memberATopicsSubscription = new LinkedHashSet<>();
memberATopicsSubscription.add(TOPIC_1_UUID);
memberATopicsSubscription.add(TOPIC_2_UUID);
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
memberATopicsSubscription,
Assignment.EMPTY
));
members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
Set.of(TOPIC_3_UUID),
Assignment.EMPTY
));
Set<Uuid> memberCTopicsSubscription = new LinkedHashSet<>();
memberCTopicsSubscription.add(TOPIC_2_UUID);
memberCTopicsSubscription.add(TOPIC_3_UUID);
members.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
memberCTopicsSubscription,
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
Map.of(),
Optional.of(
Map.of(
TOPIC_1_UUID, Set.of(0, 1, 2),
TOPIC_2_UUID, Set.of(0, 1, 2),
TOPIC_3_UUID, Set.of(0, 1)
)
)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of MEMBER_C is 67.
// Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by hash assignment.
// Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1, -> member_B, T2:1 -> member_C by round-robin assignment.
// Step 3 -> no new assignment gets added by current assignment since it is empty.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
mkTopicAssignment(TOPIC_2_UUID, 0, 2)
));
expectedAssignment.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_3_UUID, 0, 1)
));
expectedAssignment.put(MEMBER_C, mkAssignment(
mkTopicAssignment(TOPIC_2_UUID, 1, 2)
));
// T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8 partitions
assertEveryPartitionGetsAssignment(8, computedAssignment);
assertAssignment(expectedAssignment, computedAssignment);
}
@Test
public void testAssignWithThreeMembersThreeTopicsHeterogeneousWithNonAssignableTopic() {
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
.addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 3)
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 2) // non-assignable
.build();
Set<Uuid> memberATopicsSubscription = new LinkedHashSet<>();
memberATopicsSubscription.add(TOPIC_1_UUID);
memberATopicsSubscription.add(TOPIC_2_UUID);
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
memberATopicsSubscription,
Assignment.EMPTY
));
members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
Set.of(TOPIC_3_UUID),
Assignment.EMPTY
));
Set<Uuid> memberCTopicsSubscription = new LinkedHashSet<>();
memberCTopicsSubscription.add(TOPIC_2_UUID);
memberCTopicsSubscription.add(TOPIC_3_UUID);
members.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
memberCTopicsSubscription,
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
Map.of(),
Optional.of(
Map.of(
TOPIC_1_UUID, Set.of(0, 1, 2),
TOPIC_2_UUID, Set.of(0, 1, 2)
)
)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
mkTopicAssignment(TOPIC_2_UUID, 0, 2)
));
expectedAssignment.put(MEMBER_B, Map.of());
expectedAssignment.put(MEMBER_C, mkAssignment(
mkTopicAssignment(TOPIC_2_UUID, 1)
));
// T1: 3 partitions + T2: 3 partitions + T3: 2 partitions(non-assignable) = 6 partitions
assertEveryPartitionGetsAssignment(6, computedAssignment);
assertAssignment(expectedAssignment, computedAssignment);
}
@Test @Test
public void testAssignWithOneMemberNoAssignedTopicHeterogeneous() { public void testAssignWithOneMemberNoAssignedTopicHeterogeneous() {
MetadataImage metadataImage = new MetadataImageBuilder() MetadataImage metadataImage = new MetadataImageBuilder()
@ -390,13 +663,7 @@ public class SimpleAssignorTest {
Map<TopicIdPartition, List<String>> assignment = new HashMap<>(); Map<TopicIdPartition, List<String>> assignment = new HashMap<>();
assignment.put(partition1, List.of(member1)); assignment.put(partition1, List.of(member1));
SubscribedTopicDescriber describer = mock(SubscribedTopicDescriber.class); assignor.roundRobinAssignment(members, unassignedPartitions, assignment);
when(describer.assignablePartitions(eq(TOPIC_1_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_2_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_3_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_4_UUID))).thenReturn(Set.of(0));
assignor.roundRobinAssignment(members, unassignedPartitions, assignment, describer);
Map<TopicIdPartition, List<String>> expectedAssignment = Map.of( Map<TopicIdPartition, List<String>> expectedAssignment = Map.of(
partition1, List.of(member1), partition1, List.of(member1),
partition2, List.of(member1), partition2, List.of(member1),
@ -422,13 +689,7 @@ public class SimpleAssignorTest {
assignment.put(member1, new HashSet<>(Set.of(partition1))); assignment.put(member1, new HashSet<>(Set.of(partition1)));
assignment.put(member2, new HashSet<>(Set.of(partition1))); assignment.put(member2, new HashSet<>(Set.of(partition1)));
SubscribedTopicDescriber describer = mock(SubscribedTopicDescriber.class); assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, assignment, 2);
when(describer.assignablePartitions(eq(TOPIC_1_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_2_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_3_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_4_UUID))).thenReturn(Set.of(0));
assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, assignment, 2, describer);
Map<String, Set<TopicIdPartition>> expectedAssignment = Map.of( Map<String, Set<TopicIdPartition>> expectedAssignment = Map.of(
member1, Set.of(partition1, partition2, partition4), member1, Set.of(partition1, partition2, partition4),
member2, Set.of(partition1, partition3) member2, Set.of(partition1, partition3)
@ -454,14 +715,8 @@ public class SimpleAssignorTest {
assignment.put(member1, new HashSet<>(Set.of(partition1))); assignment.put(member1, new HashSet<>(Set.of(partition1)));
assignment.put(member2, new HashSet<>(Set.of(partition1))); assignment.put(member2, new HashSet<>(Set.of(partition1)));
SubscribedTopicDescriber describer = mock(SubscribedTopicDescriber.class);
when(describer.assignablePartitions(eq(TOPIC_1_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_2_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_3_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_4_UUID))).thenReturn(Set.of(0, 1, 2));
assertThrows(PartitionAssignorException.class, assertThrows(PartitionAssignorException.class,
() -> assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, assignment, 2, describer)); () -> assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, assignment, 2));
} }
@Test @Test

View File

@ -23,8 +23,6 @@ import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -55,11 +53,6 @@ public class SubscribedTopicMetadataTest {
assertThrows(NullPointerException.class, () -> new SubscribedTopicDescriberImpl(null)); assertThrows(NullPointerException.class, () -> new SubscribedTopicDescriberImpl(null));
} }
@Test
public void testTopicPartitionAllowedMapCannotBeNull() {
assertThrows(NullPointerException.class, () -> new SubscribedTopicDescriberImpl(metadataImage, null));
}
@Test @Test
public void testNumberOfPartitions() { public void testNumberOfPartitions() {
Uuid topicId = Uuid.randomUuid(); Uuid topicId = Uuid.randomUuid();
@ -100,32 +93,4 @@ public class SubscribedTopicMetadataTest {
.build(); .build();
assertNotEquals(new SubscribedTopicDescriberImpl(metadataImage2), subscribedTopicMetadata); assertNotEquals(new SubscribedTopicDescriberImpl(metadataImage2), subscribedTopicMetadata);
} }
@Test
public void testAssignablePartitions() {
String t1Name = "t1";
Uuid t1Id = Uuid.randomUuid();
metadataImage = new MetadataImageBuilder().addTopic(t1Id, t1Name, numPartitions).build();
// Optional.empty() allow map (all partitions assignable)
subscribedTopicMetadata = new SubscribedTopicDescriberImpl(metadataImage, Optional.empty());
assertEquals(Set.of(0, 1, 2, 3, 4), subscribedTopicMetadata.assignablePartitions(t1Id));
// empty allow map (nothing assignable)
subscribedTopicMetadata = new SubscribedTopicDescriberImpl(metadataImage, Optional.of(Map.of()));
assertEquals(Set.of(), subscribedTopicMetadata.assignablePartitions(t1Id));
// few assignable partitions
subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage,
Optional.of(Map.of(t1Id, Set.of(0, 5)))
);
assertEquals(Set.of(0, 5), subscribedTopicMetadata.assignablePartitions(t1Id));
// all assignable partitions
subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage,
Optional.of(Map.of(t1Id, Set.of(0, 1, 2, 3, 4)))
);
assertEquals(Set.of(0, 1, 2, 3, 4), subscribedTopicMetadata.assignablePartitions(t1Id));
}
} }