diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 1b490c55e10..09857750c48 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -792,7 +792,8 @@ public class GroupMetadataManager { subscriptionMetadata = group.computeSubscriptionMetadata( member, updatedMember, - metadataImage.topics() + metadataImage.topics(), + metadataImage.cluster() ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { @@ -942,7 +943,8 @@ public class GroupMetadataManager { Map subscriptionMetadata = group.computeSubscriptionMetadata( member, null, - metadataImage.topics() + metadataImage.topics(), + metadataImage.cluster() ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java index 13fcabe023d..c9bae675c2e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java @@ -131,13 +131,24 @@ public class RecordHelpers { Map newSubscriptionMetadata ) { ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue(); - newSubscriptionMetadata.forEach((topicName, topicMetadata) -> + newSubscriptionMetadata.forEach((topicName, topicMetadata) -> { + List partitionMetadata = new ArrayList<>(); + // If the partition rack information map is empty, store an empty list in the record. + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> + partitionMetadata.add(new ConsumerGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(new ArrayList<>(racks)) + ) + ); + } value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(topicMetadata.id()) .setTopicName(topicMetadata.name()) .setNumPartitions(topicMetadata.numPartitions()) - ) - ); + .setPartitionMetadata(partitionMetadata) + ); + }); return new Record( new ApiMessageAndVersion( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java index bf775439077..0518d9cbd2a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.coordinator.group.assignor; -import org.apache.kafka.common.Uuid; - import java.util.Map; import java.util.Objects; @@ -26,59 +24,39 @@ import java.util.Objects; */ public class AssignmentSpec { /** - * The members keyed by member id. + * The member metadata keyed by member Id. */ private final Map members; - /** - * The topics' metadata keyed by topic id. - */ - private final Map topics; - public AssignmentSpec( - Map members, - Map topics + Map members ) { Objects.requireNonNull(members); - Objects.requireNonNull(topics); this.members = members; - this.topics = topics; } /** - * @return Member metadata keyed by member Ids. + * @return Member metadata keyed by member Id. */ public Map members() { return members; } - /** - * @return Topic metadata keyed by topic Ids. - */ - public Map topics() { - return topics; - } - @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; AssignmentSpec that = (AssignmentSpec) o; - if (!members.equals(that.members)) return false; - return topics.equals(that.topics); + return members.equals(that.members); } @Override public int hashCode() { - int result = members.hashCode(); - result = 31 * result + topics.hashCode(); - return result; + return members.hashCode(); } @Override public String toString() { - return "AssignmentSpec(members=" + members + - ", topics=" + topics + - ')'; + return "AssignmentSpec(members=" + members + ')'; } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicMetadata.java deleted file mode 100644 index e5a96e79b2c..00000000000 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicMetadata.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.coordinator.group.assignor; - -/** - * Metadata of a topic. - */ -public class AssignmentTopicMetadata { - - /** - * The number of partitions. - */ - private final int numPartitions; - - public AssignmentTopicMetadata( - int numPartitions - ) { - this.numPartitions = numPartitions; - } - - /** - * @return The number of partitions present for the topic. - */ - public int numPartitions() { - return numPartitions; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - AssignmentTopicMetadata that = (AssignmentTopicMetadata) o; - return numPartitions == that.numPartitions; - } - - @Override - public int hashCode() { - return numPartitions; - } - - @Override - public String toString() { - return "AssignmentTopicMetadata(numPartitions=" + numPartitions + ')'; - } -} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java index 43410fc8aae..ddb5f9b2bc6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java @@ -26,18 +26,17 @@ import org.apache.kafka.common.annotation.InterfaceStability; */ @InterfaceStability.Unstable public interface PartitionAssignor { - /** * Unique name for this assignor. */ String name(); /** - * Perform the group assignment given the current members and - * topic metadata. + * Assigns partitions to group members based on the given assignment specification and topic metadata. * - * @param assignmentSpec The assignment spec. + * @param assignmentSpec The assignment spec which includes member metadata. + * @param subscribedTopicDescriber The topic and partition metadata describer. * @return The new assignment for the group. */ - GroupAssignment assign(AssignmentSpec assignmentSpec) throws PartitionAssignorException; + GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException; } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java index b5316ffbbdb..081211ca1c6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java @@ -33,8 +33,9 @@ import static java.lang.Math.min; * This Range Assignor inherits properties of both the range assignor and the sticky assignor. * The properties are as follows: *
    - *
  1. Each member must get at least one partition from every topic that it is subscribed to. The only exception is when - * the number of subscribed members is greater than the number of partitions for that topic. (Range)
  2. + *
  3. Each member must get at least one partition from every topic that it is subscribed to. + * The only exception is when the number of subscribed members is greater than the + * number of partitions for that topic. (Range)
  4. *
  5. Partitions should be assigned to members in a way that facilitates the join operation when required. (Range) * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. * Two streams are co-partitioned if the following conditions are met: @@ -76,25 +77,28 @@ public class RangeAssignor implements PartitionAssignor { } /** - * @return Map of topic ids to a list of members subscribed to them. + * Returns a map of topic Ids to a list of members subscribed to them, + * based on the given assignment specification and metadata. + * + * @param assignmentSpec The specification for member assignments. + * @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters. + * @return A map of topic Ids to a list of member Ids subscribed to them. + * + * @throws PartitionAssignorException If a member is subscribed to a non-existent topic. */ - private Map> membersPerTopic(final AssignmentSpec assignmentSpec) { + private Map> membersPerTopic(final AssignmentSpec assignmentSpec, final SubscribedTopicDescriber subscribedTopicDescriber) { Map> membersPerTopic = new HashMap<>(); Map membersData = assignmentSpec.members(); membersData.forEach((memberId, memberMetadata) -> { Collection topics = memberMetadata.subscribedTopicIds(); for (Uuid topicId : topics) { - // Only topics that are present in both the subscribed topics list and the topic metadata should be - // considered for assignment. - if (assignmentSpec.topics().containsKey(topicId)) { - membersPerTopic - .computeIfAbsent(topicId, k -> new ArrayList<>()) - .add(memberId); - } else { - throw new PartitionAssignorException("Member " + memberId + " subscribed to topic " + - topicId + " which doesn't exist in the topic metadata"); + if (subscribedTopicDescriber.numPartitions(topicId) == -1) { + throw new PartitionAssignorException("Member is subscribed to a non-existent topic"); } + membersPerTopic + .computeIfAbsent(topicId, k -> new ArrayList<>()) + .add(memberId); } }); @@ -118,14 +122,14 @@ public class RangeAssignor implements PartitionAssignor { *
*/ @Override - public GroupAssignment assign(final AssignmentSpec assignmentSpec) throws PartitionAssignorException { + public GroupAssignment assign(final AssignmentSpec assignmentSpec, final SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException { Map newAssignment = new HashMap<>(); // Step 1 - Map> membersPerTopic = membersPerTopic(assignmentSpec); + Map> membersPerTopic = membersPerTopic(assignmentSpec, subscribedTopicDescriber); membersPerTopic.forEach((topicId, membersForTopic) -> { - int numPartitionsForTopic = assignmentSpec.topics().get(topicId).numPartitions(); + int numPartitionsForTopic = subscribedTopicDescriber.numPartitions(topicId); int minRequiredQuota = numPartitionsForTopic / membersForTopic.size(); // Each member can get only ONE extra partition per topic after receiving the minimum quota. int numMembersWithExtraPartition = numPartitionsForTopic % membersForTopic.size(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscribedTopicDescriber.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscribedTopicDescriber.java new file mode 100644 index 00000000000..28586648cbf --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscribedTopicDescriber.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Set; + +/** + * The subscribed topic describer is used by the {@link PartitionAssignor} + * to obtain topic and partition metadata of the subscribed topics. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface SubscribedTopicDescriber { + /** + * The number of partitions for the given topic Id. + * + * @param topicId Uuid corresponding to the topic. + * @return The number of partitions corresponding to the given topic Id, + * or -1 if the topic Id does not exist. + */ + int numPartitions(Uuid topicId); + + /** + * Returns all the available racks associated with the replicas of the given partition. + * + * @param topicId Uuid corresponding to the partition's topic. + * @param partition Partition Id within topic. + * @return The set of racks corresponding to the replicas of the topic's partition. + * If the topic Id does not exist, an empty set is returned. + */ + Set racksForPartition(Uuid topicId, int partition); +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 533c590c36d..82f3f19659c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.image.ClusterImage; import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicsImage; import org.apache.kafka.timeline.SnapshotRegistry; @@ -30,6 +31,7 @@ import org.apache.kafka.timeline.TimelineObject; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -404,8 +406,8 @@ public class ConsumerGroup implements Group { } /** - * @return An immutable Map containing the subscription metadata for all the topics whose - * members are subscribed to. + * @return An immutable Map of subscription metadata for + * each topic that the consumer group is subscribed to. */ public Map subscriptionMetadata() { return Collections.unmodifiableMap(subscribedTopicMetadata); @@ -427,16 +429,18 @@ public class ConsumerGroup implements Group { * Computes the subscription metadata based on the current subscription and * an updated member. * - * @param oldMember The old member. - * @param newMember The new member. - * @param topicsImage The topic metadata. + * @param oldMember The old member of the consumer group. + * @param newMember The updated member of the consumer group. + * @param topicsImage The current metadata for all available topics. + * @param clusterImage The current metadata for the Kafka cluster. * - * @return The new subscription metadata as an immutable Map. + * @return An immutable map of subscription metadata for each topic that the consumer group is subscribed to. */ public Map computeSubscriptionMetadata( ConsumerGroupMember oldMember, ConsumerGroupMember newMember, - TopicsImage topicsImage + TopicsImage topicsImage, + ClusterImage clusterImage ) { // Copy and update the current subscriptions. Map subscribedTopicNames = new HashMap<>(this.subscribedTopicNames); @@ -444,14 +448,30 @@ public class ConsumerGroup implements Group { // Create the topic metadata for each subscribed topic. Map newSubscriptionMetadata = new HashMap<>(subscribedTopicNames.size()); + subscribedTopicNames.forEach((topicName, count) -> { TopicImage topicImage = topicsImage.getTopic(topicName); if (topicImage != null) { + Map> partitionRacks = new HashMap<>(); + topicImage.partitions().forEach((partition, partitionRegistration) -> { + Set racks = new HashSet<>(); + for (int replica : partitionRegistration.replicas) { + Optional rackOptional = clusterImage.broker(replica).rack(); + // Only add the rack if it is available for the broker/replica. + rackOptional.ifPresent(racks::add); + } + // If rack information is unavailable for all replicas of this partition, + // no corresponding entry will be stored for it in the map. + if (!racks.isEmpty()) + partitionRacks.put(partition, racks); + }); + newSubscriptionMetadata.put(topicName, new TopicMetadata( topicImage.id(), topicImage.name(), - topicImage.partitions().size() - )); + topicImage.partitions().size(), + partitionRacks) + ); } }); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java new file mode 100644 index 00000000000..c18dd24d316 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * The subscribed topic metadata class is used by the {@link PartitionAssignor} to obtain + * topic and partition metadata for the topics that the consumer group is subscribed to. + */ +public class SubscribedTopicMetadata implements SubscribedTopicDescriber { + /** + * The topic Ids mapped to their corresponding {@link TopicMetadata} + * object, which contains topic and partition metadata. + */ + private final Map topicMetadata; + + public SubscribedTopicMetadata(Map topicMetadata) { + this.topicMetadata = Objects.requireNonNull(topicMetadata); + } + + /** + * Map of topic Ids to topic metadata. + * + * @return The map of topic Ids to topic metadata. + */ + public Map topicMetadata() { + return this.topicMetadata; + } + + /** + * The number of partitions for the given topic Id. + * + * @param topicId Uuid corresponding to the topic. + * @return The number of partitions corresponding to the given topic Id, + * or -1 if the topic Id does not exist. + */ + @Override + public int numPartitions(Uuid topicId) { + TopicMetadata topic = this.topicMetadata.get(topicId); + return topic == null ? -1 : topic.numPartitions(); + } + + /** + * Returns all the available racks associated with the replicas of the given partition. + * + * @param topicId Uuid corresponding to the partition's topic. + * @param partition Partition Id within the topic. + * @return The set of racks corresponding to the replicas of the topics partition. + * If the topic Id does not exist, an empty set is returned + */ + @Override + public Set racksForPartition(Uuid topicId, int partition) { + TopicMetadata topic = this.topicMetadata.get(topicId); + return topic == null ? Collections.emptySet() : topic.partitionRacks().get(partition); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SubscribedTopicMetadata that = (SubscribedTopicMetadata) o; + return topicMetadata.equals(that.topicMetadata); + } + + @Override + public int hashCode() { + return topicMetadata.hashCode(); + } + + @Override + public String toString() { + return "SubscribedTopicMetadata(" + + "topicMetadata=" + topicMetadata + + ')'; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java index 02b120db1ef..2026efaa82c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.Record; import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; -import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; @@ -244,16 +243,19 @@ public class TargetAssignmentBuilder { }); // Prepare the topic metadata. - Map topics = new HashMap<>(); + Map topicMetadataMap = new HashMap<>(); subscriptionMetadata.forEach((topicName, topicMetadata) -> - topics.put(topicMetadata.id(), new AssignmentTopicMetadata(topicMetadata.numPartitions())) + topicMetadataMap.put( + topicMetadata.id(), + topicMetadata + ) ); // Compute the assignment. - GroupAssignment newGroupAssignment = assignor.assign(new AssignmentSpec( - Collections.unmodifiableMap(memberSpecs), - Collections.unmodifiableMap(topics) - )); + GroupAssignment newGroupAssignment = assignor.assign( + new AssignmentSpec(Collections.unmodifiableMap(memberSpecs)), + new SubscribedTopicMetadata(topicMetadataMap) + ); // Compute delta from previous to new target assignment and create the // relevant records. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java index 395d48d95df..f9b2d757671 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java @@ -19,7 +19,12 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Objects; +import java.util.Set; /** * Immutable topic metadata. @@ -40,10 +45,17 @@ public class TopicMetadata { */ private final int numPartitions; + /** + * Map of every partition Id to a set of its rack Ids, if they exist. + * If rack information is unavailable for all partitions, this is an empty map. + */ + private final Map> partitionRacks; + public TopicMetadata( Uuid id, String name, - int numPartitions + int numPartitions, + Map> partitionRacks ) { this.id = Objects.requireNonNull(id); if (Uuid.ZERO_UUID.equals(id)) { @@ -57,6 +69,7 @@ public class TopicMetadata { if (numPartitions < 0) { throw new IllegalArgumentException("Number of partitions cannot be negative."); } + this.partitionRacks = Objects.requireNonNull(partitionRacks); } /** @@ -80,6 +93,14 @@ public class TopicMetadata { return this.numPartitions; } + /** + * @return Every partition mapped to the set of corresponding available rack Ids of its replicas. + * An empty map is returned if rack information is unavailable for all partitions. + */ + public Map> partitionRacks() { + return this.partitionRacks; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -89,7 +110,8 @@ public class TopicMetadata { if (!id.equals(that.id)) return false; if (!name.equals(that.name)) return false; - return numPartitions == that.numPartitions; + if (numPartitions != that.numPartitions) return false; + return partitionRacks.equals(that.partitionRacks); } @Override @@ -97,6 +119,7 @@ public class TopicMetadata { int result = id.hashCode(); result = 31 * result + name.hashCode(); result = 31 * result + numPartitions; + result = 31 * result + partitionRacks.hashCode(); return result; } @@ -106,16 +129,26 @@ public class TopicMetadata { "id=" + id + ", name=" + name + ", numPartitions=" + numPartitions + + ", partitionRacks=" + partitionRacks + ')'; } public static TopicMetadata fromRecord( ConsumerGroupPartitionMetadataValue.TopicMetadata record ) { + // Converting the data type from a list stored in the record to a map for the topic metadata. + Map> partitionRacks = new HashMap<>(); + for (ConsumerGroupPartitionMetadataValue.PartitionMetadata partitionMetadata : record.partitionMetadata()) { + partitionRacks.put( + partitionMetadata.partition(), + Collections.unmodifiableSet(new HashSet<>(partitionMetadata.racks())) + ); + } + return new TopicMetadata( record.topicId(), record.topicName(), - record.numPartitions() - ); + record.numPartitions(), + partitionRacks); } } diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json index 81b5f5225e7..19de783b49b 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json @@ -29,7 +29,14 @@ { "name": "TopicName", "versions": "0+", "type": "string", "about": "The topic name." }, { "name": "NumPartitions", "versions": "0+", "type": "int32", - "about": "The number of partitions of the topic." } + "about": "The number of partitions of the topic." }, + { "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata", + "about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [ + { "name": "Partition", "versions": "0+", "type": "int32", + "about": "The partition number." }, + { "name": "Racks", "versions": "0+", "type": "[]string", + "about": "The set of racks that the partition is mapped to." } + ]} ]} ] } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 8f6ae07b37f..dd00d1f7ae8 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.network.ClientInformation; @@ -62,6 +63,7 @@ import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; import org.apache.kafka.coordinator.group.consumer.Assignment; import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; @@ -95,7 +97,6 @@ import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.opentest4j.AssertionFailedError; import java.net.InetAddress; import java.util.ArrayList; @@ -126,6 +127,9 @@ import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGr import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT; import static org.apache.kafka.coordinator.group.GroupMetadataManager.genericGroupHeartbeatKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.genericGroupSyncKey; +import static org.apache.kafka.coordinator.group.RecordHelpersTest.assertRecordEquals; +import static org.apache.kafka.coordinator.group.RecordHelpersTest.assertRecordsEquals; +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; @@ -163,7 +167,7 @@ public class GroupMetadataManagerTest { } @Override - public GroupAssignment assign(AssignmentSpec assignmentSpec) throws PartitionAssignorException { + public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException { return prepareGroupAssignment; } } @@ -176,11 +180,28 @@ public class GroupMetadataManagerTest { String topicName, int numPartitions ) { + // For testing purposes, the following criteria are used: + // - Number of replicas for each partition: 2 + // - Number of brokers available in the cluster: 4 delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName)); for (int i = 0; i < numPartitions; i++) { delta.replay(new PartitionRecord() .setTopicId(topicId) - .setPartitionId(i)); + .setPartitionId(i) + .setReplicas(Arrays.asList(i % 4, (i + 1) % 4))); + } + return this; + } + + /** + * Add rack Ids for 4 broker Ids. + * + * For testing purposes, each broker is mapped + * to a rack Id with the same broker Id as a suffix. + */ + public MetadataImageBuilder addRacks() { + for (int i = 0; i < 4; i++) { + delta.replay(new RegisterBrokerRecord().setBrokerId(i).setRack("rack" + i)); } return this; } @@ -242,8 +263,8 @@ public class GroupMetadataManagerTest { subscriptionMetadata.put(topicName, new TopicMetadata( topicImage.id(), topicImage.name(), - topicImage.partitions().size() - )); + topicImage.partitions().size(), + Collections.emptyMap())); } }); }); @@ -1462,6 +1483,7 @@ public class GroupMetadataManagerTest { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) + .addRacks() .build()) .build(); @@ -1519,8 +1541,8 @@ public class GroupMetadataManagerTest { List expectedRecords = Arrays.asList( RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() {{ - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); }}), RecordHelpers.newGroupEpochRecord(groupId, 1), RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment( @@ -1551,6 +1573,7 @@ public class GroupMetadataManagerTest { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) + .addRacks() .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -1559,7 +1582,7 @@ public class GroupMetadataManagerTest { .setTargetMemberEpoch(10) .setClientId("client") .setClientHost("localhost/127.0.0.1") - .setSubscribedTopicNames(Arrays.asList("foo")) + .setSubscribedTopicNames(Collections.singletonList("foo")) .setServerAssignorName("range") .setAssignedPartitions(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) @@ -1617,8 +1640,8 @@ public class GroupMetadataManagerTest { RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember), RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); } }), RecordHelpers.newGroupEpochRecord(groupId, 11), @@ -1652,6 +1675,7 @@ public class GroupMetadataManagerTest { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) + .addRacks() .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) @@ -1731,7 +1755,7 @@ public class GroupMetadataManagerTest { .setPartitions(Arrays.asList(4, 5)), new ConsumerGroupHeartbeatResponseData.TopicPartitions() .setTopicId(barTopicId) - .setPartitions(Arrays.asList(2)) + .setPartitions(Collections.singletonList(2)) ))), result.response() ); @@ -1752,6 +1776,12 @@ public class GroupMetadataManagerTest { List expectedRecords = Arrays.asList( RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember3), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + } + }), RecordHelpers.newGroupEpochRecord(groupId, 11), RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1), @@ -1769,9 +1799,9 @@ public class GroupMetadataManagerTest { RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3) ); - assertRecordsEquals(expectedRecords.subList(0, 2), result.records().subList(0, 2)); - assertUnorderedListEquals(expectedRecords.subList(2, 5), result.records().subList(2, 5)); - assertRecordsEquals(expectedRecords.subList(5, 7), result.records().subList(5, 7)); + assertRecordsEquals(expectedRecords.subList(0, 3), result.records().subList(0, 3)); + assertUnorderedListEquals(expectedRecords.subList(3, 6), result.records().subList(3, 6)); + assertRecordsEquals(expectedRecords.subList(6, 8), result.records().subList(6, 8)); } @Test @@ -1797,6 +1827,7 @@ public class GroupMetadataManagerTest { .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) @@ -1857,8 +1888,8 @@ public class GroupMetadataManagerTest { // Subscription metadata is recomputed because zar is no longer there. RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); } }), RecordHelpers.newGroupEpochRecord(groupId, 11) @@ -1887,6 +1918,7 @@ public class GroupMetadataManagerTest { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) + .addRacks() .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) @@ -2332,6 +2364,7 @@ public class GroupMetadataManagerTest { .withAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) @@ -2692,13 +2725,14 @@ public class GroupMetadataManagerTest { PartitionAssignor assignor = mock(PartitionAssignor.class); when(assignor.name()).thenReturn("range"); - when(assignor.assign(any())).thenThrow(new PartitionAssignorException("Assignment failed.")); + when(assignor.assign(any(), any())).thenThrow(new PartitionAssignorException("Assignment failed.")); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) + .addRacks() .build()) .build(); @@ -2732,6 +2766,7 @@ public class GroupMetadataManagerTest { .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -2753,7 +2788,7 @@ public class GroupMetadataManagerTest { { // foo only has 3 partitions stored in the metadata but foo has // 6 partitions the metadata image. - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3)); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3, mkMapOfPartitionRacks(3))); } })) .build(); @@ -2807,7 +2842,7 @@ public class GroupMetadataManagerTest { List expectedRecords = Arrays.asList( RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); } }), RecordHelpers.newGroupEpochRecord(groupId, 11), @@ -2842,6 +2877,7 @@ public class GroupMetadataManagerTest { .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -2863,7 +2899,7 @@ public class GroupMetadataManagerTest { { // foo only has 3 partitions stored in the metadata but foo has // 6 partitions the metadata image. - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3)); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3, mkMapOfPartitionRacks(3))); } })) .build(); @@ -2935,7 +2971,7 @@ public class GroupMetadataManagerTest { List expectedRecords = Arrays.asList( RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); } }), RecordHelpers.newGroupEpochRecord(groupId, 11), @@ -3172,6 +3208,7 @@ public class GroupMetadataManagerTest { .withAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() .build()) .build(); @@ -3246,6 +3283,7 @@ public class GroupMetadataManagerTest { .withAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() .build()) .build(); @@ -3311,6 +3349,7 @@ public class GroupMetadataManagerTest { .withAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() .build()) .build(); @@ -3550,6 +3589,7 @@ public class GroupMetadataManagerTest { .withAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() .build()) .build(); @@ -5737,7 +5777,7 @@ public class GroupMetadataManagerTest { assertEquals(Errors.LEADER_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE); } - private void assertUnorderedListEquals( + public static void assertUnorderedListEquals( List expected, List actual ) { @@ -5797,86 +5837,6 @@ public class GroupMetadataManagerTest { return assignmentMap; } - private void assertRecordsEquals( - List expectedRecords, - List actualRecords - ) { - try { - assertEquals(expectedRecords.size(), actualRecords.size()); - - for (int i = 0; i < expectedRecords.size(); i++) { - Record expectedRecord = expectedRecords.get(i); - Record actualRecord = actualRecords.get(i); - assertRecordEquals(expectedRecord, actualRecord); - } - } catch (AssertionFailedError e) { - assertionFailure() - .expected(expectedRecords) - .actual(actualRecords) - .buildAndThrow(); - } - } - - private void assertRecordEquals( - Record expected, - Record actual - ) { - try { - assertApiMessageAndVersionEquals(expected.key(), actual.key()); - assertApiMessageAndVersionEquals(expected.value(), actual.value()); - } catch (AssertionFailedError e) { - assertionFailure() - .expected(expected) - .actual(actual) - .buildAndThrow(); - } - } - - private void assertApiMessageAndVersionEquals( - ApiMessageAndVersion expected, - ApiMessageAndVersion actual - ) { - if (expected == actual) return; - - assertEquals(expected.version(), actual.version()); - - if (actual.message() instanceof ConsumerGroupCurrentMemberAssignmentValue) { - // The order of the topics stored in ConsumerGroupCurrentMemberAssignmentValue is not - // always guaranteed. Therefore, we need a special comparator. - ConsumerGroupCurrentMemberAssignmentValue expectedValue = - (ConsumerGroupCurrentMemberAssignmentValue) expected.message(); - ConsumerGroupCurrentMemberAssignmentValue actualValue = - (ConsumerGroupCurrentMemberAssignmentValue) actual.message(); - - assertEquals(expectedValue.memberEpoch(), actualValue.memberEpoch()); - assertEquals(expectedValue.previousMemberEpoch(), actualValue.previousMemberEpoch()); - assertEquals(expectedValue.targetMemberEpoch(), actualValue.targetMemberEpoch()); - assertEquals(expectedValue.error(), actualValue.error()); - assertEquals(expectedValue.metadataVersion(), actualValue.metadataVersion()); - assertEquals(expectedValue.metadataBytes(), actualValue.metadataBytes()); - - // We transform those to Maps before comparing them. - assertEquals(fromTopicPartitions(expectedValue.assignedPartitions()), - fromTopicPartitions(actualValue.assignedPartitions())); - assertEquals(fromTopicPartitions(expectedValue.partitionsPendingRevocation()), - fromTopicPartitions(actualValue.partitionsPendingRevocation())); - assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()), - fromTopicPartitions(actualValue.partitionsPendingAssignment())); - } else { - assertEquals(expected.message(), actual.message()); - } - } - - private Map> fromTopicPartitions( - List assignment - ) { - Map> assignmentMap = new HashMap<>(); - assignment.forEach(topicPartitions -> { - assignmentMap.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions())); - }); - return assignmentMap; - } - private List verifyGenericGroupJoinResponses( List> responseFutures, int expectedSuccessCount, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java index bdea8b79529..03b11cbed6f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java @@ -52,6 +52,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; +import org.opentest4j.AssertionFailedError; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -59,6 +60,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; + import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -71,6 +74,7 @@ import java.util.stream.Stream; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedTopicAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.GroupMetadataManagerTest.assertUnorderedListEquals; import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord; import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord; import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord; @@ -83,8 +87,10 @@ import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignme import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochTombstoneRecord; import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; public class RecordHelpersTest { @@ -161,15 +167,18 @@ public class RecordHelpersTest { Uuid fooTopicId = Uuid.randomUuid(); Uuid barTopicId = Uuid.randomUuid(); Map subscriptionMetadata = new LinkedHashMap<>(); + subscriptionMetadata.put("foo", new TopicMetadata( fooTopicId, "foo", - 10 + 10, + mkMapOfPartitionRacks(10) )); subscriptionMetadata.put("bar", new TopicMetadata( barTopicId, "bar", - 20 + 20, + mkMapOfPartitionRacks(20) )); Record expectedRecord = new Record( @@ -184,14 +193,16 @@ public class RecordHelpersTest { new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(fooTopicId) .setTopicName("foo") - .setNumPartitions(10), + .setNumPartitions(10) + .setPartitionMetadata(mkListOfPartitionRacks(10)), new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(barTopicId) .setTopicName("bar") - .setNumPartitions(20))), + .setNumPartitions(20) + .setPartitionMetadata(mkListOfPartitionRacks(20)))), (short) 0)); - assertEquals(expectedRecord, newGroupSubscriptionMetadataRecord( + assertRecordEquals(expectedRecord, newGroupSubscriptionMetadataRecord( "group-id", subscriptionMetadata )); @@ -212,6 +223,52 @@ public class RecordHelpersTest { )); } + @Test + public void testEmptyPartitionMetadataWhenRacksUnavailableGroupSubscriptionMetadataRecord() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + Map subscriptionMetadata = new LinkedHashMap<>(); + + subscriptionMetadata.put("foo", new TopicMetadata( + fooTopicId, + "foo", + 10, + Collections.emptyMap() + )); + subscriptionMetadata.put("bar", new TopicMetadata( + barTopicId, + "bar", + 20, + Collections.emptyMap() + )); + + Record expectedRecord = new Record( + new ApiMessageAndVersion( + new ConsumerGroupPartitionMetadataKey() + .setGroupId("group-id"), + (short) 4 + ), + new ApiMessageAndVersion( + new ConsumerGroupPartitionMetadataValue() + .setTopics(Arrays.asList( + new ConsumerGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(fooTopicId) + .setTopicName("foo") + .setNumPartitions(10) + .setPartitionMetadata(Collections.emptyList()), + new ConsumerGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(barTopicId) + .setTopicName("bar") + .setNumPartitions(20) + .setPartitionMetadata(Collections.emptyList()))), + (short) 0)); + + assertRecordEquals(expectedRecord, newGroupSubscriptionMetadataRecord( + "group-id", + subscriptionMetadata + )); + } + @Test public void testNewGroupEpochRecord() { Record expectedRecord = new Record( @@ -620,7 +677,7 @@ public class RecordHelpersTest { MetadataVersion.IBP_3_5_IV2 )); } - + @ParameterizedTest @MethodSource("metadataToExpectedGroupMetadataValue") public void testEmptyGroupMetadataRecord( @@ -764,4 +821,182 @@ public class RecordHelpersTest { Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1); assertEquals(expectedRecord, record); } + + /** + * Creates a list of values to be added to the record and assigns partitions to racks for testing. + * + * @param numPartitions The number of partitions for the topic. + * + * For testing purposes, the following criteria are used: + * - Number of replicas for each partition: 2 + * - Number of racks available to the cluster: 4 + */ + public static List mkListOfPartitionRacks(int numPartitions) { + List partitionRacks = new ArrayList<>(numPartitions); + for (int i = 0; i < numPartitions; i++) { + List racks = new ArrayList<>(Arrays.asList("rack" + i % 4, "rack" + (i + 1) % 4)); + partitionRacks.add( + new ConsumerGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(i) + .setRacks(racks) + ); + } + return partitionRacks; + } + + /** + * Creates a map of partitions to racks for testing. + * + * @param numPartitions The number of partitions for the topic. + * + * For testing purposes, the following criteria are used: + * - Number of replicas for each partition: 2 + * - Number of racks available to the cluster: 4 + */ + public static Map> mkMapOfPartitionRacks(int numPartitions) { + Map> partitionRacks = new HashMap<>(numPartitions); + for (int i = 0; i < numPartitions; i++) { + partitionRacks.put(i, new HashSet<>(Arrays.asList("rack" + i % 4, "rack" + (i + 1) % 4))); + } + return partitionRacks; + } + + /** + * Asserts whether the two provided lists of records are equal. + * + * @param expectedRecords The expected list of records. + * @param actualRecords The actual list of records. + */ + public static void assertRecordsEquals( + List expectedRecords, + List actualRecords + ) { + try { + assertEquals(expectedRecords.size(), actualRecords.size()); + + for (int i = 0; i < expectedRecords.size(); i++) { + Record expectedRecord = expectedRecords.get(i); + Record actualRecord = actualRecords.get(i); + assertRecordEquals(expectedRecord, actualRecord); + } + } catch (AssertionFailedError e) { + assertionFailure() + .expected(expectedRecords) + .actual(actualRecords) + .buildAndThrow(); + } + } + + /** + * Asserts if the two provided records are equal. + * + * @param expectedRecord The expected record. + * @param actualRecord The actual record. + */ + public static void assertRecordEquals( + Record expectedRecord, + Record actualRecord + ) { + try { + assertApiMessageAndVersionEquals(expectedRecord.key(), actualRecord.key()); + assertApiMessageAndVersionEquals(expectedRecord.value(), actualRecord.value()); + } catch (AssertionFailedError e) { + assertionFailure() + .expected(expectedRecord) + .actual(actualRecord) + .buildAndThrow(); + } + } + + private static void assertApiMessageAndVersionEquals( + ApiMessageAndVersion expected, + ApiMessageAndVersion actual + ) { + if (expected == actual) return; + + assertEquals(expected.version(), actual.version()); + + if (actual.message() instanceof ConsumerGroupCurrentMemberAssignmentValue) { + // The order of the topics stored in ConsumerGroupCurrentMemberAssignmentValue is not + // always guaranteed. Therefore, we need a special comparator. + ConsumerGroupCurrentMemberAssignmentValue expectedValue = + (ConsumerGroupCurrentMemberAssignmentValue) expected.message(); + ConsumerGroupCurrentMemberAssignmentValue actualValue = + (ConsumerGroupCurrentMemberAssignmentValue) actual.message(); + + assertEquals(expectedValue.memberEpoch(), actualValue.memberEpoch()); + assertEquals(expectedValue.previousMemberEpoch(), actualValue.previousMemberEpoch()); + assertEquals(expectedValue.targetMemberEpoch(), actualValue.targetMemberEpoch()); + assertEquals(expectedValue.error(), actualValue.error()); + assertEquals(expectedValue.metadataVersion(), actualValue.metadataVersion()); + assertEquals(expectedValue.metadataBytes(), actualValue.metadataBytes()); + + // We transform those to Maps before comparing them. + assertEquals(fromTopicPartitions(expectedValue.assignedPartitions()), + fromTopicPartitions(actualValue.assignedPartitions())); + assertEquals(fromTopicPartitions(expectedValue.partitionsPendingRevocation()), + fromTopicPartitions(actualValue.partitionsPendingRevocation())); + assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()), + fromTopicPartitions(actualValue.partitionsPendingAssignment())); + } else if (actual.message() instanceof ConsumerGroupPartitionMetadataValue) { + // The order of the racks stored in the PartitionMetadata of the ConsumerGroupPartitionMetadataValue + // is not always guaranteed. Therefore, we need a special comparator. + ConsumerGroupPartitionMetadataValue expectedValue = + (ConsumerGroupPartitionMetadataValue) expected.message(); + ConsumerGroupPartitionMetadataValue actualValue = + (ConsumerGroupPartitionMetadataValue) actual.message(); + + List expectedTopicMetadataList = + expectedValue.topics(); + List actualTopicMetadataList = + actualValue.topics(); + + if (expectedTopicMetadataList.size() != actualTopicMetadataList.size()) { + fail("Topic metadata lists have different sizes"); + } + + for (int i = 0; i < expectedTopicMetadataList.size(); i++) { + ConsumerGroupPartitionMetadataValue.TopicMetadata expectedTopicMetadata = + expectedTopicMetadataList.get(i); + ConsumerGroupPartitionMetadataValue.TopicMetadata actualTopicMetadata = + actualTopicMetadataList.get(i); + + assertEquals(expectedTopicMetadata.topicId(), actualTopicMetadata.topicId()); + assertEquals(expectedTopicMetadata.topicName(), actualTopicMetadata.topicName()); + assertEquals(expectedTopicMetadata.numPartitions(), actualTopicMetadata.numPartitions()); + + List expectedPartitionMetadataList = + expectedTopicMetadata.partitionMetadata(); + List actualPartitionMetadataList = + actualTopicMetadata.partitionMetadata(); + + // If the list is empty, rack information wasn't available for any replica of + // the partition and hence, the entry wasn't added to the record. + if (expectedPartitionMetadataList.size() != actualPartitionMetadataList.size()) { + fail("Partition metadata lists have different sizes"); + } else if (!expectedPartitionMetadataList.isEmpty() && !actualPartitionMetadataList.isEmpty()) { + for (int j = 0; j < expectedTopicMetadataList.size(); j++) { + ConsumerGroupPartitionMetadataValue.PartitionMetadata expectedPartitionMetadata = + expectedPartitionMetadataList.get(j); + ConsumerGroupPartitionMetadataValue.PartitionMetadata actualPartitionMetadata = + actualPartitionMetadataList.get(j); + + assertEquals(expectedPartitionMetadata.partition(), actualPartitionMetadata.partition()); + assertUnorderedListEquals(expectedPartitionMetadata.racks(), actualPartitionMetadata.racks()); + } + } + } + } else { + assertEquals(expected.message(), actual.message()); + } + } + + private static Map> fromTopicPartitions( + List assignment + ) { + Map> assignmentMap = new HashMap<>(); + assignment.forEach(topicPartitions -> + assignmentMap.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions()))); + return assignmentMap; + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java index 91f6385f104..eb58c653b13 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java @@ -18,6 +18,8 @@ package org.apache.kafka.coordinator.group.assignor; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -37,15 +39,29 @@ import static org.junit.jupiter.api.Assertions.assertThrows; public class RangeAssignorTest { private final RangeAssignor assignor = new RangeAssignor(); private final Uuid topic1Uuid = Uuid.randomUuid(); + private final String topic1Name = "topic1"; private final Uuid topic2Uuid = Uuid.randomUuid(); + private final String topic2Name = "topic2"; private final Uuid topic3Uuid = Uuid.randomUuid(); + private final String topic3Name = "topic3"; private final String consumerA = "A"; private final String consumerB = "B"; private final String consumerC = "C"; @Test public void testOneConsumerNoTopic() { - Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata( + Collections.singletonMap( + topic1Uuid, + new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + ) + ) + ); + Map members = Collections.singletonMap( consumerA, new AssignmentMemberSpec( @@ -56,15 +72,26 @@ public class RangeAssignorTest { ) ); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); assertEquals(Collections.emptyMap(), groupAssignment.members()); } @Test public void testOneConsumerSubscribedToNonExistentTopic() { - Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata( + Collections.singletonMap( + topic1Uuid, + new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + ) + ) + ); + Map members = Collections.singletonMap( consumerA, new AssignmentMemberSpec( @@ -75,17 +102,29 @@ public class RangeAssignorTest { ) ); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); assertThrows(PartitionAssignorException.class, - () -> assignor.assign(assignmentSpec)); + () -> assignor.assign(assignmentSpec, subscribedTopicMetadata)); } @Test public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic3Uuid, new TopicMetadata( + topic3Uuid, + topic3Name, + 2, + createPartitionRacks(2) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -103,8 +142,8 @@ public class RangeAssignorTest { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -123,10 +162,27 @@ public class RangeAssignorTest { @Test public void testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() { - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic2Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic3Uuid, new TopicMetadata( + topic3Uuid, + topic3Name, + 2, + createPartitionRacks(2) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -151,8 +207,8 @@ public class RangeAssignorTest { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -175,9 +231,21 @@ public class RangeAssignorTest { @Test public void testFirstAssignmentNumConsumersGreaterThanNumPartitions() { - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic3Uuid, new TopicMetadata( + topic3Uuid, + topic3Name, + 2, + createPartitionRacks(2) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -202,8 +270,8 @@ public class RangeAssignorTest { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); // Topic 3 has 2 partitions but three consumers subscribed to it - one of them will not get a partition. @@ -226,9 +294,21 @@ public class RangeAssignorTest { @Test public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerAdded() { - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(2)); - topics.put(topic2Uuid, new AssignmentTopicMetadata(2)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 2, + createPartitionRacks(2) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 2, + createPartitionRacks(2) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -264,8 +344,8 @@ public class RangeAssignorTest { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -287,9 +367,21 @@ public class RangeAssignorTest { @Test public void testReassignmentWhenOnePartitionAddedForTwoConsumersTwoTopics() { // Simulating adding a partition - originally T1 -> 3 Partitions and T2 -> 3 Partitions - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(4)); - topics.put(topic2Uuid, new AssignmentTopicMetadata(4)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 4, + createPartitionRacks(4) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 4, + createPartitionRacks(4) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -317,8 +409,8 @@ public class RangeAssignorTest { currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -337,9 +429,21 @@ public class RangeAssignorTest { @Test public void testReassignmentWhenOneConsumerAddedAfterInitialAssignmentWithTwoConsumersTwoTopics() { - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic2Uuid, new AssignmentTopicMetadata(3)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + createPartitionRacks(3) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -375,8 +479,8 @@ public class RangeAssignorTest { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -400,10 +504,22 @@ public class RangeAssignorTest { @Test public void testReassignmentWhenOneConsumerAddedAndOnePartitionAfterInitialAssignmentWithTwoConsumersTwoTopics() { - Map topics = new HashMap<>(); // Add a new partition to topic 1, initially T1 -> 3 partitions - topics.put(topic1Uuid, new AssignmentTopicMetadata(4)); - topics.put(topic2Uuid, new AssignmentTopicMetadata(3)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 4, + createPartitionRacks(4) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + createPartitionRacks(3) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -439,8 +555,8 @@ public class RangeAssignorTest { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -463,9 +579,21 @@ public class RangeAssignorTest { @Test public void testReassignmentWhenOneConsumerRemovedAfterInitialAssignmentWithTwoConsumersTwoTopics() { - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic2Uuid, new AssignmentTopicMetadata(3)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + createPartitionRacks(3) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); // Consumer A was removed @@ -482,8 +610,8 @@ public class RangeAssignorTest { currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -497,10 +625,27 @@ public class RangeAssignorTest { @Test public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignmentWithThreeConsumersTwoTopics() { - Map topics = new HashMap<>(); - topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic2Uuid, new AssignmentTopicMetadata(3)); - topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + createPartitionRacks(3) + )); + topicMetadata.put(topic3Uuid, new TopicMetadata( + topic3Uuid, + topic3Name, + 2, + createPartitionRacks(2) + )); + + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); Map members = new TreeMap<>(); @@ -542,8 +687,8 @@ public class RangeAssignorTest { currentAssignmentForC )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); Map>> expectedAssignment = new HashMap<>(); @@ -571,4 +716,14 @@ public class RangeAssignorTest { assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember); } } + + // When rack awareness is enabled for this assignor, rack information can be updated in this method. + private static Map> createPartitionRacks(int numPartitions) { + Map> partitionRacks = new HashMap<>(numPartitions); + Set emptySet = Collections.emptySet(); + for (int i = 0; i < numPartitions; i++) { + partitionRacks.put(i, emptySet); + } + return partitionRacks; + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index 7b4923c4b17..a715d4187bb 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -34,6 +34,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -398,16 +399,17 @@ public class ConsumerGroupTest { .addTopic(fooTopicId, "foo", 1) .addTopic(barTopicId, "bar", 2) .addTopic(zarTopicId, "zar", 3) + .addRacks() .build(); ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member1") - .setSubscribedTopicNames(Arrays.asList("foo")) + .setSubscribedTopicNames(Collections.singletonList("foo")) .build(); ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member2") - .setSubscribedTopicNames(Arrays.asList("bar")) + .setSubscribedTopicNames(Collections.singletonList("bar")) .build(); ConsumerGroupMember member3 = new ConsumerGroupMember.Builder("member3") - .setSubscribedTopicNames(Arrays.asList("zar")) + .setSubscribedTopicNames(Collections.singletonList("zar")) .build(); ConsumerGroup consumerGroup = createConsumerGroup("group-foo"); @@ -418,19 +420,21 @@ public class ConsumerGroupTest { consumerGroup.computeSubscriptionMetadata( null, null, - image.topics() + image.topics(), + image.cluster() ) ); // Compute while taking into account member 1. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) ), consumerGroup.computeSubscriptionMetadata( null, member1, - image.topics() + image.topics(), + image.cluster() ) ); @@ -440,12 +444,13 @@ public class ConsumerGroupTest { // It should return foo now. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) ), consumerGroup.computeSubscriptionMetadata( null, null, - image.topics() + image.topics(), + image.cluster() ) ); @@ -455,20 +460,22 @@ public class ConsumerGroupTest { consumerGroup.computeSubscriptionMetadata( member1, null, - image.topics() + image.topics(), + image.cluster() ) ); // Compute while taking into account member 2. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) ), consumerGroup.computeSubscriptionMetadata( null, member2, - image.topics() + image.topics(), + image.cluster() ) ); @@ -478,51 +485,55 @@ public class ConsumerGroupTest { // It should return foo and bar. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) ), consumerGroup.computeSubscriptionMetadata( null, null, - image.topics() + image.topics(), + image.cluster() ) ); // Compute while taking into account removal of member 2. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) ), consumerGroup.computeSubscriptionMetadata( member2, null, - image.topics() + image.topics(), + image.cluster() ) ); // Removing member1 results in returning bar. assertEquals( mkMap( - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)) + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) ), consumerGroup.computeSubscriptionMetadata( member1, null, - image.topics() + image.topics(), + image.cluster() ) ); // Compute while taking into account member 3. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)), - mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3)) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) ), consumerGroup.computeSubscriptionMetadata( null, member3, - image.topics() + image.topics(), + image.cluster() ) ); @@ -532,14 +543,15 @@ public class ConsumerGroupTest { // It should return foo, bar and zar. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)), - mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3)) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) ), consumerGroup.computeSubscriptionMetadata( null, null, - image.topics() + image.topics(), + image.cluster() ) ); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java new file mode 100644 index 00000000000..632b701f44a --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class SubscribedTopicMetadataTest { + + @Test + public void testAttribute() { + Map topicMetadataMap = new HashMap<>(); + for (int i = 0; i < 5; i++) { + Uuid topicId = Uuid.randomUuid(); + String topicName = "topic" + i; + Map> partitionRacks = mkMapOfPartitionRacks(5); + topicMetadataMap.put( + topicId, + new TopicMetadata(topicId, topicName, 5, partitionRacks) + ); + } + assertEquals(topicMetadataMap, new SubscribedTopicMetadata(topicMetadataMap).topicMetadata()); + } + + @Test + public void testTopicMetadataCannotBeNull() { + assertThrows(NullPointerException.class, () -> new SubscribedTopicMetadata(null)); + } + + @Test + public void testEquals() { + Map topicMetadataMap = new HashMap<>(); + for (int i = 0; i < 5; i++) { + Uuid topicId = Uuid.randomUuid(); + String topicName = "topic" + i; + Map> partitionRacks = mkMapOfPartitionRacks(5); + topicMetadataMap.put( + topicId, + new TopicMetadata(topicId, topicName, 5, partitionRacks) + ); + } + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap); + assertEquals(new SubscribedTopicMetadata(topicMetadataMap), subscribedTopicMetadata); + + Map topicMetadataMap2 = new HashMap<>(); + Uuid topicId = Uuid.randomUuid(); + topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 5, Collections.emptyMap())); + assertNotEquals(new SubscribedTopicMetadata(topicMetadataMap2), subscribedTopicMetadata); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java index 7733f67b128..810b7617a23 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; -import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; @@ -38,6 +37,7 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; @@ -84,13 +84,15 @@ public class TargetAssignmentBuilderTest { public Uuid addTopicMetadata( String topicName, - int numPartitions + int numPartitions, + Map> partitionRacks ) { Uuid topicId = Uuid.randomUuid(); subscriptionMetadata.put(topicName, new TopicMetadata( topicId, topicName, - numPartitions + numPartitions, + partitionRacks )); return topicId; } @@ -145,13 +147,13 @@ public class TargetAssignmentBuilderTest { Map memberSpecs = new HashMap<>(); // All the existing members are prepared. - members.forEach((memberId, member) -> { + members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec( member, targetAssignment.getOrDefault(memberId, Assignment.EMPTY), subscriptionMetadata - )); - }); + ) + )); // All the updated are added and all the deleted // members are removed. @@ -168,20 +170,21 @@ public class TargetAssignmentBuilderTest { }); // Prepare the expected topic metadata. - Map topicMetadata = new HashMap<>(); - subscriptionMetadata.forEach((topicName, metadata) -> { - topicMetadata.put(metadata.id(), new AssignmentTopicMetadata(metadata.numPartitions())); - }); + Map topicMetadataMap = new HashMap<>(); + subscriptionMetadata.forEach((topicName, topicMetadata) -> + topicMetadataMap.put(topicMetadata.id(), topicMetadata)); + + // Prepare the expected subscription topic metadata. + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap); // Prepare the expected assignment spec. - AssignmentSpec assignmentSpec = new AssignmentSpec( - memberSpecs, - topicMetadata - ); + AssignmentSpec assignmentSpec = new AssignmentSpec(memberSpecs); // We use `any` here to always return an assignment but use `verify` later on // to ensure that the input was correct. - when(assignor.assign(any())).thenReturn(new GroupAssignment(memberAssignments)); + when(assignor.assign(any(), any())) + .thenReturn(new GroupAssignment(memberAssignments)); + // Create and populate the assignment builder. TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId, groupEpoch, assignor) @@ -203,7 +206,8 @@ public class TargetAssignmentBuilderTest { // Verify that the assignor was called once with the expected // assignment spec. - verify(assignor, times(1)).assign(assignmentSpec); + verify(assignor, times(1)) + .assign(assignmentSpec, subscribedTopicMetadata); return result; } @@ -222,8 +226,8 @@ public class TargetAssignmentBuilderTest { Map subscriptionMetadata = new HashMap() { { - put("foo", new TopicMetadata(fooTopicId, "foo", 5)); - put("bar", new TopicMetadata(barTopicId, "bar", 5)); + put("foo", new TopicMetadata(fooTopicId, "foo", 5, Collections.emptyMap())); + put("bar", new TopicMetadata(barTopicId, "bar", 5, Collections.emptyMap())); } }; @@ -268,8 +272,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6); - Uuid barTopicId = context.addTopicMetadata("bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); + Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -318,8 +322,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6); - Uuid barTopicId = context.addTopicMetadata("bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); + Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -381,8 +385,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6); - Uuid barTopicId = context.addTopicMetadata("bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); + Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -459,8 +463,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6); - Uuid barTopicId = context.addTopicMetadata("bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); + Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -546,8 +550,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6); - Uuid barTopicId = context.addTopicMetadata("bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6, mkMapOfPartitionRacks(6)); + Uuid barTopicId = context.addTopicMetadata("bar", 6, mkMapOfPartitionRacks(6)); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), @@ -624,8 +628,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6); - Uuid barTopicId = context.addTopicMetadata("bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); + Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), @@ -687,7 +691,7 @@ public class TargetAssignmentBuilderTest { assertEquals(expectedAssignment, result.targetAssignment()); } - public static void assertUnorderedList( + private static void assertUnorderedList( List expected, List actual ) { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TopicMetadataTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TopicMetadataTest.java index 07d790ec8ff..4129baae30b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TopicMetadataTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TopicMetadataTest.java @@ -20,6 +20,12 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; import org.junit.jupiter.api.Test; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkListOfPartitionRacks; +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -28,24 +34,29 @@ public class TopicMetadataTest { @Test public void testAttributes() { Uuid topicId = Uuid.randomUuid(); - TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15); + Map> partitionRacks = mkMapOfPartitionRacks(15); + TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15, partitionRacks); + assertEquals(topicId, topicMetadata.id()); assertEquals("foo", topicMetadata.name()); assertEquals(15, topicMetadata.numPartitions()); + assertEquals(partitionRacks, topicMetadata.partitionRacks()); } @Test public void testTopicIdAndNameCannotBeNull() { - assertThrows(NullPointerException.class, () -> new TopicMetadata(Uuid.randomUuid(), null, 15)); - assertThrows(NullPointerException.class, () -> new TopicMetadata(null, "foo", 15)); + assertThrows(NullPointerException.class, () -> new TopicMetadata(Uuid.randomUuid(), null, 15, Collections.emptyMap())); + assertThrows(NullPointerException.class, () -> new TopicMetadata(null, "foo", 15, Collections.emptyMap())); } @Test public void testEquals() { Uuid topicId = Uuid.randomUuid(); - TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15); - assertEquals(new TopicMetadata(topicId, "foo", 15), topicMetadata); - assertNotEquals(new TopicMetadata(topicId, "foo", 5), topicMetadata); + Map> partitionRacks = mkMapOfPartitionRacks(15); + TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15, partitionRacks); + + assertEquals(new TopicMetadata(topicId, "foo", 15, partitionRacks), topicMetadata); + assertNotEquals(new TopicMetadata(topicId, "foo", 5, mkMapOfPartitionRacks(5)), topicMetadata); } @Test @@ -56,10 +67,11 @@ public class TopicMetadataTest { ConsumerGroupPartitionMetadataValue.TopicMetadata record = new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(topicId) .setTopicName(topicName) - .setNumPartitions(15); + .setNumPartitions(15) + .setPartitionMetadata(mkListOfPartitionRacks(15)); assertEquals( - new TopicMetadata(topicId, topicName, 15), + new TopicMetadata(topicId, topicName, 15, mkMapOfPartitionRacks(15)), TopicMetadata.fromRecord(record) ); }