KAFKA-14702: Extend server side assignor to support rack aware replica placement (#14099)

This patch updates the `PartitionAssignor` interface to support rack awareness. It introduces the `SubscribedTopicDescriber` interface, which an assignor can use to retrieve topic metadata such as the number of partitions or the racks of a partition's replicas. An interface is used because it lets us wrap internal data structures instead of copying them, which is more efficient.

Reviewers: David Jacot <djacot@confluent.io>
Author: Ritika Reddy, 2023-07-28 10:30:04 -07:00 (committed by GitHub)
parent 32c39c8149
commit 3709901c9e
19 changed files with 959 additions and 361 deletions
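
Before the file-by-file diffs, a rough illustration (not part of this patch) of how an assignor could use the new describer to prefer partitions whose replicas share a member's rack. `RackAwareLookup`, `preferredPartitions`, and `memberRack` are hypothetical names introduced only for this sketch; `SubscribedTopicDescriber`, `numPartitions`, and `racksForPartition` are the methods added by this change.

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;

// Hypothetical helper: collect the partitions of a topic whose replica racks
// include the member's rack, so a rack-aware assignor can prefer them.
final class RackAwareLookup {
    static List<Integer> preferredPartitions(
        SubscribedTopicDescriber describer,
        Uuid topicId,
        String memberRack
    ) {
        List<Integer> preferred = new ArrayList<>();
        // numPartitions returns -1 when the topic id is not in the subscribed topic metadata.
        int numPartitions = describer.numPartitions(topicId);
        for (int partition = 0; partition < numPartitions; partition++) {
            if (describer.racksForPartition(topicId, partition).contains(memberRack)) {
                preferred.add(partition);
            }
        }
        return preferred;
    }
}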

View File

@ -792,7 +792,8 @@ public class GroupMetadataManager {
subscriptionMetadata = group.computeSubscriptionMetadata(
member,
updatedMember,
metadataImage.topics()
metadataImage.topics(),
metadataImage.cluster()
);
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
@ -942,7 +943,8 @@ public class GroupMetadataManager {
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
member,
null,
metadataImage.topics()
metadataImage.topics(),
metadataImage.cluster()
);
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {

View File

@ -131,13 +131,24 @@ public class RecordHelpers {
Map<String, TopicMetadata> newSubscriptionMetadata
) {
ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue();
newSubscriptionMetadata.forEach((topicName, topicMetadata) ->
newSubscriptionMetadata.forEach((topicName, topicMetadata) -> {
List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>();
// If the partition rack information map is empty, store an empty list in the record.
if (!topicMetadata.partitionRacks().isEmpty()) {
topicMetadata.partitionRacks().forEach((partition, racks) ->
partitionMetadata.add(new ConsumerGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(partition)
.setRacks(new ArrayList<>(racks))
)
);
}
value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicMetadata.id())
.setTopicName(topicMetadata.name())
.setNumPartitions(topicMetadata.numPartitions())
)
);
.setPartitionMetadata(partitionMetadata)
);
});
return new Record(
new ApiMessageAndVersion(

View File

@ -16,8 +16,6 @@
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import java.util.Map;
import java.util.Objects;
@ -26,59 +24,39 @@ import java.util.Objects;
*/
public class AssignmentSpec {
/**
* The members keyed by member id.
* The member metadata keyed by member Id.
*/
private final Map<String, AssignmentMemberSpec> members;
/**
* The topics' metadata keyed by topic id.
*/
private final Map<Uuid, AssignmentTopicMetadata> topics;
public AssignmentSpec(
Map<String, AssignmentMemberSpec> members,
Map<Uuid, AssignmentTopicMetadata> topics
Map<String, AssignmentMemberSpec> members
) {
Objects.requireNonNull(members);
Objects.requireNonNull(topics);
this.members = members;
this.topics = topics;
}
/**
* @return Member metadata keyed by member Ids.
* @return Member metadata keyed by member Id.
*/
public Map<String, AssignmentMemberSpec> members() {
return members;
}
/**
* @return Topic metadata keyed by topic Ids.
*/
public Map<Uuid, AssignmentTopicMetadata> topics() {
return topics;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AssignmentSpec that = (AssignmentSpec) o;
if (!members.equals(that.members)) return false;
return topics.equals(that.topics);
return members.equals(that.members);
}
@Override
public int hashCode() {
int result = members.hashCode();
result = 31 * result + topics.hashCode();
return result;
return members.hashCode();
}
@Override
public String toString() {
return "AssignmentSpec(members=" + members +
", topics=" + topics +
')';
return "AssignmentSpec(members=" + members + ')';
}
}

View File

@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
/**
* Metadata of a topic.
*/
public class AssignmentTopicMetadata {
/**
* The number of partitions.
*/
private final int numPartitions;
public AssignmentTopicMetadata(
int numPartitions
) {
this.numPartitions = numPartitions;
}
/**
* @return The number of partitions present for the topic.
*/
public int numPartitions() {
return numPartitions;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AssignmentTopicMetadata that = (AssignmentTopicMetadata) o;
return numPartitions == that.numPartitions;
}
@Override
public int hashCode() {
return numPartitions;
}
@Override
public String toString() {
return "AssignmentTopicMetadata(numPartitions=" + numPartitions + ')';
}
}

View File

@ -26,18 +26,17 @@ import org.apache.kafka.common.annotation.InterfaceStability;
*/
@InterfaceStability.Unstable
public interface PartitionAssignor {
/**
* Unique name for this assignor.
*/
String name();
/**
* Perform the group assignment given the current members and
* topic metadata.
* Assigns partitions to group members based on the given assignment specification and topic metadata.
*
* @param assignmentSpec The assignment spec.
* @param assignmentSpec The assignment spec which includes member metadata.
* @param subscribedTopicDescriber The topic and partition metadata describer.
* @return The new assignment for the group.
*/
GroupAssignment assign(AssignmentSpec assignmentSpec) throws PartitionAssignorException;
GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException;
}
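
For existing custom assignors, a minimal sketch of the updated contract (hypothetical `NoOpAssignor`, not part of this patch); it assumes `GroupAssignment` can be constructed from a map of member assignments, as the coordinator's own assignors do.

import java.util.Collections;

import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;

// Hypothetical assignor showing the new two-argument assign signature: member
// metadata comes from the AssignmentSpec, topic and rack metadata from the
// SubscribedTopicDescriber.
public class NoOpAssignor implements PartitionAssignor {
    @Override
    public String name() {
        return "no-op";
    }

    @Override
    public GroupAssignment assign(
        AssignmentSpec assignmentSpec,
        SubscribedTopicDescriber subscribedTopicDescriber
    ) throws PartitionAssignorException {
        // A real assignor would iterate assignmentSpec.members() and call
        // subscribedTopicDescriber.numPartitions(...) / racksForPartition(...).
        return new GroupAssignment(Collections.emptyMap());
    }
}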

View File

@ -33,8 +33,9 @@ import static java.lang.Math.min;
* This Range Assignor inherits properties of both the range assignor and the sticky assignor.
* The properties are as follows:
* <ol>
* <li> Each member must get at least one partition from every topic that it is subscribed to. The only exception is when
* the number of subscribed members is greater than the number of partitions for that topic. (Range) </li>
* <li> Each member must get at least one partition from every topic that it is subscribed to.
* The only exception is when the number of subscribed members is greater than the
* number of partitions for that topic. (Range) </li>
* <li> Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
* This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
* Two streams are co-partitioned if the following conditions are met:
@ -76,25 +77,28 @@ public class RangeAssignor implements PartitionAssignor {
}
/**
* @return Map of topic ids to a list of members subscribed to them.
* Returns a map of topic Ids to a list of members subscribed to them,
* based on the given assignment specification and metadata.
*
* @param assignmentSpec The specification for member assignments.
* @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters.
* @return A map of topic Ids to a list of member Ids subscribed to them.
*
* @throws PartitionAssignorException If a member is subscribed to a non-existent topic.
*/
private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec, final SubscribedTopicDescriber subscribedTopicDescriber) {
Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
membersData.forEach((memberId, memberMetadata) -> {
Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
for (Uuid topicId : topics) {
// Only topics that are present in both the subscribed topics list and the topic metadata should be
// considered for assignment.
if (assignmentSpec.topics().containsKey(topicId)) {
membersPerTopic
.computeIfAbsent(topicId, k -> new ArrayList<>())
.add(memberId);
} else {
throw new PartitionAssignorException("Member " + memberId + " subscribed to topic " +
topicId + " which doesn't exist in the topic metadata");
if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
}
membersPerTopic
.computeIfAbsent(topicId, k -> new ArrayList<>())
.add(memberId);
}
});
@ -118,14 +122,14 @@ public class RangeAssignor implements PartitionAssignor {
* </ol>
*/
@Override
public GroupAssignment assign(final AssignmentSpec assignmentSpec) throws PartitionAssignorException {
public GroupAssignment assign(final AssignmentSpec assignmentSpec, final SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
Map<String, MemberAssignment> newAssignment = new HashMap<>();
// Step 1
Map<Uuid, List<String>> membersPerTopic = membersPerTopic(assignmentSpec);
Map<Uuid, List<String>> membersPerTopic = membersPerTopic(assignmentSpec, subscribedTopicDescriber);
membersPerTopic.forEach((topicId, membersForTopic) -> {
int numPartitionsForTopic = assignmentSpec.topics().get(topicId).numPartitions();
int numPartitionsForTopic = subscribedTopicDescriber.numPartitions(topicId);
int minRequiredQuota = numPartitionsForTopic / membersForTopic.size();
// Each member can get only ONE extra partition per topic after receiving the minimum quota.
int numMembersWithExtraPartition = numPartitionsForTopic % membersForTopic.size();

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Set;
/**
* The subscribed topic describer is used by the {@link PartitionAssignor}
* to obtain topic and partition metadata of the subscribed topics.
*
* The interface is kept in an internal module until KIP-848 is fully
* implemented and ready to be released.
*/
@InterfaceStability.Unstable
public interface SubscribedTopicDescriber {
/**
* The number of partitions for the given topic Id.
*
* @param topicId Uuid corresponding to the topic.
* @return The number of partitions corresponding to the given topic Id,
* or -1 if the topic Id does not exist.
*/
int numPartitions(Uuid topicId);
/**
* Returns all the available racks associated with the replicas of the given partition.
*
* @param topicId Uuid corresponding to the partition's topic.
* @param partition Partition Id within topic.
* @return The set of racks corresponding to the replicas of the topic's partition.
* If the topic Id does not exist, an empty set is returned.
*/
Set<String> racksForPartition(Uuid topicId, int partition);
}
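
For unit-testing a custom assignor against this interface, a hedged sketch of an in-memory describer (hypothetical `MapBackedTopicDescriber`, not part of this patch) that honors the -1 / empty-set contract described above.

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;

// Hypothetical in-memory describer, convenient for unit-testing a custom
// assignor without the group coordinator's internal metadata images.
final class MapBackedTopicDescriber implements SubscribedTopicDescriber {
    private final Map<Uuid, Integer> partitionCounts;
    private final Map<Uuid, Map<Integer, Set<String>>> racks;

    MapBackedTopicDescriber(
        Map<Uuid, Integer> partitionCounts,
        Map<Uuid, Map<Integer, Set<String>>> racks
    ) {
        this.partitionCounts = partitionCounts;
        this.racks = racks;
    }

    @Override
    public int numPartitions(Uuid topicId) {
        // -1 signals an unknown topic, matching the interface contract.
        return partitionCounts.getOrDefault(topicId, -1);
    }

    @Override
    public Set<String> racksForPartition(Uuid topicId, int partition) {
        return racks.getOrDefault(topicId, Collections.emptyMap())
            .getOrDefault(partition, Collections.emptySet());
    }
}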

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.timeline.SnapshotRegistry;
@ -30,6 +31,7 @@ import org.apache.kafka.timeline.TimelineObject;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -404,8 +406,8 @@ public class ConsumerGroup implements Group {
}
/**
* @return An immutable Map containing the subscription metadata for all the topics whose
* members are subscribed to.
* @return An immutable Map of subscription metadata for
* each topic that the consumer group is subscribed to.
*/
public Map<String, TopicMetadata> subscriptionMetadata() {
return Collections.unmodifiableMap(subscribedTopicMetadata);
@ -427,16 +429,18 @@ public class ConsumerGroup implements Group {
* Computes the subscription metadata based on the current subscription and
* an updated member.
*
* @param oldMember The old member.
* @param newMember The new member.
* @param topicsImage The topic metadata.
* @param oldMember The old member of the consumer group.
* @param newMember The updated member of the consumer group.
* @param topicsImage The current metadata for all available topics.
* @param clusterImage The current metadata for the Kafka cluster.
*
* @return The new subscription metadata as an immutable Map.
* @return An immutable map of subscription metadata for each topic that the consumer group is subscribed to.
*/
public Map<String, TopicMetadata> computeSubscriptionMetadata(
ConsumerGroupMember oldMember,
ConsumerGroupMember newMember,
TopicsImage topicsImage
TopicsImage topicsImage,
ClusterImage clusterImage
) {
// Copy and update the current subscriptions.
Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames);
@ -444,14 +448,30 @@ public class ConsumerGroup implements Group {
// Create the topic metadata for each subscribed topic.
Map<String, TopicMetadata> newSubscriptionMetadata = new HashMap<>(subscribedTopicNames.size());
subscribedTopicNames.forEach((topicName, count) -> {
TopicImage topicImage = topicsImage.getTopic(topicName);
if (topicImage != null) {
Map<Integer, Set<String>> partitionRacks = new HashMap<>();
topicImage.partitions().forEach((partition, partitionRegistration) -> {
Set<String> racks = new HashSet<>();
for (int replica : partitionRegistration.replicas) {
Optional<String> rackOptional = clusterImage.broker(replica).rack();
// Only add the rack if it is available for the broker/replica.
rackOptional.ifPresent(racks::add);
}
// If rack information is unavailable for all replicas of this partition,
// no corresponding entry will be stored for it in the map.
if (!racks.isEmpty())
partitionRacks.put(partition, racks);
});
newSubscriptionMetadata.put(topicName, new TopicMetadata(
topicImage.id(),
topicImage.name(),
topicImage.partitions().size()
));
topicImage.partitions().size(),
partitionRacks)
);
}
});

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* The subscribed topic metadata class is used by the {@link PartitionAssignor} to obtain
* topic and partition metadata for the topics that the consumer group is subscribed to.
*/
public class SubscribedTopicMetadata implements SubscribedTopicDescriber {
/**
* The topic Ids mapped to their corresponding {@link TopicMetadata}
* object, which contains topic and partition metadata.
*/
private final Map<Uuid, TopicMetadata> topicMetadata;
public SubscribedTopicMetadata(Map<Uuid, TopicMetadata> topicMetadata) {
this.topicMetadata = Objects.requireNonNull(topicMetadata);
}
/**
* Map of topic Ids to topic metadata.
*
* @return The map of topic Ids to topic metadata.
*/
public Map<Uuid, TopicMetadata> topicMetadata() {
return this.topicMetadata;
}
/**
* The number of partitions for the given topic Id.
*
* @param topicId Uuid corresponding to the topic.
* @return The number of partitions corresponding to the given topic Id,
* or -1 if the topic Id does not exist.
*/
@Override
public int numPartitions(Uuid topicId) {
TopicMetadata topic = this.topicMetadata.get(topicId);
return topic == null ? -1 : topic.numPartitions();
}
/**
* Returns all the available racks associated with the replicas of the given partition.
*
* @param topicId Uuid corresponding to the partition's topic.
* @param partition Partition Id within the topic.
* @return The set of racks corresponding to the replicas of the topic's partition.
* If the topic Id does not exist, an empty set is returned.
*/
@Override
public Set<String> racksForPartition(Uuid topicId, int partition) {
TopicMetadata topic = this.topicMetadata.get(topicId);
return topic == null ? Collections.emptySet() : topic.partitionRacks().getOrDefault(partition, Collections.emptySet());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SubscribedTopicMetadata that = (SubscribedTopicMetadata) o;
return topicMetadata.equals(that.topicMetadata);
}
@Override
public int hashCode() {
return topicMetadata.hashCode();
}
@Override
public String toString() {
return "SubscribedTopicMetadata(" +
"topicMetadata=" + topicMetadata +
')';
}
}

View File

@ -20,7 +20,6 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.Record;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
@ -244,16 +243,19 @@ public class TargetAssignmentBuilder {
});
// Prepare the topic metadata.
Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>();
subscriptionMetadata.forEach((topicName, topicMetadata) ->
topics.put(topicMetadata.id(), new AssignmentTopicMetadata(topicMetadata.numPartitions()))
topicMetadataMap.put(
topicMetadata.id(),
topicMetadata
)
);
// Compute the assignment.
GroupAssignment newGroupAssignment = assignor.assign(new AssignmentSpec(
Collections.unmodifiableMap(memberSpecs),
Collections.unmodifiableMap(topics)
));
GroupAssignment newGroupAssignment = assignor.assign(
new AssignmentSpec(Collections.unmodifiableMap(memberSpecs)),
new SubscribedTopicMetadata(topicMetadataMap)
);
// Compute delta from previous to new target assignment and create the
// relevant records.

View File

@ -19,7 +19,12 @@ package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* Immutable topic metadata.
@ -40,10 +45,17 @@ public class TopicMetadata {
*/
private final int numPartitions;
/**
* Map of every partition Id to a set of its rack Ids, if they exist.
* If rack information is unavailable for all partitions, this is an empty map.
*/
private final Map<Integer, Set<String>> partitionRacks;
public TopicMetadata(
Uuid id,
String name,
int numPartitions
int numPartitions,
Map<Integer, Set<String>> partitionRacks
) {
this.id = Objects.requireNonNull(id);
if (Uuid.ZERO_UUID.equals(id)) {
@ -57,6 +69,7 @@ public class TopicMetadata {
if (numPartitions < 0) {
throw new IllegalArgumentException("Number of partitions cannot be negative.");
}
this.partitionRacks = Objects.requireNonNull(partitionRacks);
}
/**
@ -80,6 +93,14 @@ public class TopicMetadata {
return this.numPartitions;
}
/**
* @return Every partition mapped to the set of corresponding available rack Ids of its replicas.
* An empty map is returned if rack information is unavailable for all partitions.
*/
public Map<Integer, Set<String>> partitionRacks() {
return this.partitionRacks;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -89,7 +110,8 @@ public class TopicMetadata {
if (!id.equals(that.id)) return false;
if (!name.equals(that.name)) return false;
return numPartitions == that.numPartitions;
if (numPartitions != that.numPartitions) return false;
return partitionRacks.equals(that.partitionRacks);
}
@Override
@ -97,6 +119,7 @@ public class TopicMetadata {
int result = id.hashCode();
result = 31 * result + name.hashCode();
result = 31 * result + numPartitions;
result = 31 * result + partitionRacks.hashCode();
return result;
}
@ -106,16 +129,26 @@ public class TopicMetadata {
"id=" + id +
", name=" + name +
", numPartitions=" + numPartitions +
", partitionRacks=" + partitionRacks +
')';
}
public static TopicMetadata fromRecord(
ConsumerGroupPartitionMetadataValue.TopicMetadata record
) {
// Converting the data type from a list stored in the record to a map for the topic metadata.
Map<Integer, Set<String>> partitionRacks = new HashMap<>();
for (ConsumerGroupPartitionMetadataValue.PartitionMetadata partitionMetadata : record.partitionMetadata()) {
partitionRacks.put(
partitionMetadata.partition(),
Collections.unmodifiableSet(new HashSet<>(partitionMetadata.racks()))
);
}
return new TopicMetadata(
record.topicId(),
record.topicName(),
record.numPartitions()
);
record.numPartitions(),
partitionRacks);
}
}

View File

@ -29,7 +29,14 @@
{ "name": "TopicName", "versions": "0+", "type": "string",
"about": "The topic name." },
{ "name": "NumPartitions", "versions": "0+", "type": "int32",
"about": "The number of partitions of the topic." }
"about": "The number of partitions of the topic." },
{ "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata",
"about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [
{ "name": "Partition", "versions": "0+", "type": "int32",
"about": "The partition number." },
{ "name": "Racks", "versions": "0+", "type": "[]string",
"about": "The set of racks that the partition is mapped to." }
]}
]}
]
}

View File

@ -42,6 +42,7 @@ import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.network.ClientInformation;
@ -62,6 +63,7 @@ import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
@ -95,7 +97,6 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opentest4j.AssertionFailedError;
import java.net.InetAddress;
import java.util.ArrayList;
@ -126,6 +127,9 @@ import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGr
import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.genericGroupHeartbeatKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.genericGroupSyncKey;
import static org.apache.kafka.coordinator.group.RecordHelpersTest.assertRecordEquals;
import static org.apache.kafka.coordinator.group.RecordHelpersTest.assertRecordsEquals;
import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
@ -163,7 +167,7 @@ public class GroupMetadataManagerTest {
}
@Override
public GroupAssignment assign(AssignmentSpec assignmentSpec) throws PartitionAssignorException {
public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
return prepareGroupAssignment;
}
}
@ -176,11 +180,28 @@ public class GroupMetadataManagerTest {
String topicName,
int numPartitions
) {
// For testing purposes, the following criteria are used:
// - Number of replicas for each partition: 2
// - Number of brokers available in the cluster: 4
delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName));
for (int i = 0; i < numPartitions; i++) {
delta.replay(new PartitionRecord()
.setTopicId(topicId)
.setPartitionId(i));
.setPartitionId(i)
.setReplicas(Arrays.asList(i % 4, (i + 1) % 4)));
}
return this;
}
/**
* Add rack Ids for 4 broker Ids.
*
* For testing purposes, each broker Id i is mapped
* to the rack Id "rack" + i.
*/
public MetadataImageBuilder addRacks() {
for (int i = 0; i < 4; i++) {
delta.replay(new RegisterBrokerRecord().setBrokerId(i).setRack("rack" + i));
}
return this;
}
@ -242,8 +263,8 @@ public class GroupMetadataManagerTest {
subscriptionMetadata.put(topicName, new TopicMetadata(
topicImage.id(),
topicImage.name(),
topicImage.partitions().size()
));
topicImage.partitions().size(),
Collections.emptyMap()));
}
});
});
@ -1462,6 +1483,7 @@ public class GroupMetadataManagerTest {
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addRacks()
.build())
.build();
@ -1519,8 +1541,8 @@ public class GroupMetadataManagerTest {
List<Record> expectedRecords = Arrays.asList(
RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember),
RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
}}),
RecordHelpers.newGroupEpochRecord(groupId, 1),
RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment(
@ -1551,6 +1573,7 @@ public class GroupMetadataManagerTest {
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addRacks()
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId)
@ -1559,7 +1582,7 @@ public class GroupMetadataManagerTest {
.setTargetMemberEpoch(10)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setSubscribedTopicNames(Arrays.asList("foo"))
.setSubscribedTopicNames(Collections.singletonList("foo"))
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
@ -1617,8 +1640,8 @@ public class GroupMetadataManagerTest {
RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember),
RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
}
}),
RecordHelpers.newGroupEpochRecord(groupId, 11),
@ -1652,6 +1675,7 @@ public class GroupMetadataManagerTest {
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addRacks()
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
@ -1731,7 +1755,7 @@ public class GroupMetadataManagerTest {
.setPartitions(Arrays.asList(4, 5)),
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(barTopicId)
.setPartitions(Arrays.asList(2))
.setPartitions(Collections.singletonList(2))
))),
result.response()
);
@ -1752,6 +1776,12 @@ public class GroupMetadataManagerTest {
List<Record> expectedRecords = Arrays.asList(
RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember3),
RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
}
}),
RecordHelpers.newGroupEpochRecord(groupId, 11),
RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1),
@ -1769,9 +1799,9 @@ public class GroupMetadataManagerTest {
RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
);
assertRecordsEquals(expectedRecords.subList(0, 2), result.records().subList(0, 2));
assertUnorderedListEquals(expectedRecords.subList(2, 5), result.records().subList(2, 5));
assertRecordsEquals(expectedRecords.subList(5, 7), result.records().subList(5, 7));
assertRecordsEquals(expectedRecords.subList(0, 3), result.records().subList(0, 3));
assertUnorderedListEquals(expectedRecords.subList(3, 6), result.records().subList(3, 6));
assertRecordsEquals(expectedRecords.subList(6, 8), result.records().subList(6, 8));
}
@Test
@ -1797,6 +1827,7 @@ public class GroupMetadataManagerTest {
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addTopic(zarTopicId, zarTopicName, 1)
.addRacks()
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
@ -1857,8 +1888,8 @@ public class GroupMetadataManagerTest {
// Subscription metadata is recomputed because zar is no longer there.
RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
}
}),
RecordHelpers.newGroupEpochRecord(groupId, 11)
@ -1887,6 +1918,7 @@ public class GroupMetadataManagerTest {
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addRacks()
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
@ -2332,6 +2364,7 @@ public class GroupMetadataManagerTest {
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addRacks()
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
@ -2692,13 +2725,14 @@ public class GroupMetadataManagerTest {
PartitionAssignor assignor = mock(PartitionAssignor.class);
when(assignor.name()).thenReturn("range");
when(assignor.assign(any())).thenThrow(new PartitionAssignorException("Assignment failed."));
when(assignor.assign(any(), any())).thenThrow(new PartitionAssignorException("Assignment failed."));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addRacks()
.build())
.build();
@ -2732,6 +2766,7 @@ public class GroupMetadataManagerTest {
.withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addRacks()
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId)
@ -2753,7 +2788,7 @@ public class GroupMetadataManagerTest {
{
// foo only has 3 partitions stored in the metadata but foo has
// 6 partitions in the metadata image.
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3, mkMapOfPartitionRacks(3)));
}
}))
.build();
@ -2807,7 +2842,7 @@ public class GroupMetadataManagerTest {
List<Record> expectedRecords = Arrays.asList(
RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
}
}),
RecordHelpers.newGroupEpochRecord(groupId, 11),
@ -2842,6 +2877,7 @@ public class GroupMetadataManagerTest {
.withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addRacks()
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId)
@ -2863,7 +2899,7 @@ public class GroupMetadataManagerTest {
{
// foo only has 3 partitions stored in the metadata but foo has
// 6 partitions in the metadata image.
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3, mkMapOfPartitionRacks(3)));
}
}))
.build();
@ -2935,7 +2971,7 @@ public class GroupMetadataManagerTest {
List<Record> expectedRecords = Arrays.asList(
RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
}
}),
RecordHelpers.newGroupEpochRecord(groupId, 11),
@ -3172,6 +3208,7 @@ public class GroupMetadataManagerTest {
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addRacks()
.build())
.build();
@ -3246,6 +3283,7 @@ public class GroupMetadataManagerTest {
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addRacks()
.build())
.build();
@ -3311,6 +3349,7 @@ public class GroupMetadataManagerTest {
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 3)
.addRacks()
.build())
.build();
@ -3550,6 +3589,7 @@ public class GroupMetadataManagerTest {
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 3)
.addRacks()
.build())
.build();
@ -5737,7 +5777,7 @@ public class GroupMetadataManagerTest {
assertEquals(Errors.LEADER_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE);
}
private <T> void assertUnorderedListEquals(
public static <T> void assertUnorderedListEquals(
List<T> expected,
List<T> actual
) {
@ -5797,86 +5837,6 @@ public class GroupMetadataManagerTest {
return assignmentMap;
}
private void assertRecordsEquals(
List<Record> expectedRecords,
List<Record> actualRecords
) {
try {
assertEquals(expectedRecords.size(), actualRecords.size());
for (int i = 0; i < expectedRecords.size(); i++) {
Record expectedRecord = expectedRecords.get(i);
Record actualRecord = actualRecords.get(i);
assertRecordEquals(expectedRecord, actualRecord);
}
} catch (AssertionFailedError e) {
assertionFailure()
.expected(expectedRecords)
.actual(actualRecords)
.buildAndThrow();
}
}
private void assertRecordEquals(
Record expected,
Record actual
) {
try {
assertApiMessageAndVersionEquals(expected.key(), actual.key());
assertApiMessageAndVersionEquals(expected.value(), actual.value());
} catch (AssertionFailedError e) {
assertionFailure()
.expected(expected)
.actual(actual)
.buildAndThrow();
}
}
private void assertApiMessageAndVersionEquals(
ApiMessageAndVersion expected,
ApiMessageAndVersion actual
) {
if (expected == actual) return;
assertEquals(expected.version(), actual.version());
if (actual.message() instanceof ConsumerGroupCurrentMemberAssignmentValue) {
// The order of the topics stored in ConsumerGroupCurrentMemberAssignmentValue is not
// always guaranteed. Therefore, we need a special comparator.
ConsumerGroupCurrentMemberAssignmentValue expectedValue =
(ConsumerGroupCurrentMemberAssignmentValue) expected.message();
ConsumerGroupCurrentMemberAssignmentValue actualValue =
(ConsumerGroupCurrentMemberAssignmentValue) actual.message();
assertEquals(expectedValue.memberEpoch(), actualValue.memberEpoch());
assertEquals(expectedValue.previousMemberEpoch(), actualValue.previousMemberEpoch());
assertEquals(expectedValue.targetMemberEpoch(), actualValue.targetMemberEpoch());
assertEquals(expectedValue.error(), actualValue.error());
assertEquals(expectedValue.metadataVersion(), actualValue.metadataVersion());
assertEquals(expectedValue.metadataBytes(), actualValue.metadataBytes());
// We transform those to Maps before comparing them.
assertEquals(fromTopicPartitions(expectedValue.assignedPartitions()),
fromTopicPartitions(actualValue.assignedPartitions()));
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingRevocation()),
fromTopicPartitions(actualValue.partitionsPendingRevocation()));
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()),
fromTopicPartitions(actualValue.partitionsPendingAssignment()));
} else {
assertEquals(expected.message(), actual.message());
}
}
private Map<Uuid, Set<Integer>> fromTopicPartitions(
List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> assignment
) {
Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
assignment.forEach(topicPartitions -> {
assignmentMap.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions()));
});
return assignmentMap;
}
private List<String> verifyGenericGroupJoinResponses(
List<CompletableFuture<JoinGroupResponseData>> responseFutures,
int expectedSuccessCount,

View File

@ -52,6 +52,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.opentest4j.AssertionFailedError;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@ -59,6 +60,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -71,6 +74,7 @@ import java.util.stream.Stream;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedTopicAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.GroupMetadataManagerTest.assertUnorderedListEquals;
import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
@ -83,8 +87,10 @@ import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignme
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochTombstoneRecord;
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
public class RecordHelpersTest {
@ -161,15 +167,18 @@ public class RecordHelpersTest {
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
Map<String, TopicMetadata> subscriptionMetadata = new LinkedHashMap<>();
subscriptionMetadata.put("foo", new TopicMetadata(
fooTopicId,
"foo",
10
10,
mkMapOfPartitionRacks(10)
));
subscriptionMetadata.put("bar", new TopicMetadata(
barTopicId,
"bar",
20
20,
mkMapOfPartitionRacks(20)
));
Record expectedRecord = new Record(
@ -184,14 +193,16 @@ public class RecordHelpersTest {
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(fooTopicId)
.setTopicName("foo")
.setNumPartitions(10),
.setNumPartitions(10)
.setPartitionMetadata(mkListOfPartitionRacks(10)),
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(barTopicId)
.setTopicName("bar")
.setNumPartitions(20))),
.setNumPartitions(20)
.setPartitionMetadata(mkListOfPartitionRacks(20)))),
(short) 0));
assertEquals(expectedRecord, newGroupSubscriptionMetadataRecord(
assertRecordEquals(expectedRecord, newGroupSubscriptionMetadataRecord(
"group-id",
subscriptionMetadata
));
@ -212,6 +223,52 @@ public class RecordHelpersTest {
));
}
@Test
public void testEmptyPartitionMetadataWhenRacksUnavailableGroupSubscriptionMetadataRecord() {
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
Map<String, TopicMetadata> subscriptionMetadata = new LinkedHashMap<>();
subscriptionMetadata.put("foo", new TopicMetadata(
fooTopicId,
"foo",
10,
Collections.emptyMap()
));
subscriptionMetadata.put("bar", new TopicMetadata(
barTopicId,
"bar",
20,
Collections.emptyMap()
));
Record expectedRecord = new Record(
new ApiMessageAndVersion(
new ConsumerGroupPartitionMetadataKey()
.setGroupId("group-id"),
(short) 4
),
new ApiMessageAndVersion(
new ConsumerGroupPartitionMetadataValue()
.setTopics(Arrays.asList(
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(fooTopicId)
.setTopicName("foo")
.setNumPartitions(10)
.setPartitionMetadata(Collections.emptyList()),
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(barTopicId)
.setTopicName("bar")
.setNumPartitions(20)
.setPartitionMetadata(Collections.emptyList()))),
(short) 0));
assertRecordEquals(expectedRecord, newGroupSubscriptionMetadataRecord(
"group-id",
subscriptionMetadata
));
}
@Test
public void testNewGroupEpochRecord() {
Record expectedRecord = new Record(
@ -620,7 +677,7 @@ public class RecordHelpersTest {
MetadataVersion.IBP_3_5_IV2
));
}
@ParameterizedTest
@MethodSource("metadataToExpectedGroupMetadataValue")
public void testEmptyGroupMetadataRecord(
@ -764,4 +821,182 @@ public class RecordHelpersTest {
Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1);
assertEquals(expectedRecord, record);
}
/**
* Creates a list of values to be added to the record and assigns partitions to racks for testing.
*
* @param numPartitions The number of partitions for the topic.
*
* For testing purposes, the following criteria are used:
* - Number of replicas for each partition: 2
* - Number of racks available to the cluster: 4
*/
public static List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> mkListOfPartitionRacks(int numPartitions) {
List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> partitionRacks = new ArrayList<>(numPartitions);
for (int i = 0; i < numPartitions; i++) {
List<String> racks = new ArrayList<>(Arrays.asList("rack" + i % 4, "rack" + (i + 1) % 4));
partitionRacks.add(
new ConsumerGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(i)
.setRacks(racks)
);
}
return partitionRacks;
}
/**
* Creates a map of partitions to racks for testing.
*
* @param numPartitions The number of partitions for the topic.
*
* For testing purposes, the following criteria are used:
* - Number of replicas for each partition: 2
* - Number of racks available to the cluster: 4
*/
public static Map<Integer, Set<String>> mkMapOfPartitionRacks(int numPartitions) {
Map<Integer, Set<String>> partitionRacks = new HashMap<>(numPartitions);
for (int i = 0; i < numPartitions; i++) {
partitionRacks.put(i, new HashSet<>(Arrays.asList("rack" + i % 4, "rack" + (i + 1) % 4)));
}
return partitionRacks;
}
/**
* Asserts whether the two provided lists of records are equal.
*
* @param expectedRecords The expected list of records.
* @param actualRecords The actual list of records.
*/
public static void assertRecordsEquals(
List<Record> expectedRecords,
List<Record> actualRecords
) {
try {
assertEquals(expectedRecords.size(), actualRecords.size());
for (int i = 0; i < expectedRecords.size(); i++) {
Record expectedRecord = expectedRecords.get(i);
Record actualRecord = actualRecords.get(i);
assertRecordEquals(expectedRecord, actualRecord);
}
} catch (AssertionFailedError e) {
assertionFailure()
.expected(expectedRecords)
.actual(actualRecords)
.buildAndThrow();
}
}
/**
* Asserts if the two provided records are equal.
*
* @param expectedRecord The expected record.
* @param actualRecord The actual record.
*/
public static void assertRecordEquals(
Record expectedRecord,
Record actualRecord
) {
try {
assertApiMessageAndVersionEquals(expectedRecord.key(), actualRecord.key());
assertApiMessageAndVersionEquals(expectedRecord.value(), actualRecord.value());
} catch (AssertionFailedError e) {
assertionFailure()
.expected(expectedRecord)
.actual(actualRecord)
.buildAndThrow();
}
}
private static void assertApiMessageAndVersionEquals(
ApiMessageAndVersion expected,
ApiMessageAndVersion actual
) {
if (expected == actual) return;
assertEquals(expected.version(), actual.version());
if (actual.message() instanceof ConsumerGroupCurrentMemberAssignmentValue) {
// The order of the topics stored in ConsumerGroupCurrentMemberAssignmentValue is not
// always guaranteed. Therefore, we need a special comparator.
ConsumerGroupCurrentMemberAssignmentValue expectedValue =
(ConsumerGroupCurrentMemberAssignmentValue) expected.message();
ConsumerGroupCurrentMemberAssignmentValue actualValue =
(ConsumerGroupCurrentMemberAssignmentValue) actual.message();
assertEquals(expectedValue.memberEpoch(), actualValue.memberEpoch());
assertEquals(expectedValue.previousMemberEpoch(), actualValue.previousMemberEpoch());
assertEquals(expectedValue.targetMemberEpoch(), actualValue.targetMemberEpoch());
assertEquals(expectedValue.error(), actualValue.error());
assertEquals(expectedValue.metadataVersion(), actualValue.metadataVersion());
assertEquals(expectedValue.metadataBytes(), actualValue.metadataBytes());
// We transform those to Maps before comparing them.
assertEquals(fromTopicPartitions(expectedValue.assignedPartitions()),
fromTopicPartitions(actualValue.assignedPartitions()));
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingRevocation()),
fromTopicPartitions(actualValue.partitionsPendingRevocation()));
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()),
fromTopicPartitions(actualValue.partitionsPendingAssignment()));
} else if (actual.message() instanceof ConsumerGroupPartitionMetadataValue) {
// The order of the racks stored in the PartitionMetadata of the ConsumerGroupPartitionMetadataValue
// is not always guaranteed. Therefore, we need a special comparator.
ConsumerGroupPartitionMetadataValue expectedValue =
(ConsumerGroupPartitionMetadataValue) expected.message();
ConsumerGroupPartitionMetadataValue actualValue =
(ConsumerGroupPartitionMetadataValue) actual.message();
List<ConsumerGroupPartitionMetadataValue.TopicMetadata> expectedTopicMetadataList =
expectedValue.topics();
List<ConsumerGroupPartitionMetadataValue.TopicMetadata> actualTopicMetadataList =
actualValue.topics();
if (expectedTopicMetadataList.size() != actualTopicMetadataList.size()) {
fail("Topic metadata lists have different sizes");
}
for (int i = 0; i < expectedTopicMetadataList.size(); i++) {
ConsumerGroupPartitionMetadataValue.TopicMetadata expectedTopicMetadata =
expectedTopicMetadataList.get(i);
ConsumerGroupPartitionMetadataValue.TopicMetadata actualTopicMetadata =
actualTopicMetadataList.get(i);
assertEquals(expectedTopicMetadata.topicId(), actualTopicMetadata.topicId());
assertEquals(expectedTopicMetadata.topicName(), actualTopicMetadata.topicName());
assertEquals(expectedTopicMetadata.numPartitions(), actualTopicMetadata.numPartitions());
List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> expectedPartitionMetadataList =
expectedTopicMetadata.partitionMetadata();
List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> actualPartitionMetadataList =
actualTopicMetadata.partitionMetadata();
// If the list is empty, rack information wasn't available for any replica of
// the partition and hence, the entry wasn't added to the record.
if (expectedPartitionMetadataList.size() != actualPartitionMetadataList.size()) {
fail("Partition metadata lists have different sizes");
} else if (!expectedPartitionMetadataList.isEmpty() && !actualPartitionMetadataList.isEmpty()) {
for (int j = 0; j < expectedPartitionMetadataList.size(); j++) {
ConsumerGroupPartitionMetadataValue.PartitionMetadata expectedPartitionMetadata =
expectedPartitionMetadataList.get(j);
ConsumerGroupPartitionMetadataValue.PartitionMetadata actualPartitionMetadata =
actualPartitionMetadataList.get(j);
assertEquals(expectedPartitionMetadata.partition(), actualPartitionMetadata.partition());
assertUnorderedListEquals(expectedPartitionMetadata.racks(), actualPartitionMetadata.racks());
}
}
}
} else {
assertEquals(expected.message(), actual.message());
}
}
private static Map<Uuid, Set<Integer>> fromTopicPartitions(
List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> assignment
) {
Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
assignment.forEach(topicPartitions ->
assignmentMap.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions())));
return assignmentMap;
}
}

View File

@ -18,6 +18,8 @@ package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@ -37,15 +39,29 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class RangeAssignorTest {
private final RangeAssignor assignor = new RangeAssignor();
private final Uuid topic1Uuid = Uuid.randomUuid();
private final String topic1Name = "topic1";
private final Uuid topic2Uuid = Uuid.randomUuid();
private final String topic2Name = "topic2";
private final Uuid topic3Uuid = Uuid.randomUuid();
private final String topic3Name = "topic3";
private final String consumerA = "A";
private final String consumerB = "B";
private final String consumerC = "C";
@Test
public void testOneConsumerNoTopic() {
Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(
Collections.singletonMap(
topic1Uuid,
new TopicMetadata(
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
)
)
);
Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
consumerA,
new AssignmentMemberSpec(
@ -56,15 +72,26 @@ public class RangeAssignorTest {
)
);
AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
AssignmentSpec assignmentSpec = new AssignmentSpec(members);
GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
assertEquals(Collections.emptyMap(), groupAssignment.members());
}
@Test
public void testOneConsumerSubscribedToNonExistentTopic() {
Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(
Collections.singletonMap(
topic1Uuid,
new TopicMetadata(
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
)
)
);
Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
consumerA,
new AssignmentMemberSpec(
@ -75,17 +102,29 @@ public class RangeAssignorTest {
)
);
AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
AssignmentSpec assignmentSpec = new AssignmentSpec(members);
assertThrows(PartitionAssignorException.class,
() -> assignor.assign(assignmentSpec));
() -> assignor.assign(assignmentSpec, subscribedTopicMetadata));
}
@Test
public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
createPartitionRacks(2)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
@ -103,8 +142,8 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
AssignmentSpec assignmentSpec = new AssignmentSpec(members);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@ -123,10 +162,27 @@ public class RangeAssignorTest {
@Test
public void testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() {
Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
topics.put(topic2Uuid, new AssignmentTopicMetadata(3));
topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
createPartitionRacks(3)
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
createPartitionRacks(2)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
@ -151,8 +207,8 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
AssignmentSpec assignmentSpec = new AssignmentSpec(members);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@ -175,9 +231,21 @@ public class RangeAssignorTest {
@Test
public void testFirstAssignmentNumConsumersGreaterThanNumPartitions() {
Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
createPartitionRacks(2)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
@ -202,8 +270,8 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
AssignmentSpec assignmentSpec = new AssignmentSpec(members);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
// Topic 3 has two partitions but three consumers subscribed to it, so one of them will not get a partition.
@ -226,9 +294,21 @@ public class RangeAssignorTest {
@Test
public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerAdded() {
Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
topics.put(topic1Uuid, new AssignmentTopicMetadata(2));
topics.put(topic2Uuid, new AssignmentTopicMetadata(2));
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
2,
createPartitionRacks(2)
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
2,
createPartitionRacks(2)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
@ -264,8 +344,8 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
AssignmentSpec assignmentSpec = new AssignmentSpec(members);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@ -287,9 +367,21 @@ public class RangeAssignorTest {
@Test
public void testReassignmentWhenOnePartitionAddedForTwoConsumersTwoTopics() {
// Simulate adding a partition to each topic: originally T1 -> 3 partitions and T2 -> 3 partitions.
Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
topics.put(topic1Uuid, new AssignmentTopicMetadata(4));
topics.put(topic2Uuid, new AssignmentTopicMetadata(4));
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
4,
createPartitionRacks(4)
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
4,
createPartitionRacks(4)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
@ -317,8 +409,8 @@ public class RangeAssignorTest {
currentAssignmentForB
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
AssignmentSpec assignmentSpec = new AssignmentSpec(members);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@ -337,9 +429,21 @@ public class RangeAssignorTest {
@Test
public void testReassignmentWhenOneConsumerAddedAfterInitialAssignmentWithTwoConsumersTwoTopics() {
Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
topics.put(topic2Uuid, new AssignmentTopicMetadata(3));
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
createPartitionRacks(3)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
@ -375,8 +479,8 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
AssignmentSpec assignmentSpec = new AssignmentSpec(members);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@ -400,10 +504,22 @@ public class RangeAssignorTest {
@Test
public void testReassignmentWhenOneConsumerAddedAndOnePartitionAfterInitialAssignmentWithTwoConsumersTwoTopics() {
Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
// Add a new partition to topic 1; initially T1 -> 3 partitions.
topics.put(topic1Uuid, new AssignmentTopicMetadata(4));
topics.put(topic2Uuid, new AssignmentTopicMetadata(3));
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
4,
createPartitionRacks(4)
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
createPartitionRacks(3)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
@ -439,8 +555,8 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
AssignmentSpec assignmentSpec = new AssignmentSpec(members);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@ -463,9 +579,21 @@ public class RangeAssignorTest {
@Test
public void testReassignmentWhenOneConsumerRemovedAfterInitialAssignmentWithTwoConsumersTwoTopics() {
Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
topics.put(topic2Uuid, new AssignmentTopicMetadata(3));
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
createPartitionRacks(3)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
// Consumer A was removed
@ -482,8 +610,8 @@ public class RangeAssignorTest {
currentAssignmentForB
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
AssignmentSpec assignmentSpec = new AssignmentSpec(members);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@ -497,10 +625,27 @@ public class RangeAssignorTest {
@Test
public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignmentWithThreeConsumersTwoTopics() {
Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
topics.put(topic2Uuid, new AssignmentTopicMetadata(3));
topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
createPartitionRacks(3)
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
createPartitionRacks(3)
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
createPartitionRacks(2)
));
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
Map<String, AssignmentMemberSpec> members = new TreeMap<>();
@ -542,8 +687,8 @@ public class RangeAssignorTest {
currentAssignmentForC
));
AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
AssignmentSpec assignmentSpec = new AssignmentSpec(members);
GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
@ -571,4 +716,14 @@ public class RangeAssignorTest {
assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember);
}
}
// When rack awareness is enabled for this assignor, rack information can be updated in this method.
private static Map<Integer, Set<String>> createPartitionRacks(int numPartitions) {
Map<Integer, Set<String>> partitionRacks = new HashMap<>(numPartitions);
Set<String> emptySet = Collections.emptySet();
for (int i = 0; i < numPartitions; i++) {
partitionRacks.put(i, emptySet);
}
return partitionRacks;
}
}
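As the comment on createPartitionRacks notes, these tests currently pin every partition to an empty rack set. A minimal sketch of a rack-aware variant is shown below, with placeholder rack names chosen purely for illustration (it assumes java.util.HashSet is imported; the real tests may populate racks differently):

    // Illustration only: assign two placeholder racks per partition, round-robin
    // over a fixed set of three hypothetical rack names ("rack0".."rack2").
    private static Map<Integer, Set<String>> createPartitionRacksWithPlaceholders(int numPartitions) {
        Map<Integer, Set<String>> partitionRacks = new HashMap<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            Set<String> racks = new HashSet<>();
            racks.add("rack" + (i % 3));
            racks.add("rack" + ((i + 1) % 3));
            partitionRacks.put(i, racks);
        }
        return partitionRacks;
    }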

View File

@ -34,6 +34,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -398,16 +399,17 @@ public class ConsumerGroupTest {
.addTopic(fooTopicId, "foo", 1)
.addTopic(barTopicId, "bar", 2)
.addTopic(zarTopicId, "zar", 3)
.addRacks()
.build();
ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member1")
.setSubscribedTopicNames(Arrays.asList("foo"))
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build();
ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member2")
.setSubscribedTopicNames(Arrays.asList("bar"))
.setSubscribedTopicNames(Collections.singletonList("bar"))
.build();
ConsumerGroupMember member3 = new ConsumerGroupMember.Builder("member3")
.setSubscribedTopicNames(Arrays.asList("zar"))
.setSubscribedTopicNames(Collections.singletonList("zar"))
.build();
ConsumerGroup consumerGroup = createConsumerGroup("group-foo");
@ -418,19 +420,21 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
null,
image.topics()
image.topics(),
image.cluster()
)
);
// Compute while taking into account member 1.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
),
consumerGroup.computeSubscriptionMetadata(
null,
member1,
image.topics()
image.topics(),
image.cluster()
)
);
@ -440,12 +444,13 @@ public class ConsumerGroupTest {
// It should return foo now.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
),
consumerGroup.computeSubscriptionMetadata(
null,
null,
image.topics()
image.topics(),
image.cluster()
)
);
@ -455,20 +460,22 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
member1,
null,
image.topics()
image.topics(),
image.cluster()
)
);
// Compute while taking into account member 2.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
),
consumerGroup.computeSubscriptionMetadata(
null,
member2,
image.topics()
image.topics(),
image.cluster()
)
);
@ -478,51 +485,55 @@ public class ConsumerGroupTest {
// It should return foo and bar.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
),
consumerGroup.computeSubscriptionMetadata(
null,
null,
image.topics()
image.topics(),
image.cluster()
)
);
// Compute while taking into account removal of member 2.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
),
consumerGroup.computeSubscriptionMetadata(
member2,
null,
image.topics()
image.topics(),
image.cluster()
)
);
// Removing member1 results in returning bar.
assertEquals(
mkMap(
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
),
consumerGroup.computeSubscriptionMetadata(
member1,
null,
image.topics()
image.topics(),
image.cluster()
)
);
// Compute while taking into account member 3.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3)))
),
consumerGroup.computeSubscriptionMetadata(
null,
member3,
image.topics()
image.topics(),
image.cluster()
)
);
@ -532,14 +543,15 @@ public class ConsumerGroupTest {
// It should return foo, bar and zar.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3)))
),
consumerGroup.computeSubscriptionMetadata(
null,
null,
image.topics()
image.topics(),
image.cluster()
)
);
}
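The extra image.cluster() argument above is what lets computeSubscriptionMetadata attach rack information to each subscribed topic. A rough sketch of the kind of lookup this enables follows, assuming the standard metadata-image accessors (TopicImage.partitions(), PartitionRegistration.replicas, ClusterImage.broker(), BrokerRegistration.rack()); this is not the coordinator's exact code:

    // Sketch: derive the set of racks hosting each partition of a topic by
    // resolving every replica's broker registration in the cluster image.
    private static Map<Integer, Set<String>> racksForTopic(TopicImage topic, ClusterImage cluster) {
        Map<Integer, Set<String>> partitionRacks = new HashMap<>();
        topic.partitions().forEach((partitionId, partition) -> {
            Set<String> racks = new HashSet<>();
            for (int replicaId : partition.replicas) {
                BrokerRegistration broker = cluster.broker(replicaId);
                if (broker != null) {
                    broker.rack().ifPresent(racks::add);
                }
            }
            partitionRacks.put(partitionId, racks);
        });
        return partitionRacks;
    }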

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class SubscribedTopicMetadataTest {
@Test
public void testAttribute() {
Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>();
for (int i = 0; i < 5; i++) {
Uuid topicId = Uuid.randomUuid();
String topicName = "topic" + i;
Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(5);
topicMetadataMap.put(
topicId,
new TopicMetadata(topicId, topicName, 5, partitionRacks)
);
}
assertEquals(topicMetadataMap, new SubscribedTopicMetadata(topicMetadataMap).topicMetadata());
}
@Test
public void testTopicMetadataCannotBeNull() {
assertThrows(NullPointerException.class, () -> new SubscribedTopicMetadata(null));
}
@Test
public void testEquals() {
Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>();
for (int i = 0; i < 5; i++) {
Uuid topicId = Uuid.randomUuid();
String topicName = "topic" + i;
Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(5);
topicMetadataMap.put(
topicId,
new TopicMetadata(topicId, topicName, 5, partitionRacks)
);
}
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap);
assertEquals(new SubscribedTopicMetadata(topicMetadataMap), subscribedTopicMetadata);
Map<Uuid, TopicMetadata> topicMetadataMap2 = new HashMap<>();
Uuid topicId = Uuid.randomUuid();
topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 5, Collections.emptyMap()));
assertNotEquals(new SubscribedTopicMetadata(topicMetadataMap2), subscribedTopicMetadata);
}
}
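For orientation, here is a short usage sketch built only from the accessors exercised in this test (topicMetadata(), name(), numPartitions(), partitionRacks()), showing how a caller can walk the wrapped metadata:

    SubscribedTopicMetadata metadata = new SubscribedTopicMetadata(topicMetadataMap);
    metadata.topicMetadata().forEach((topicId, topic) -> {
        System.out.println(topic.name() + " has " + topic.numPartitions() + " partitions");
        topic.partitionRacks().forEach((partition, racks) ->
            System.out.println("  partition " + partition + " is hosted on racks " + racks)
        );
    });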

View File

@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
@ -38,6 +37,7 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
@ -84,13 +84,15 @@ public class TargetAssignmentBuilderTest {
public Uuid addTopicMetadata(
String topicName,
int numPartitions
int numPartitions,
Map<Integer, Set<String>> partitionRacks
) {
Uuid topicId = Uuid.randomUuid();
subscriptionMetadata.put(topicName, new TopicMetadata(
topicId,
topicName,
numPartitions
numPartitions,
partitionRacks
));
return topicId;
}
@ -145,13 +147,13 @@ public class TargetAssignmentBuilderTest {
Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
// All the existing members are prepared.
members.forEach((memberId, member) -> {
members.forEach((memberId, member) ->
memberSpecs.put(memberId, createAssignmentMemberSpec(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
subscriptionMetadata
));
});
)
));
// All the updated members are added and all the deleted members are removed.
@ -168,20 +170,21 @@ public class TargetAssignmentBuilderTest {
});
// Prepare the expected topic metadata.
Map<Uuid, AssignmentTopicMetadata> topicMetadata = new HashMap<>();
subscriptionMetadata.forEach((topicName, metadata) -> {
topicMetadata.put(metadata.id(), new AssignmentTopicMetadata(metadata.numPartitions()));
});
Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>();
subscriptionMetadata.forEach((topicName, topicMetadata) ->
topicMetadataMap.put(topicMetadata.id(), topicMetadata));
// Prepare the expected subscription topic metadata.
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap);
// Prepare the expected assignment spec.
AssignmentSpec assignmentSpec = new AssignmentSpec(
memberSpecs,
topicMetadata
);
AssignmentSpec assignmentSpec = new AssignmentSpec(memberSpecs);
// We use `any` here to always return an assignment but use `verify` later on
// to ensure that the input was correct.
when(assignor.assign(any())).thenReturn(new GroupAssignment(memberAssignments));
when(assignor.assign(any(), any()))
.thenReturn(new GroupAssignment(memberAssignments));
// Create and populate the assignment builder.
TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId, groupEpoch, assignor)
@ -203,7 +206,8 @@ public class TargetAssignmentBuilderTest {
// Verify that the assignor was called once with the expected
// assignment spec.
verify(assignor, times(1)).assign(assignmentSpec);
verify(assignor, times(1))
.assign(assignmentSpec, subscribedTopicMetadata);
return result;
}
@ -222,8 +226,8 @@ public class TargetAssignmentBuilderTest {
Map<String, TopicMetadata> subscriptionMetadata = new HashMap<String, TopicMetadata>() {
{
put("foo", new TopicMetadata(fooTopicId, "foo", 5));
put("bar", new TopicMetadata(barTopicId, "bar", 5));
put("foo", new TopicMetadata(fooTopicId, "foo", 5, Collections.emptyMap()));
put("bar", new TopicMetadata(barTopicId, "bar", 5, Collections.emptyMap()));
}
};
@ -268,8 +272,8 @@ public class TargetAssignmentBuilderTest {
20
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
Uuid barTopicId = context.addTopicMetadata("bar", 6);
Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
@ -318,8 +322,8 @@ public class TargetAssignmentBuilderTest {
20
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
Uuid barTopicId = context.addTopicMetadata("bar", 6);
Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
@ -381,8 +385,8 @@ public class TargetAssignmentBuilderTest {
20
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
Uuid barTopicId = context.addTopicMetadata("bar", 6);
Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
@ -459,8 +463,8 @@ public class TargetAssignmentBuilderTest {
20
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
Uuid barTopicId = context.addTopicMetadata("bar", 6);
Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
@ -546,8 +550,8 @@ public class TargetAssignmentBuilderTest {
20
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
Uuid barTopicId = context.addTopicMetadata("bar", 6);
Uuid fooTopicId = context.addTopicMetadata("foo", 6, mkMapOfPartitionRacks(6));
Uuid barTopicId = context.addTopicMetadata("bar", 6, mkMapOfPartitionRacks(6));
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
@ -624,8 +628,8 @@ public class TargetAssignmentBuilderTest {
20
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
Uuid barTopicId = context.addTopicMetadata("bar", 6);
Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
@ -687,7 +691,7 @@ public class TargetAssignmentBuilderTest {
assertEquals(expectedAssignment, result.targetAssignment());
}
public static <T> void assertUnorderedList(
private static <T> void assertUnorderedList(
List<T> expected,
List<T> actual
) {

View File

@ -20,6 +20,12 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkListOfPartitionRacks;
import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -28,24 +34,29 @@ public class TopicMetadataTest {
@Test
public void testAttributes() {
Uuid topicId = Uuid.randomUuid();
TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15);
Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(15);
TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15, partitionRacks);
assertEquals(topicId, topicMetadata.id());
assertEquals("foo", topicMetadata.name());
assertEquals(15, topicMetadata.numPartitions());
assertEquals(partitionRacks, topicMetadata.partitionRacks());
}
@Test
public void testTopicIdAndNameCannotBeNull() {
assertThrows(NullPointerException.class, () -> new TopicMetadata(Uuid.randomUuid(), null, 15));
assertThrows(NullPointerException.class, () -> new TopicMetadata(null, "foo", 15));
assertThrows(NullPointerException.class, () -> new TopicMetadata(Uuid.randomUuid(), null, 15, Collections.emptyMap()));
assertThrows(NullPointerException.class, () -> new TopicMetadata(null, "foo", 15, Collections.emptyMap()));
}
@Test
public void testEquals() {
Uuid topicId = Uuid.randomUuid();
TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15);
assertEquals(new TopicMetadata(topicId, "foo", 15), topicMetadata);
assertNotEquals(new TopicMetadata(topicId, "foo", 5), topicMetadata);
Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(15);
TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15, partitionRacks);
assertEquals(new TopicMetadata(topicId, "foo", 15, partitionRacks), topicMetadata);
assertNotEquals(new TopicMetadata(topicId, "foo", 5, mkMapOfPartitionRacks(5)), topicMetadata);
}
@Test
@ -56,10 +67,11 @@ public class TopicMetadataTest {
ConsumerGroupPartitionMetadataValue.TopicMetadata record = new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicId)
.setTopicName(topicName)
.setNumPartitions(15);
.setNumPartitions(15)
.setPartitionMetadata(mkListOfPartitionRacks(15));
assertEquals(
new TopicMetadata(topicId, topicName, 15),
new TopicMetadata(topicId, topicName, 15, mkMapOfPartitionRacks(15)),
TopicMetadata.fromRecord(record)
);
}
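The round trip asserted above implies that TopicMetadata.fromRecord rebuilds the partition-rack map from the record's partition metadata list. A sketch of that conversion, assuming the generated record exposes partitionMetadata(), partition(), and racks() getters matching the setters shown earlier:

    // Sketch of fromRecord's likely shape (accessor names on the generated record are assumed).
    Map<Integer, Set<String>> partitionRacks = new HashMap<>();
    record.partitionMetadata().forEach(partition ->
        partitionRacks.put(partition.partition(), new HashSet<>(partition.racks()))
    );
    TopicMetadata topicMetadata = new TopicMetadata(
        record.topicId(),
        record.topicName(),
        record.numPartitions(),
        partitionRacks
    );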