KAFKA-17747: [3/N] Get rid of TopicMetadata in SubscribedTopicDescriberImpl (#19611)
CI / build (push) Waiting to run Details

Replace `TopicMetadata` with `MetadataImage` in
`SubscribedTopicDescriberImpl` and  `TargetAssignmentBuilder`.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot
 <djacot@confluent.io>
This commit is contained in:
PoAn Yang 2025-05-19 07:46:24 -05:00 committed by GitHub
parent ce4940f989
commit c493d89334
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 454 additions and 765 deletions

View File

@ -276,16 +276,13 @@ public class GroupMetadataManager {
private static class UpdateSubscriptionMetadataResult {
private final int groupEpoch;
private final Map<String, TopicMetadata> subscriptionMetadata;
private final SubscriptionType subscriptionType;
UpdateSubscriptionMetadataResult(
int groupEpoch,
Map<String, TopicMetadata> subscriptionMetadata,
SubscriptionType subscriptionType
) {
this.groupEpoch = groupEpoch;
this.subscriptionMetadata = Objects.requireNonNull(subscriptionMetadata);
this.subscriptionType = Objects.requireNonNull(subscriptionType);
}
}
@ -2231,7 +2228,6 @@ public class GroupMetadataManager {
);
int groupEpoch = group.groupEpoch();
Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
SubscriptionType subscriptionType = group.subscriptionType();
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
@ -2247,7 +2243,6 @@ public class GroupMetadataManager {
);
groupEpoch = result.groupEpoch;
subscriptionMetadata = result.subscriptionMetadata;
subscriptionType = result.subscriptionType;
}
@ -2262,7 +2257,6 @@ public class GroupMetadataManager {
groupEpoch,
member,
updatedMember,
subscriptionMetadata,
subscriptionType,
records
);
@ -2373,7 +2367,6 @@ public class GroupMetadataManager {
}
int groupEpoch = group.groupEpoch();
Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
SubscriptionType subscriptionType = group.subscriptionType();
final ConsumerProtocolSubscription subscription = deserializeSubscription(protocols);
@ -2416,7 +2409,6 @@ public class GroupMetadataManager {
);
groupEpoch = result.groupEpoch;
subscriptionMetadata = result.subscriptionMetadata;
subscriptionType = result.subscriptionType;
}
@ -2431,7 +2423,6 @@ public class GroupMetadataManager {
groupEpoch,
member,
updatedMember,
subscriptionMetadata,
subscriptionType,
records
);
@ -2605,7 +2596,6 @@ public class GroupMetadataManager {
group,
groupEpoch,
updatedMember,
subscriptionMetadata,
subscriptionType,
records
);
@ -3624,7 +3614,6 @@ public class GroupMetadataManager {
return new UpdateSubscriptionMetadataResult(
groupEpoch,
subscriptionMetadata,
subscriptionType
);
}
@ -3632,13 +3621,12 @@ public class GroupMetadataManager {
/**
* Updates the target assignment according to the updated member and subscription metadata.
*
* @param group The ConsumerGroup.
* @param groupEpoch The group epoch.
* @param member The existing member.
* @param updatedMember The updated member.
* @param subscriptionMetadata The subscription metadata.
* @param subscriptionType The group subscription type.
* @param records The list to accumulate any new records.
* @param group The ConsumerGroup.
* @param groupEpoch The group epoch.
* @param member The existing member.
* @param updatedMember The updated member.
* @param subscriptionType The group subscription type.
* @param records The list to accumulate any new records.
* @return The new target assignment.
*/
private Assignment updateTargetAssignment(
@ -3646,7 +3634,6 @@ public class GroupMetadataManager {
int groupEpoch,
ConsumerGroupMember member,
ConsumerGroupMember updatedMember,
Map<String, TopicMetadata> subscriptionMetadata,
SubscriptionType subscriptionType,
List<CoordinatorRecord> records
) {
@ -3659,11 +3646,10 @@ public class GroupMetadataManager {
new TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(group.groupId(), groupEpoch, consumerGroupAssignors.get(preferredServerAssignor))
.withMembers(group.members())
.withStaticMembers(group.staticMembers())
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(group.targetAssignment())
.withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics())
.withMetadataImage(metadataImage)
.withResolvedRegularExpressions(group.resolvedRegularExpressions())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
@ -3706,19 +3692,17 @@ public class GroupMetadataManager {
/**
* Updates the target assignment according to the updated member and subscription metadata.
*
* @param group The ShareGroup.
* @param groupEpoch The group epoch.
* @param updatedMember The updated member.
* @param subscriptionMetadata The subscription metadata.
* @param subscriptionType The group subscription type.
* @param records The list to accumulate any new records.
* @param group The ShareGroup.
* @param groupEpoch The group epoch.
* @param updatedMember The updated member.
* @param subscriptionType The group subscription type.
* @param records The list to accumulate any new records.
* @return The new target assignment.
*/
private Assignment updateTargetAssignment(
ShareGroup group,
int groupEpoch,
ShareGroupMember updatedMember,
Map<String, TopicMetadata> subscriptionMetadata,
SubscriptionType subscriptionType,
List<CoordinatorRecord> records
) {
@ -3730,12 +3714,11 @@ public class GroupMetadataManager {
TargetAssignmentBuilder.ShareTargetAssignmentBuilder assignmentResultBuilder =
new TargetAssignmentBuilder.ShareTargetAssignmentBuilder(group.groupId(), groupEpoch, shareGroupAssignor)
.withMembers(group.members())
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(group.targetAssignment())
.withTopicAssignablePartitionsMap(initializedTopicPartitions)
.withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics())
.withMetadataImage(metadataImage)
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
long startTimeMs = time.milliseconds();

View File

@ -19,12 +19,17 @@ package org.apache.kafka.coordinator.group.modern;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.PartitionRegistration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* The subscribed topic metadata class is used by the {@link PartitionAssignor} to obtain
@ -32,28 +37,26 @@ import java.util.stream.IntStream;
*/
public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber {
/**
* The topic Ids mapped to their corresponding {@link TopicMetadata}
* object, which contains topic and partition metadata.
* The map of topic Ids to the set of allowed partitions for each topic.
* If this is empty, all partitions are allowed.
*/
private final Map<Uuid, TopicMetadata> topicMetadata;
private final Map<Uuid, Set<Integer>> topicPartitionAllowedMap;
public SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata> topicMetadata) {
this(topicMetadata, null);
}
public SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata> topicMetadata, Map<Uuid, Set<Integer>> topicPartitionAllowedMap) {
this.topicMetadata = Objects.requireNonNull(topicMetadata);
this.topicPartitionAllowedMap = topicPartitionAllowedMap;
}
private final Optional<Map<Uuid, Set<Integer>>> topicPartitionAllowedMap;
/**
* Map of topic Ids to topic metadata.
*
* @return The map of topic Ids to topic metadata.
* The metadata image that contains the latest metadata information.
*/
public Map<Uuid, TopicMetadata> topicMetadata() {
return this.topicMetadata;
private final MetadataImage metadataImage;
public SubscribedTopicDescriberImpl(MetadataImage metadataImage) {
this(metadataImage, Optional.empty());
}
public SubscribedTopicDescriberImpl(
MetadataImage metadataImage,
Optional<Map<Uuid, Set<Integer>>> topicPartitionAllowedMap
) {
this.metadataImage = Objects.requireNonNull(metadataImage);
this.topicPartitionAllowedMap = Objects.requireNonNull(topicPartitionAllowedMap);
}
/**
@ -65,8 +68,8 @@ public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber {
*/
@Override
public int numPartitions(Uuid topicId) {
TopicMetadata topic = this.topicMetadata.get(topicId);
return topic == null ? -1 : topic.numPartitions();
TopicImage topicImage = this.metadataImage.topics().getTopic(topicId);
return topicImage == null ? -1 : topicImage.partitions().size();
}
/**
@ -79,13 +82,28 @@ public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber {
*/
@Override
public Set<String> racksForPartition(Uuid topicId, int partition) {
TopicImage topic = metadataImage.topics().getTopic(topicId);
if (topic != null) {
PartitionRegistration partitionRegistration = topic.partitions().get(partition);
if (partitionRegistration != null) {
Set<String> racks = new HashSet<>();
for (int replica : partitionRegistration.replicas) {
// Only add the rack if it is available for the broker/replica.
BrokerRegistration brokerRegistration = metadataImage.cluster().broker(replica);
if (brokerRegistration != null) {
brokerRegistration.rack().ifPresent(racks::add);
}
}
return Collections.unmodifiableSet(racks);
}
}
return Set.of();
}
/**
* Returns a set of assignable partitions from the topic metadata.
* If the allowed partition map is null, all the partitions in the corresponding
* topic metadata are returned for the argument topic id. If allowed map is empty,
* Returns a set of assignable partitions from the metadata image.
* If the allowed partition map is Optional.empty(), all the partitions in the corresponding
* topic image are returned for the argument topic id. If allowed map is empty,
* empty set is returned.
*
* @param topicId The uuid of the topic
@ -93,16 +111,16 @@ public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber {
*/
@Override
public Set<Integer> assignablePartitions(Uuid topicId) {
TopicMetadata topic = this.topicMetadata.get(topicId);
TopicImage topic = metadataImage.topics().getTopic(topicId);
if (topic == null) {
return Set.of();
}
if (topicPartitionAllowedMap == null) {
return IntStream.range(0, topic.numPartitions()).boxed().collect(Collectors.toUnmodifiableSet());
if (topicPartitionAllowedMap.isEmpty()) {
return Collections.unmodifiableSet(topic.partitions().keySet());
}
return topicPartitionAllowedMap.getOrDefault(topicId, Set.of());
return topicPartitionAllowedMap.get().getOrDefault(topicId, Set.of());
}
@Override
@ -110,18 +128,22 @@ public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SubscribedTopicDescriberImpl that = (SubscribedTopicDescriberImpl) o;
return topicMetadata.equals(that.topicMetadata);
if (!topicPartitionAllowedMap.equals(that.topicPartitionAllowedMap)) return false;
return metadataImage.equals(that.metadataImage);
}
@Override
public int hashCode() {
return topicMetadata.hashCode();
int result = metadataImage.hashCode();
result = 31 * result + topicPartitionAllowedMap.hashCode();
return result;
}
@Override
public String toString() {
return "SubscribedTopicMetadata(" +
"topicMetadata=" + topicMetadata +
"metadataImage=" + metadataImage +
", topicPartitionAllowedMap=" + topicPartitionAllowedMap +
')';
}
}

View File

@ -27,7 +27,7 @@ import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.image.MetadataImage;
import java.util.ArrayList;
import java.util.Collections;
@ -250,11 +250,6 @@ public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U ext
*/
private Map<String, T> members = Map.of();
/**
* The subscription metadata.
*/
private Map<String, TopicMetadata> subscriptionMetadata = Map.of();
/**
* The subscription type of the consumer group.
*/
@ -272,9 +267,9 @@ public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U ext
private Map<Uuid, Map<Integer, String>> invertedTargetAssignment = Map.of();
/**
* The topics image.
* The metadata image.
*/
private TopicsImage topicsImage = TopicsImage.EMPTY;
private MetadataImage metadataImage = MetadataImage.EMPTY;
/**
* The members which have been updated or deleted. Deleted members
@ -290,7 +285,7 @@ public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U ext
/**
* Topic partition assignable map.
*/
private Map<Uuid, Set<Integer>> topicAssignablePartitionsMap = new HashMap<>();
private Optional<Map<Uuid, Set<Integer>>> topicAssignablePartitionsMap = Optional.empty();
/**
* Constructs the object.
@ -335,19 +330,6 @@ public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U ext
return self();
}
/**
* Adds the subscription metadata to use.
*
* @param subscriptionMetadata The subscription metadata.
* @return This object.
*/
public U withSubscriptionMetadata(
Map<String, TopicMetadata> subscriptionMetadata
) {
this.subscriptionMetadata = subscriptionMetadata;
return self();
}
/**
* Adds the subscription type in use.
*
@ -388,22 +370,22 @@ public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U ext
}
/**
* Adds the topics image.
* Adds the metadata image.
*
* @param topicsImage The topics image.
* @param metadataImage The metadata image.
* @return This object.
*/
public U withTopicsImage(
TopicsImage topicsImage
public U withMetadataImage(
MetadataImage metadataImage
) {
this.topicsImage = topicsImage;
this.metadataImage = metadataImage;
return self();
}
public U withTopicAssignablePartitionsMap(
Map<Uuid, Set<Integer>> topicAssignablePartitionsMap
) {
this.topicAssignablePartitionsMap = topicAssignablePartitionsMap;
this.topicAssignablePartitionsMap = Optional.of(topicAssignablePartitionsMap);
return self();
}
@ -445,7 +427,7 @@ public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U ext
*/
public TargetAssignmentResult build() throws PartitionAssignorException {
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new HashMap<>();
TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(topicsImage);
TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(metadataImage.topics());
// Prepare the member spec for all members.
members.forEach((memberId, member) ->
@ -479,15 +461,6 @@ public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U ext
}
});
// Prepare the topic metadata.
Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>();
subscriptionMetadata.forEach((topicName, topicMetadata) ->
topicMetadataMap.put(
topicMetadata.id(),
topicMetadata
)
);
// Compute the assignment.
GroupAssignment newGroupAssignment = assignor.assign(
new GroupSpecImpl(
@ -495,7 +468,7 @@ public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U ext
subscriptionType,
invertedTargetAssignment
),
new SubscribedTopicDescriberImpl(topicMetadataMap, topicAssignablePartitionsMap)
new SubscribedTopicDescriberImpl(metadataImage, topicAssignablePartitionsMap)
);
// Compute delta from previous to new target assignment and create the relevant records.

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
@ -24,7 +25,7 @@ import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.Test;
@ -61,15 +62,11 @@ public class OptimizedUniformAssignmentBuilderTest {
@Test
public void testOneMemberNoTopicSubscription() {
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.build();
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Map.of(
topic1Uuid,
new TopicMetadata(
topic1Uuid,
topic1Name,
3
)
)
metadataImage
);
Map<String, MemberSubscriptionAndAssignmentImpl> members = Map.of(
@ -98,15 +95,11 @@ public class OptimizedUniformAssignmentBuilderTest {
@Test
public void testOneMemberSubscribedToNonexistentTopic() {
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.build();
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Map.of(
topic1Uuid,
new TopicMetadata(
topic1Uuid,
topic1Name,
3
)
)
metadataImage
);
Map<String, MemberSubscriptionAndAssignmentImpl> members = Map.of(
@ -131,17 +124,10 @@ public class OptimizedUniformAssignmentBuilderTest {
@Test
public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.addTopic(topic3Uuid, topic3Name, 2)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -172,7 +158,9 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -185,12 +173,9 @@ public class OptimizedUniformAssignmentBuilderTest {
@Test
public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic3Uuid, topic3Name, 2)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -232,7 +217,9 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -245,22 +232,21 @@ public class OptimizedUniformAssignmentBuilderTest {
@Test
public void testValidityAndBalanceForLargeSampleSet() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
for (int i = 1; i < 100; i++) {
MetadataImageBuilder metadataImageBuilder = new MetadataImageBuilder();
Set<Uuid> topicIds = new HashSet<>();
for (int i = 1; i < 101; i++) {
Uuid topicId = Uuid.randomUuid();
topicMetadata.put(topicId, new TopicMetadata(
topicId,
"topic-" + i,
3
));
metadataImageBuilder.addTopic(topicId, "topic-" + i, 3);
topicIds.add(topicId);
}
MetadataImage metadataImage = metadataImageBuilder.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
for (int i = 1; i < 50; i++) {
members.put("member" + i, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicMetadata.keySet(),
topicIds,
Assignment.EMPTY
));
}
@ -270,7 +256,9 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -282,17 +270,10 @@ public class OptimizedUniformAssignmentBuilderTest {
@Test
public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.addTopic(topic2Uuid, topic2Name, 3)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -331,7 +312,9 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -345,17 +328,10 @@ public class OptimizedUniformAssignmentBuilderTest {
@Test
public void testReassignmentWhenPartitionsAreAddedForTwoMembersTwoTopics() {
// Simulating adding partition to T1 and T2 - originally T1 -> 3 Partitions and T2 -> 3 Partitions
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
6
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
5
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 6)
.addTopic(topic2Uuid, topic2Name, 5)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -394,7 +370,9 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -407,17 +385,10 @@ public class OptimizedUniformAssignmentBuilderTest {
@Test
public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembersTwoTopics() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.addTopic(topic2Uuid, topic2Name, 3)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -466,7 +437,9 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -479,17 +452,10 @@ public class OptimizedUniformAssignmentBuilderTest {
@Test
public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeMembersTwoTopics() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.addTopic(topic2Uuid, topic2Name, 3)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -530,7 +496,9 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -543,17 +511,10 @@ public class OptimizedUniformAssignmentBuilderTest {
@Test
public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWithTwoMembersTwoTopics() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
2
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
2
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 2)
.addTopic(topic2Uuid, topic2Name, 2)
.build();
// Initial subscriptions were [T1, T2]
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -591,7 +552,9 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -604,12 +567,9 @@ public class OptimizedUniformAssignmentBuilderTest {
@Test
public void testReassignmentStickinessWhenAlreadyBalanced() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
5
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 5)
.build();
// A TreeMap ensures that memberA is first in the iteration order.
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -660,7 +620,7 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(metadataImage);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
@ -28,7 +29,7 @@ import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.Test;
@ -61,14 +62,7 @@ public class RangeAssignorTest {
@Test
public void testOneMemberNoTopic() {
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Map.of(
topic1Uuid,
new TopicMetadata(
topic1Uuid,
topic1Name,
3
)
)
MetadataImage.EMPTY
);
Map<String, MemberSubscriptionAndAssignmentImpl> members = Map.of(
@ -102,15 +96,11 @@ public class RangeAssignorTest {
@Test
public void testOneMemberSubscribedToNonExistentTopic() {
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.build();
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Map.of(
topic1Uuid,
new TopicMetadata(
topic1Uuid,
topic1Name,
3
)
)
metadataImage
);
Map<String, MemberSubscriptionAndAssignmentImpl> members = Map.of(
@ -135,17 +125,10 @@ public class RangeAssignorTest {
@Test
public void testFirstAssignmentTwoMembersTwoTopicsSameSubscriptions() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.addTopic(topic3Uuid, topic3Name, 2)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -168,7 +151,9 @@ public class RangeAssignorTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -190,22 +175,11 @@ public class RangeAssignorTest {
@Test
public void testFirstAssignmentThreeMembersThreeTopicsDifferentSubscriptions() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.addTopic(topic2Uuid, topic2Name, 3)
.addTopic(topic3Uuid, topic3Name, 2)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -235,7 +209,9 @@ public class RangeAssignorTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -260,17 +236,10 @@ public class RangeAssignorTest {
@Test
public void testFirstAssignmentNumMembersGreaterThanNumPartitions() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.addTopic(topic3Uuid, topic3Name, 2)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -300,7 +269,9 @@ public class RangeAssignorTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -326,15 +297,11 @@ public class RangeAssignorTest {
@Test
public void testStaticMembership() throws PartitionAssignorException {
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.build();
SubscribedTopicDescriber subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Map.of(
topic1Uuid,
new TopicMetadata(
topic1Uuid,
topic1Name,
3
)
)
metadataImage
);
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -396,15 +363,11 @@ public class RangeAssignorTest {
@Test
public void testMixedStaticMembership() throws PartitionAssignorException {
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 5)
.build();
SubscribedTopicDescriber subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Map.of(
topic1Uuid,
new TopicMetadata(
topic1Uuid,
topic1Name,
5
)
)
metadataImage
);
// Initialize members with instance Ids.
@ -480,17 +443,10 @@ public class RangeAssignorTest {
@Test
public void testReassignmentNumMembersGreaterThanNumPartitionsWhenOneMemberAdded() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
2
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
2
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 2)
.addTopic(topic2Uuid, topic2Name, 2)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -527,7 +483,9 @@ public class RangeAssignorTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -552,17 +510,10 @@ public class RangeAssignorTest {
@Test
public void testReassignmentWhenOnePartitionAddedForTwoMembersTwoTopics() {
// Simulating adding a partition - originally T1 -> 3 Partitions and T2 -> 3 Partitions
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
4
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
4
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 4)
.addTopic(topic2Uuid, topic2Name, 4)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -591,7 +542,9 @@ public class RangeAssignorTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -613,17 +566,10 @@ public class RangeAssignorTest {
@Test
public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembersTwoTopics() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.addTopic(topic2Uuid, topic2Name, 3)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -660,7 +606,9 @@ public class RangeAssignorTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -687,17 +635,10 @@ public class RangeAssignorTest {
@Test
public void testReassignmentWhenOneMemberAddedAndOnePartitionAfterInitialAssignmentWithTwoMembersTwoTopics() {
// Add a new partition to topic 1, initially T1 -> 3 partitions
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
4
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 4)
.addTopic(topic2Uuid, topic2Name, 3)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -734,7 +675,9 @@ public class RangeAssignorTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -759,17 +702,10 @@ public class RangeAssignorTest {
@Test
public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithTwoMembersTwoTopics() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.addTopic(topic2Uuid, topic2Name, 3)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -790,7 +726,9 @@ public class RangeAssignorTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -808,22 +746,11 @@ public class RangeAssignorTest {
@Test
public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignmentWithThreeMembersTwoTopics() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.addTopic(topic2Uuid, topic2Name, 3)
.addTopic(topic3Uuid, topic3Name, 2)
.build();
// Let initial subscriptions be A -> T1, T2 // B -> T2 // C -> T2, T3
// Change the subscriptions to A -> T1 // B -> T1, T2, T3 // C -> T2
@ -863,7 +790,9 @@ public class RangeAssignorTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
@ -26,7 +27,7 @@ import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.TopicIdPartition;
import org.junit.jupiter.api.Test;
@ -74,7 +75,7 @@ public class SimpleAssignorTest {
@Test
public void testAssignWithEmptyMembers() {
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Map.of()
MetadataImage.EMPTY
);
GroupSpec groupSpec = new GroupSpecImpl(
@ -104,15 +105,11 @@ public class SimpleAssignorTest {
@Test
public void testAssignWithNoSubscribedTopic() {
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
.build();
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Map.of(
TOPIC_1_UUID,
new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3
)
)
metadataImage
);
Map<String, MemberSubscriptionAndAssignmentImpl> members = Map.of(
@ -141,15 +138,11 @@ public class SimpleAssignorTest {
@Test
public void testAssignWithSubscribedToNonExistentTopic() {
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
.build();
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Map.of(
TOPIC_1_UUID,
new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3
)
)
metadataImage
);
Map<String, MemberSubscriptionAndAssignmentImpl> members = Map.of(
@ -174,17 +167,10 @@ public class SimpleAssignorTest {
@Test
public void testAssignWithTwoMembersAndTwoTopicsHomogeneous() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3
));
topicMetadata.put(TOPIC_3_UUID, new TopicMetadata(
TOPIC_3_UUID,
TOPIC_3_NAME,
2
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 2)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
@ -211,7 +197,9 @@ public class SimpleAssignorTest {
HOMOGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -239,23 +227,11 @@ public class SimpleAssignorTest {
@Test
public void testAssignWithThreeMembersThreeTopicsHeterogeneous() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3
));
topicMetadata.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
"topic2",
3
));
topicMetadata.put(TOPIC_3_UUID, new TopicMetadata(
TOPIC_3_UUID,
TOPIC_3_NAME,
2
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
.addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 3)
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 2)
.build();
Set<Uuid> memberATopicsSubscription = new LinkedHashSet<>();
memberATopicsSubscription.add(TOPIC_1_UUID);
@ -291,7 +267,9 @@ public class SimpleAssignorTest {
HETEROGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -321,18 +299,10 @@ public class SimpleAssignorTest {
@Test
public void testAssignWithOneMemberNoAssignedTopicHeterogeneous() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3
));
topicMetadata.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
"topic2",
2
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
.addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 2)
.build();
Set<Uuid> memberATopicsSubscription = new LinkedHashSet<>();
memberATopicsSubscription.add(TOPIC_1_UUID);
@ -357,7 +327,9 @@ public class SimpleAssignorTest {
HETEROGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -495,17 +467,10 @@ public class SimpleAssignorTest {
@Test
public void testAssignWithCurrentAssignmentHomogeneous() {
// Current assignment setup - Two members A, B subscribing to T1 and T2.
Map<Uuid, TopicMetadata> topicMetadata1 = new HashMap<>();
topicMetadata1.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3
));
topicMetadata1.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
TOPIC_2_NAME,
2
));
MetadataImage metadataImage1 = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
.addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 2)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members1 = new HashMap<>();
@ -532,7 +497,9 @@ public class SimpleAssignorTest {
HOMOGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new SubscribedTopicDescriberImpl(topicMetadata1);
SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new SubscribedTopicDescriberImpl(
metadataImage1
);
GroupAssignment computedAssignment1 = assignor.assign(
groupSpec1,
@ -558,17 +525,10 @@ public class SimpleAssignorTest {
assertAssignment(expectedAssignment1, computedAssignment1);
// New assignment setup - Three members A, B, C subscribing to T2 and T3.
Map<Uuid, TopicMetadata> topicMetadata2 = new HashMap<>();
topicMetadata2.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
TOPIC_2_NAME,
2
));
topicMetadata2.put(TOPIC_3_UUID, new TopicMetadata(
TOPIC_3_UUID,
TOPIC_3_NAME,
3
));
MetadataImage metadataImage2 = new MetadataImageBuilder()
.addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 2)
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 3)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members2 = new HashMap<>();
@ -607,7 +567,9 @@ public class SimpleAssignorTest {
HOMOGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new SubscribedTopicDescriberImpl(topicMetadata2);
SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new SubscribedTopicDescriberImpl(
metadataImage2
);
GroupAssignment computedAssignment2 = assignor.assign(
groupSpec2,
@ -639,23 +601,11 @@ public class SimpleAssignorTest {
@Test
public void testAssignWithCurrentAssignmentHeterogeneous() {
// Current assignment setup - 3 members A - {T1, T2}, B - {T3}, C - {T2, T3}.
Map<Uuid, TopicMetadata> topicMetadata1 = new HashMap<>();
topicMetadata1.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3
));
topicMetadata1.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
TOPIC_2_NAME,
3
));
topicMetadata1.put(TOPIC_3_UUID, new TopicMetadata(
TOPIC_3_UUID,
TOPIC_3_NAME,
2
));
MetadataImage metadataImage1 = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
.addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 3)
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 2)
.build();
Set<Uuid> memberATopicsSubscription1 = new LinkedHashSet<>();
memberATopicsSubscription1.add(TOPIC_1_UUID);
@ -691,7 +641,9 @@ public class SimpleAssignorTest {
HETEROGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new SubscribedTopicDescriberImpl(topicMetadata1);
SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new SubscribedTopicDescriberImpl(
metadataImage1
);
GroupAssignment computedAssignment1 = assignor.assign(
groupSpec1,
@ -720,27 +672,12 @@ public class SimpleAssignorTest {
// New assignment setup - 2 members A - {T1, T2, T3}, B - {T3, T4}.
Map<Uuid, TopicMetadata> topicMetadata2 = new HashMap<>();
topicMetadata2.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3
));
topicMetadata2.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
TOPIC_2_NAME,
3
));
topicMetadata2.put(TOPIC_3_UUID, new TopicMetadata(
TOPIC_3_UUID,
TOPIC_3_NAME,
2
));
topicMetadata2.put(TOPIC_4_UUID, new TopicMetadata(
TOPIC_4_UUID,
TOPIC_4_NAME,
1
));
MetadataImage metadataImage2 = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
.addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 3)
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 2)
.addTopic(TOPIC_4_UUID, TOPIC_4_NAME, 1)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members2 = new HashMap<>();
@ -776,7 +713,9 @@ public class SimpleAssignorTest {
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new SubscribedTopicDescriberImpl(topicMetadata2);
SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new SubscribedTopicDescriberImpl(
metadataImage2
);
GroupAssignment computedAssignment2 = assignor.assign(
groupSpec2,

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
@ -25,7 +26,7 @@ import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.Test;
@ -89,15 +90,11 @@ public class UniformHeterogeneousAssignmentBuilderTest {
@Test
public void testTwoMembersNoTopicSubscription() {
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.build();
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Map.of(
topic1Uuid,
new TopicMetadata(
topic1Uuid,
topic1Name,
3
)
)
metadataImage
);
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -130,15 +127,11 @@ public class UniformHeterogeneousAssignmentBuilderTest {
@Test
public void testTwoMembersSubscribedToNonexistentTopics() {
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.build();
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Map.of(
topic1Uuid,
new TopicMetadata(
topic1Uuid,
topic1Name,
3
)
)
metadataImage
);
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -168,17 +161,10 @@ public class UniformHeterogeneousAssignmentBuilderTest {
@Test
public void testFirstAssignmentTwoMembersTwoTopics() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
6
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.addTopic(topic3Uuid, topic3Name, 6)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -201,7 +187,9 @@ public class UniformHeterogeneousAssignmentBuilderTest {
HETEROGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -222,17 +210,10 @@ public class UniformHeterogeneousAssignmentBuilderTest {
@Test
public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
1
));
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
2
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 2)
.addTopic(topic3Uuid, topic3Name, 1)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -262,7 +243,9 @@ public class UniformHeterogeneousAssignmentBuilderTest {
HETEROGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -286,22 +269,11 @@ public class UniformHeterogeneousAssignmentBuilderTest {
@Test
public void testReassignmentForTwoMembersThreeTopicsGivenUnbalancedPrevAssignment() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
6
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
4
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
4
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 6)
.addTopic(topic2Uuid, topic2Name, 4)
.addTopic(topic3Uuid, topic3Name, 4)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -340,7 +312,9 @@ public class UniformHeterogeneousAssignmentBuilderTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -366,27 +340,14 @@ public class UniformHeterogeneousAssignmentBuilderTest {
@Test
public void testReassignmentWhenPartitionsAreAddedForTwoMembers() {
// Simulating adding partitions to T1, T2, T3 - originally T1 -> 4, T2 -> 3, T3 -> 2, T4 -> 3
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
6
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
5
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
3
));
topicMetadata.put(topic4Uuid, new TopicMetadata(
topic4Uuid,
topic4Name,
3
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 6)
.addTopic(topic2Uuid, topic2Name, 5)
.addTopic(topic3Uuid, topic3Name, 3)
.addTopic(topic4Uuid, topic4Name, 3)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -415,7 +376,9 @@ public class UniformHeterogeneousAssignmentBuilderTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -438,17 +401,10 @@ public class UniformHeterogeneousAssignmentBuilderTest {
@Test
public void testReassignmentWhenOneMemberAddedAndPartitionsAddedTwoMembersTwoTopics() {
// Initially T1 -> 3, T2 -> 3 partitions.
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
6
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
7
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 6)
.addTopic(topic2Uuid, topic2Name, 7)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -485,7 +441,9 @@ public class UniformHeterogeneousAssignmentBuilderTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -509,22 +467,11 @@ public class UniformHeterogeneousAssignmentBuilderTest {
@Test
public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeMembersThreeTopics() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
8
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
3
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.addTopic(topic2Uuid, topic2Name, 8)
.addTopic(topic3Uuid, topic3Name, 3)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -554,7 +501,9 @@ public class UniformHeterogeneousAssignmentBuilderTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -575,17 +524,10 @@ public class UniformHeterogeneousAssignmentBuilderTest {
@Test
public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWithTwoMembersTwoTopics() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
5
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.addTopic(topic2Uuid, topic2Name, 5)
.build();
// Initial subscriptions were [T1, T2]
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -615,7 +557,9 @@ public class UniformHeterogeneousAssignmentBuilderTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -641,22 +585,11 @@ public class UniformHeterogeneousAssignmentBuilderTest {
*/
@Test
public void testReassignmentWhenTopicPartitionsRunOutAndMembersHaveNoPartitions() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
2
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
2
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 2)
.addTopic(topic2Uuid, topic2Name, 2)
.addTopic(topic3Uuid, topic3Name, 2)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -683,7 +616,9 @@ public class UniformHeterogeneousAssignmentBuilderTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -705,12 +640,9 @@ public class UniformHeterogeneousAssignmentBuilderTest {
@Test
public void testFirstAssignmentWithTwoMembersIncludingOneWithoutSubscriptions() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3
));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topic1Uuid, topic1Name, 3)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -733,7 +665,9 @@ public class UniformHeterogeneousAssignmentBuilderTest {
HETEROGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,

View File

@ -17,12 +17,14 @@
package org.apache.kafka.coordinator.group.modern;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -31,80 +33,98 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class SubscribedTopicMetadataTest {
private Map<Uuid, TopicMetadata> topicMetadataMap;
private SubscribedTopicDescriberImpl subscribedTopicMetadata;
private MetadataImage metadataImage;
private final int numPartitions = 5;
@BeforeEach
public void setUp() {
topicMetadataMap = new HashMap<>();
MetadataImageBuilder metadataImageBuilder = new MetadataImageBuilder();
for (int i = 0; i < 5; i++) {
Uuid topicId = Uuid.randomUuid();
String topicName = "topic" + i;
topicMetadataMap.put(
topicId,
new TopicMetadata(topicId, topicName, 5)
);
metadataImageBuilder.addTopic(topicId, topicName, numPartitions);
}
subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadataMap);
metadataImage = metadataImageBuilder.addRacks().build();
subscribedTopicMetadata = new SubscribedTopicDescriberImpl(metadataImage);
}
@Test
public void testAttribute() {
assertEquals(topicMetadataMap, subscribedTopicMetadata.topicMetadata());
}
@Test
public void testTopicMetadataCannotBeNull() {
public void testMetadataImageCannotBeNull() {
assertThrows(NullPointerException.class, () -> new SubscribedTopicDescriberImpl(null));
}
@Test
public void testTopicPartitionAllowedMapCannotBeNull() {
assertThrows(NullPointerException.class, () -> new SubscribedTopicDescriberImpl(metadataImage, null));
}
@Test
public void testNumberOfPartitions() {
Uuid topicId = Uuid.randomUuid();
// Test -1 is returned when the topic Id doesn't exist.
// Test -1 is returned when the topic ID doesn't exist.
assertEquals(-1, subscribedTopicMetadata.numPartitions(topicId));
topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3));
// Test that the correct number of partitions are returned for a given topic ID.
metadataImage.topics().topicsById().forEach((id, name) ->
// Test that the correct number of partitions are returned for a given topic ID.
assertEquals(numPartitions, subscribedTopicMetadata.numPartitions(id))
);
}
// Test that the correct number of partitions are returned for a given topic Id.
assertEquals(3, subscribedTopicMetadata.numPartitions(topicId));
@Test
public void testRacksForPartition() {
Uuid topicId = Uuid.randomUuid();
// Test empty set is returned when the topic ID doesn't exist.
assertEquals(Set.of(), subscribedTopicMetadata.racksForPartition(topicId, 0));
metadataImage.topics().topicsById().forEach((id, name) -> {
// Test empty set is returned when the partition ID doesn't exist.
assertEquals(Set.of(), subscribedTopicMetadata.racksForPartition(id, 10));
// Test that the correct racks of partition are returned for a given topic ID.
assertEquals(Set.of("rack0", "rack1"), subscribedTopicMetadata.racksForPartition(id, 0));
});
}
@Test
public void testEquals() {
assertEquals(new SubscribedTopicDescriberImpl(topicMetadataMap), subscribedTopicMetadata);
assertEquals(new SubscribedTopicDescriberImpl(metadataImage), subscribedTopicMetadata);
Map<Uuid, TopicMetadata> topicMetadataMap2 = new HashMap<>();
Uuid topicId = Uuid.randomUuid();
topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 5));
assertNotEquals(new SubscribedTopicDescriberImpl(topicMetadataMap2), subscribedTopicMetadata);
MetadataImage metadataImage2 = new MetadataImageBuilder()
.addTopic(topicId, "newTopic", 5)
.addRacks()
.build();
assertNotEquals(new SubscribedTopicDescriberImpl(metadataImage2), subscribedTopicMetadata);
}
@Test
public void testAssignablePartitions() {
// null allow map (all partitions assignable)
subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadataMap, null);
String t1Name = "t1";
Uuid t1Id = Uuid.randomUuid();
topicMetadataMap.put(t1Id, new TopicMetadata(t1Id, t1Name, 5));
metadataImage = new MetadataImageBuilder().addTopic(t1Id, t1Name, numPartitions).build();
// Optional.empty() allow map (all partitions assignable)
subscribedTopicMetadata = new SubscribedTopicDescriberImpl(metadataImage, Optional.empty());
assertEquals(Set.of(0, 1, 2, 3, 4), subscribedTopicMetadata.assignablePartitions(t1Id));
// empty allow map (nothing assignable)
subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadataMap, Map.of());
subscribedTopicMetadata = new SubscribedTopicDescriberImpl(metadataImage, Optional.of(Map.of()));
assertEquals(Set.of(), subscribedTopicMetadata.assignablePartitions(t1Id));
// few assignable partitions
subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
topicMetadataMap,
Map.of(t1Id, Set.of(0, 5))
metadataImage,
Optional.of(Map.of(t1Id, Set.of(0, 5)))
);
assertEquals(Set.of(0, 5), subscribedTopicMetadata.assignablePartitions(t1Id));
// all assignable partitions
subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
topicMetadataMap,
Map.of(t1Id, Set.of(0, 1, 2, 3, 4))
metadataImage,
Optional.of(Map.of(t1Id, Set.of(0, 1, 2, 3, 4)))
);
assertEquals(Set.of(0, 1, 2, 3, 4), subscribedTopicMetadata.assignablePartitions(t1Id));
}

View File

@ -25,7 +25,7 @@ import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.Test;
@ -57,13 +57,12 @@ public class TargetAssignmentBuilderTest {
private final int groupEpoch;
private final PartitionAssignor assignor = mock(PartitionAssignor.class);
private final Map<String, ConsumerGroupMember> members = new HashMap<>();
private final Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
private final Map<String, ConsumerGroupMember> updatedMembers = new HashMap<>();
private final Map<String, Assignment> targetAssignment = new HashMap<>();
private final Map<String, MemberAssignment> memberAssignments = new HashMap<>();
private final Map<String, String> staticMembers = new HashMap<>();
private final Map<String, ResolvedRegularExpression> resolvedRegularExpressions = new HashMap<>();
private MetadataImageBuilder topicsImageBuilder = new MetadataImageBuilder();
private MetadataImageBuilder metadataImageBuilder = new MetadataImageBuilder();
public TargetAssignmentBuilderTestContext(
String groupId,
@ -123,12 +122,7 @@ public class TargetAssignmentBuilderTest {
int numPartitions
) {
Uuid topicId = Uuid.randomUuid();
subscriptionMetadata.put(topicName, new TopicMetadata(
topicId,
topicName,
numPartitions
));
topicsImageBuilder = topicsImageBuilder.addTopic(topicId, topicName, numPartitions);
metadataImageBuilder = metadataImageBuilder.addTopic(topicId, topicName, numPartitions);
return topicId;
}
@ -218,8 +212,8 @@ public class TargetAssignmentBuilderTest {
}
public TargetAssignmentBuilder.TargetAssignmentResult build() {
TopicsImage topicsImage = topicsImageBuilder.build().topics();
TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(topicsImage);
MetadataImage metadataImage = metadataImageBuilder.build();
TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(metadataImage.topics());
// Prepare expected member specs.
Map<String, MemberSubscriptionAndAssignmentImpl> memberSubscriptions = new HashMap<>();
@ -256,13 +250,8 @@ public class TargetAssignmentBuilderTest {
}
});
// Prepare the expected topic metadata.
Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>();
subscriptionMetadata.forEach((topicName, topicMetadata) ->
topicMetadataMap.put(topicMetadata.id(), topicMetadata));
// Prepare the expected subscription topic metadata.
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadataMap);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(metadataImage);
SubscriptionType subscriptionType = HOMOGENEOUS;
// Prepare the member assignments per topic partition.
@ -286,11 +275,10 @@ public class TargetAssignmentBuilderTest {
new TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(groupId, groupEpoch, assignor)
.withMembers(members)
.withStaticMembers(staticMembers)
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(targetAssignment)
.withInvertedTargetAssignment(invertedTargetAssignment)
.withTopicsImage(topicsImage)
.withMetadataImage(metadataImage)
.withResolvedRegularExpressions(resolvedRegularExpressions);
// Add the updated members or delete the deleted members.

View File

@ -27,12 +27,10 @@ import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.TopicIds;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.TopicsImage;
import java.util.ArrayList;
import java.util.Arrays;
@ -89,69 +87,29 @@ public class AssignorBenchmarkUtils {
}
/**
* Creates a subscription metadata map for the given topics.
* Creates a TopicsImage from the given topic names.
*
* @param topicNames The names of the topics.
* @param partitionsPerTopic The number of partitions per topic.
* @return The subscription metadata map.
*/
public static Map<String, TopicMetadata> createSubscriptionMetadata(
List<String> topicNames,
int partitionsPerTopic
) {
Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
for (String topicName : topicNames) {
Uuid topicId = Uuid.randomUuid();
TopicMetadata metadata = new TopicMetadata(
topicId,
topicName,
partitionsPerTopic
);
subscriptionMetadata.put(topicName, metadata);
}
return subscriptionMetadata;
}
/**
* Creates a topic metadata map from the given subscription metadata.
*
* @param subscriptionMetadata The subscription metadata.
* @return The topic metadata map.
*/
public static Map<Uuid, TopicMetadata> createTopicMetadata(
Map<String, TopicMetadata> subscriptionMetadata
) {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>((int) (subscriptionMetadata.size() / 0.75f + 1));
for (Map.Entry<String, TopicMetadata> entry : subscriptionMetadata.entrySet()) {
topicMetadata.put(entry.getValue().id(), entry.getValue());
}
return topicMetadata;
}
/**
* Creates a TopicsImage from the given subscription metadata.
*
* @param subscriptionMetadata The subscription metadata.
* @param allTopicNames The topic names.
* @param partitionsPerTopic Number of partitions per topic.
* @return A TopicsImage containing the topic ids, names and partition counts from the
* subscription metadata.
*/
public static TopicsImage createTopicsImage(Map<String, TopicMetadata> subscriptionMetadata) {
public static MetadataImage createMetadataImage(
List<String> allTopicNames,
int partitionsPerTopic
) {
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
for (Map.Entry<String, TopicMetadata> entry : subscriptionMetadata.entrySet()) {
TopicMetadata topicMetadata = entry.getValue();
for (String topicName : allTopicNames) {
AssignorBenchmarkUtils.addTopic(
delta,
topicMetadata.id(),
topicMetadata.name(),
topicMetadata.numPartitions()
Uuid.randomUuid(),
topicName,
partitionsPerTopic
);
}
return delta.apply(MetadataProvenance.EMPTY).topics();
return delta.apply(MetadataProvenance.EMPTY);
}
/**

View File

@ -31,9 +31,8 @@ import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TopicIds;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.image.MetadataImage;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@ -125,9 +124,7 @@ public class ServerSideAssignorBenchmark {
private List<String> allTopicNames = Collections.emptyList();
private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
private TopicsImage topicsImage = TopicsImage.EMPTY;
private MetadataImage metadataImage = MetadataImage.EMPTY;
private TopicIds.TopicResolver topicResolver;
@ -151,16 +148,11 @@ public class ServerSideAssignorBenchmark {
allTopicNames = AssignorBenchmarkUtils.createTopicNames(topicCount);
int partitionsPerTopic = (memberCount * partitionsToMemberRatio) / topicCount;
subscriptionMetadata = AssignorBenchmarkUtils.createSubscriptionMetadata(
allTopicNames,
partitionsPerTopic
);
topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);
topicResolver = new TopicIds.CachedTopicResolver(topicsImage);
metadataImage = AssignorBenchmarkUtils.createMetadataImage(allTopicNames, partitionsPerTopic);
topicResolver = new TopicIds.CachedTopicResolver(metadataImage.topics());
Map<Uuid, TopicMetadata> topicMetadata = AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata);
subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata);
subscribedTopicDescriber = new SubscribedTopicDescriberImpl(metadataImage);
}
private Map<String, ConsumerGroupMember> createMembers() {

View File

@ -28,9 +28,8 @@ import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder;
import org.apache.kafka.coordinator.group.modern.TopicIds;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.image.MetadataImage;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@ -93,9 +92,7 @@ public class TargetAssignmentBuilderBenchmark {
private List<String> allTopicNames = Collections.emptyList();
private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
private TopicsImage topicsImage;
private MetadataImage metadataImage;
private TopicIds.TopicResolver topicResolver;
@ -118,11 +115,10 @@ public class TargetAssignmentBuilderBenchmark {
targetAssignmentBuilder = new TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(GROUP_ID, GROUP_EPOCH, partitionAssignor)
.withMembers(members)
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(existingTargetAssignment)
.withInvertedTargetAssignment(invertedTargetAssignment)
.withTopicsImage(topicsImage)
.withMetadataImage(metadataImage)
.addOrUpdateMember(newMember.memberId(), newMember);
}
@ -130,16 +126,11 @@ public class TargetAssignmentBuilderBenchmark {
allTopicNames = AssignorBenchmarkUtils.createTopicNames(topicCount);
int partitionsPerTopic = (memberCount * partitionsToMemberRatio) / topicCount;
subscriptionMetadata = AssignorBenchmarkUtils.createSubscriptionMetadata(
allTopicNames,
partitionsPerTopic
);
topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);
topicResolver = new TopicIds.CachedTopicResolver(topicsImage);
metadataImage = AssignorBenchmarkUtils.createMetadataImage(allTopicNames, partitionsPerTopic);
topicResolver = new TopicIds.CachedTopicResolver(metadataImage.topics());
Map<Uuid, TopicMetadata> topicMetadata = AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata);
subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata);
subscribedTopicDescriber = new SubscribedTopicDescriberImpl(metadataImage);
}
private Map<String, Assignment> generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment(