diff --git a/build.gradle b/build.gradle index 676ddda7cab..39d9d4dfa18 100644 --- a/build.gradle +++ b/build.gradle @@ -1017,6 +1017,7 @@ project(':core') { api libs.scalaLibrary implementation project(':server-common') + implementation project(':group-coordinator:group-coordinator-api') implementation project(':group-coordinator') implementation project(':transaction-coordinator') implementation project(':metadata') @@ -1400,6 +1401,66 @@ project(':metadata') { } } +project(':group-coordinator:group-coordinator-api') { + base { + archivesName = "kafka-group-coordinator-api" + } + + dependencies { + implementation project(':clients') + } + + task createVersionFile() { + def receiptFile = file("$buildDir/kafka/$buildVersionFileName") + inputs.property "commitId", commitId + inputs.property "version", version + outputs.file receiptFile + + doLast { + def data = [ + commitId: commitId, + version: version, + ] + + receiptFile.parentFile.mkdirs() + def content = data.entrySet().collect { "$it.key=$it.value" }.sort().join("\n") + receiptFile.setText(content, "ISO-8859-1") + } + } + + sourceSets { + main { + java { + srcDirs = ["src/main/java"] + } + } + test { + java { + srcDirs = ["src/test/java"] + } + } + } + + jar { + dependsOn createVersionFile + from("$buildDir") { + include "kafka/$buildVersionFileName" + } + } + + clean.doFirst { + delete "$buildDir/kafka/" + } + + javadoc { + include "**/org/apache/kafka/coordinator/group/api/**" + } + + checkstyle { + configProperties = checkstyleConfigProperties("import-control-group-coordinator.xml") + } +} + project(':group-coordinator') { base { archivesName = "kafka-group-coordinator" @@ -1413,6 +1474,7 @@ project(':group-coordinator') { implementation project(':server-common') implementation project(':clients') implementation project(':metadata') + implementation project(':group-coordinator:group-coordinator-api') implementation project(':storage') implementation libs.jacksonDatabind implementation libs.jacksonJDK8Datatypes @@ -2957,6 +3019,7 @@ project(':jmh-benchmarks') { implementation project(':raft') implementation project(':clients') implementation project(':group-coordinator') + implementation project(':group-coordinator:group-coordinator-api') implementation project(':metadata') implementation project(':storage') implementation project(':streams') diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index d1f5bf43c0f..8f3933b6bfb 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor +import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupPartitionAssignor.java b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/ConsumerGroupPartitionAssignor.java similarity index 85% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupPartitionAssignor.java rename to group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/ConsumerGroupPartitionAssignor.java index da315b38d6c..f46860f33a5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupPartitionAssignor.java +++ b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/ConsumerGroupPartitionAssignor.java @@ -14,15 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.coordinator.group.assignor; +package org.apache.kafka.coordinator.group.api.assignor; import org.apache.kafka.common.annotation.InterfaceStability; /** * Server-side partition assignor for consumer groups used by the GroupCoordinator. * - * The interface is kept in an internal module until KIP-848 is fully - * implemented and ready to be released. + * The new consumer group protocol is in preview so this interface is considered + * unstable until Apache Kafka 4.0. */ @InterfaceStability.Unstable public interface ConsumerGroupPartitionAssignor extends PartitionAssignor { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupAssignment.java b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/GroupAssignment.java similarity index 89% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupAssignment.java rename to group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/GroupAssignment.java index f5cee5bf2e9..bf126334a17 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupAssignment.java +++ b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/GroupAssignment.java @@ -14,7 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.coordinator.group.assignor; +package org.apache.kafka.coordinator.group.api.assignor; + +import org.apache.kafka.common.annotation.InterfaceStability; import java.util.Map; import java.util.Objects; @@ -22,6 +24,7 @@ import java.util.Objects; /** * The partition assignment for a consumer group. */ +@InterfaceStability.Unstable public class GroupAssignment { /** * The member assignments keyed by member id. @@ -31,8 +34,7 @@ public class GroupAssignment { public GroupAssignment( Map members ) { - Objects.requireNonNull(members); - this.members = members; + this.members = Objects.requireNonNull(members); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/GroupSpec.java similarity index 78% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java rename to group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/GroupSpec.java index 6a0dc509c54..ec417099629 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java +++ b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/GroupSpec.java @@ -14,17 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.coordinator.group.assignor; +package org.apache.kafka.coordinator.group.api.assignor; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; import java.util.Collection; -import java.util.Map; -import java.util.Set; /** * The group metadata specifications required to compute the target assignment. */ +@InterfaceStability.Unstable public interface GroupSpec { /** * @return All the member Ids of the consumer group. @@ -45,18 +45,18 @@ public interface GroupSpec { /** * Gets the member subscription specification for a member. * - * @param memberId The member Id. + * @param memberId The member Id. * @return The member's subscription metadata. * @throws IllegalArgumentException If the member Id isn't found. */ - MemberSubscriptionSpec memberSubscription(String memberId); + MemberSubscription memberSubscription(String memberId); /** * Gets the current assignment of the member. * - * @param memberId The member Id. - * @return A map of topic Ids to sets of partition numbers. - * An empty map is returned if the member Id isn't found. + * @param memberId The member Id. + * @return The member's assignment or an empty assignment if the + * member does not have one. */ - Map> memberAssignment(String memberId); + MemberAssignment memberAssignment(String memberId); } diff --git a/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/MemberAssignment.java b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/MemberAssignment.java new file mode 100644 index 00000000000..52b5c564e7d --- /dev/null +++ b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/MemberAssignment.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.api.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Set; + +/** + * The partition assignment for a consumer group member. + */ +@InterfaceStability.Unstable +public interface MemberAssignment { + /** + * @return The assigned partitions keyed by topic Ids. + */ + Map> partitions(); +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpec.java b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/MemberSubscription.java similarity index 87% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpec.java rename to group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/MemberSubscription.java index 382183a13cf..4f3cf45eee4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpec.java +++ b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/MemberSubscription.java @@ -14,9 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.coordinator.group.assignor; +package org.apache.kafka.coordinator.group.api.assignor; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; import java.util.Optional; import java.util.Set; @@ -24,7 +25,8 @@ import java.util.Set; /** * Interface representing the subscription metadata for a group member. */ -public interface MemberSubscriptionSpec { +@InterfaceStability.Unstable +public interface MemberSubscription { /** * Gets the rack Id if present. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/PartitionAssignor.java similarity index 90% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java rename to group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/PartitionAssignor.java index f8b74bc218c..3d4f8efbaa6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java +++ b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/PartitionAssignor.java @@ -14,15 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.coordinator.group.assignor; +package org.apache.kafka.coordinator.group.api.assignor; import org.apache.kafka.common.annotation.InterfaceStability; /** * Server-side partition assignor used by the GroupCoordinator. * - * The interface is kept in an internal module until KIP-848 is fully - * implemented and ready to be released. + * The new consumer group protocol is in preview so this interface is considered + * unstable until Apache Kafka 4.0. */ @InterfaceStability.Unstable public interface PartitionAssignor { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/PartitionAssignorException.java similarity index 95% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java rename to group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/PartitionAssignorException.java index 482ad02f13a..deb2e09b402 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java +++ b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/PartitionAssignorException.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.coordinator.group.assignor; +package org.apache.kafka.coordinator.group.api.assignor; import org.apache.kafka.common.errors.ApiException; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscribedTopicDescriber.java b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscribedTopicDescriber.java similarity index 97% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscribedTopicDescriber.java rename to group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscribedTopicDescriber.java index 28586648cbf..ca1e5a514b8 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscribedTopicDescriber.java +++ b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscribedTopicDescriber.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.coordinator.group.assignor; +package org.apache.kafka.coordinator.group.api.assignor; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.annotation.InterfaceStability; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscriptionType.java b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscriptionType.java similarity index 90% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscriptionType.java rename to group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscriptionType.java index 130dfb6e3a7..cab35bbf3db 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscriptionType.java +++ b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscriptionType.java @@ -14,11 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.coordinator.group.assignor; +package org.apache.kafka.coordinator.group.api.assignor; + +import org.apache.kafka.common.annotation.InterfaceStability; /** * The subscription type followed by a consumer group. */ +@InterfaceStability.Unstable public enum SubscriptionType { /** * A homogeneous subscription type means that all the members diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 7939dfa630e..84219aa46ff 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -17,7 +17,7 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.apache.kafka.coordinator.group.assignor.UniformAssignor; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 34dcaf146b1..6463d78d9ce 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -58,10 +58,10 @@ import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; -import org.apache.kafka.coordinator.group.assignor.MemberAssignment; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; -import org.apache.kafka.coordinator.group.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; import org.apache.kafka.coordinator.group.consumer.Assignment; import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; @@ -1926,7 +1926,7 @@ public class GroupMetadataManager { MemberAssignment newMemberAssignment = assignmentResult.targetAssignment().get(updatedMember.memberId()); if (newMemberAssignment != null) { - return new Assignment(newMemberAssignment.targetPartitions()); + return new Assignment(newMemberAssignment.partitions()); } else { return Assignment.EMPTY; } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java index bd9b0b00779..2c171133087 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java @@ -17,6 +17,9 @@ package org.apache.kafka.coordinator.group.assignor; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; import org.apache.kafka.server.common.TopicIdPartition; import java.util.Collection; @@ -42,7 +45,7 @@ public abstract class AbstractUniformAssignmentBuilder { int partition ) { memberAssignments.get(memberId) - .targetPartitions() + .partitions() .computeIfAbsent(topicId, __ -> new HashSet<>()) .add(partition); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java index 884594a0576..46c4e363e56 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java @@ -17,6 +17,12 @@ package org.apache.kafka.coordinator.group.assignor; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.consumer.MemberAssignmentImpl; import org.apache.kafka.server.common.TopicIdPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,7 +130,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu } subscribedTopicIds.add(topicId); membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId); - targetAssignment.put(memberId, new MemberAssignment(new HashMap<>())); + targetAssignment.put(memberId, new MemberAssignmentImpl(new HashMap<>())); }) ); this.unassignedPartitions = new HashSet<>(topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber)); @@ -191,7 +197,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu */ private void assignStickyPartitions() { groupSpec.memberIds().forEach(memberId -> - groupSpec.memberAssignment(memberId).forEach((topicId, currentAssignment) -> { + groupSpec.memberAssignment(memberId).partitions().forEach((topicId, currentAssignment) -> { if (groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicId)) { currentAssignment.forEach(partition -> { TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition); @@ -244,7 +250,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu * @return true if the member can participate in reassignment, false otherwise. */ private boolean canMemberParticipateInReassignment(String memberId) { - Set assignedTopicIds = targetAssignment.get(memberId).targetPartitions().keySet(); + Set assignedTopicIds = targetAssignment.get(memberId).partitions().keySet(); int currentAssignmentSize = assignmentManager.targetAssignmentSize(memberId); int maxAssignmentSize = assignmentManager.maxAssignmentSize(memberId); @@ -292,7 +298,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu // Otherwise make sure it cannot get any more partitions. for (Uuid topicId : groupSpec.memberSubscription(member).subscribedTopicIds()) { - Set assignedPartitions = targetAssignment.get(member).targetPartitions().get(topicId); + Set assignedPartitions = targetAssignment.get(member).partitions().get(topicId); for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) { TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, i); if (assignedPartitions == null || !assignedPartitions.contains(i)) { @@ -784,7 +790,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu * @param memberId Member that the partition needs to be revoked from. */ private void removePartitionFromTargetAssignment(TopicIdPartition topicIdPartition, String memberId) { - Map> targetPartitionsMap = targetAssignment.get(memberId).targetPartitions(); + Map> targetPartitionsMap = targetAssignment.get(memberId).partitions(); Set partitionsSet = targetPartitionsMap.get(topicIdPartition.topicId()); // Remove the partition from the assignment, if there are no more partitions from a particular topic, // remove the topic from the assignment as well. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java index a97e0b98ba9..8631b3ce9ae 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java @@ -17,6 +17,12 @@ package org.apache.kafka.coordinator.group.assignor; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.consumer.MemberAssignmentImpl; import org.apache.kafka.server.common.TopicIdPartition; import java.util.ArrayList; @@ -160,7 +166,7 @@ public class OptimizedUniformAssignmentBuilder { */ private void maybeRevokePartitions() { for (String memberId : groupSpec.memberIds()) { - Map> oldAssignment = groupSpec.memberAssignment(memberId); + Map> oldAssignment = groupSpec.memberAssignment(memberId).partitions(); Map> newAssignment = null; // The assignor expects to receive the assignment as an immutable map. It leverages @@ -219,9 +225,9 @@ public class OptimizedUniformAssignmentBuilder { } if (newAssignment == null) { - targetAssignment.put(memberId, new MemberAssignment(oldAssignment)); + targetAssignment.put(memberId, new MemberAssignmentImpl(oldAssignment)); } else { - targetAssignment.put(memberId, new MemberAssignment(newAssignment)); + targetAssignment.put(memberId, new MemberAssignmentImpl(newAssignment)); } } } @@ -236,12 +242,12 @@ public class OptimizedUniformAssignmentBuilder { String memberId = unfilledMember.memberId; int remainingQuota = unfilledMember.remainingQuota; - Map> newAssignment = targetAssignment.get(memberId).targetPartitions(); + Map> newAssignment = targetAssignment.get(memberId).partitions(); if (isImmutableMap(newAssignment)) { // If the new assignment is immutable, we must create a deep copy of it // before altering it. newAssignment = deepCopy(newAssignment); - targetAssignment.put(memberId, new MemberAssignment(newAssignment)); + targetAssignment.put(memberId, new MemberAssignmentImpl(newAssignment)); } for (int i = 0; i < remainingQuota && unassignedPartitionIndex < unassignedPartitions.size(); i++) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java index d59bef034be..d4ef372263b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java @@ -17,6 +17,13 @@ package org.apache.kafka.coordinator.group.assignor; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.consumer.MemberAssignmentImpl; import java.util.ArrayList; import java.util.Collection; @@ -28,7 +35,7 @@ import java.util.Map; import java.util.Set; import static java.lang.Math.min; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; /** * This Range Assignor inherits properties of both the range assignor and the sticky assignor. @@ -162,7 +169,9 @@ public class RangeAssignor implements ConsumerGroupPartitionAssignor { List potentiallyUnfilledMembers = new ArrayList<>(); for (String memberId : membersForTopic) { - Set assignedPartitionsForTopic = groupSpec.memberAssignment(memberId) + Set assignedPartitionsForTopic = groupSpec + .memberAssignment(memberId) + .partitions() .getOrDefault(topicId, Collections.emptySet()); int currentAssignmentSize = assignedPartitionsForTopic.size(); @@ -177,8 +186,8 @@ public class RangeAssignor implements ConsumerGroupPartitionAssignor { for (int i = 0; i < retainedPartitionsCount; i++) { assignedStickyPartitionsForTopic .add(currentAssignmentListForTopic.get(i)); - newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignment(new HashMap<>())) - .targetPartitions() + newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignmentImpl(new HashMap<>())) + .partitions() .computeIfAbsent(topicId, k -> new HashSet<>()) .add(currentAssignmentListForTopic.get(i)); } @@ -198,8 +207,8 @@ public class RangeAssignor implements ConsumerGroupPartitionAssignor { // add the extra partition that will be present at the index right after min quota was satisfied. assignedStickyPartitionsForTopic .add(currentAssignmentListForTopic.get(minRequiredQuota)); - newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignment(new HashMap<>())) - .targetPartitions() + newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignmentImpl(new HashMap<>())) + .partitions() .computeIfAbsent(topicId, k -> new HashSet<>()) .add(currentAssignmentListForTopic.get(minRequiredQuota)); } else { @@ -233,8 +242,8 @@ public class RangeAssignor implements ConsumerGroupPartitionAssignor { List partitionsToAssign = unassignedPartitionsForTopic .subList(unassignedPartitionsListStartPointer, unassignedPartitionsListStartPointer + remaining); unassignedPartitionsListStartPointer += remaining; - newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignment(new HashMap<>())) - .targetPartitions() + newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignmentImpl(new HashMap<>())) + .partitions() .computeIfAbsent(topicId, k -> new HashSet<>()) .addAll(partitionsToAssign); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java index 177c511b548..8ae76313676 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java @@ -16,12 +16,17 @@ */ package org.apache.kafka.coordinator.group.assignor; +import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; /** * The Uniform Assignor distributes topic partitions among group members for a diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/Assignment.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/Assignment.java index f70601ec0cc..317bd3ed057 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/Assignment.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/Assignment.java @@ -17,6 +17,7 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; import java.util.Collections; @@ -29,7 +30,7 @@ import java.util.stream.Collectors; /** * An immutable assignment for a member. */ -public class Assignment { +public class Assignment implements MemberAssignment { public static final Assignment EMPTY = new Assignment(Collections.emptyMap()); /** @@ -46,6 +47,7 @@ public class Assignment { /** * @return The assigned partitions. */ + @Override public Map> partitions() { return partitions; } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 98f37a7ed6c..91aa26c5023 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.group.OffsetExpirationCondition; import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; import org.apache.kafka.coordinator.group.CoordinatorRecord; import org.apache.kafka.coordinator.group.CoordinatorRecordHelpers; -import org.apache.kafka.coordinator.group.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; import org.apache.kafka.coordinator.group.classic.ClassicGroup; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; @@ -56,8 +56,8 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.ASSIGNING; import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.EMPTY; import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.RECONCILING; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/GroupSpecImpl.java similarity index 75% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java rename to group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/GroupSpecImpl.java index 3f5b1d89550..39239de497a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/GroupSpecImpl.java @@ -14,15 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.coordinator.group.assignor; +package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.MemberSubscription; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Objects; -import java.util.Set; /** * The assignment specification for a consumer group. @@ -31,7 +34,7 @@ public class GroupSpecImpl implements GroupSpec { /** * Member subscription metadata keyed by member Id. */ - private final Map memberSubscriptions; + private final Map members; /** * The subscription type of the group. @@ -45,11 +48,11 @@ public class GroupSpecImpl implements GroupSpec { private final Map> invertedMemberAssignment; public GroupSpecImpl( - Map memberSubscriptions, + Map members, SubscriptionType subscriptionType, Map> invertedMemberAssignment ) { - this.memberSubscriptions = Objects.requireNonNull(memberSubscriptions); + this.members = Objects.requireNonNull(members); this.subscriptionType = Objects.requireNonNull(subscriptionType); this.invertedMemberAssignment = Objects.requireNonNull(invertedMemberAssignment); } @@ -59,7 +62,7 @@ public class GroupSpecImpl implements GroupSpec { */ @Override public Collection memberIds() { - return memberSubscriptions.keySet(); + return members.keySet(); } /** @@ -86,8 +89,8 @@ public class GroupSpecImpl implements GroupSpec { * {@inheritDoc} */ @Override - public MemberSubscriptionSpec memberSubscription(String memberId) { - MemberSubscriptionSpec memberSubscription = memberSubscriptions.get(memberId); + public MemberSubscription memberSubscription(String memberId) { + MemberSubscription memberSubscription = members.get(memberId); if (memberSubscription == null) { throw new IllegalArgumentException("Member Id " + memberId + " not found."); } @@ -98,12 +101,12 @@ public class GroupSpecImpl implements GroupSpec { * {@inheritDoc} */ @Override - public Map> memberAssignment(String memberId) { - MemberSubscriptionSpecImpl memberSubscription = memberSubscriptions.get(memberId); - if (memberSubscription == null) { - return Collections.emptyMap(); + public MemberAssignment memberAssignment(String memberId) { + MemberSubscriptionAndAssignmentImpl member = members.get(memberId); + if (member == null) { + return new MemberAssignmentImpl(Collections.emptyMap()); } - return memberSubscription.memberAssignment(); + return member; } @Override @@ -112,13 +115,13 @@ public class GroupSpecImpl implements GroupSpec { if (o == null || getClass() != o.getClass()) return false; GroupSpecImpl that = (GroupSpecImpl) o; return subscriptionType == that.subscriptionType && - memberSubscriptions.equals(that.memberSubscriptions) && + members.equals(that.members) && invertedMemberAssignment.equals(that.invertedMemberAssignment); } @Override public int hashCode() { - int result = memberSubscriptions.hashCode(); + int result = members.hashCode(); result = 31 * result + subscriptionType.hashCode(); result = 31 * result + invertedMemberAssignment.hashCode(); return result; @@ -126,7 +129,7 @@ public class GroupSpecImpl implements GroupSpec { @Override public String toString() { - return "GroupSpecImpl(memberSubscriptions=" + memberSubscriptions + + return "GroupSpecImpl(members=" + members + ", subscriptionType=" + subscriptionType + ", invertedMemberAssignment=" + invertedMemberAssignment + ')'; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberAssignment.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/MemberAssignmentImpl.java similarity index 61% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberAssignment.java rename to group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/MemberAssignmentImpl.java index ca19b3e883b..d94ac9a3890 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberAssignment.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/MemberAssignmentImpl.java @@ -14,9 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.coordinator.group.assignor; +package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; import java.util.Map; import java.util.Objects; @@ -25,39 +26,39 @@ import java.util.Set; /** * The partition assignment for a consumer group member. */ -public class MemberAssignment { +public class MemberAssignmentImpl implements MemberAssignment { /** - * The target partitions assigned to this member keyed by topicId. + * The partitions assigned to this member keyed by topicId. */ - private final Map> targetPartitions; + private final Map> partitions; - public MemberAssignment(Map> targetPartitions) { - Objects.requireNonNull(targetPartitions); - this.targetPartitions = targetPartitions; + public MemberAssignmentImpl(Map> partitions) { + this.partitions = Objects.requireNonNull(partitions); } /** - * @return Target partitions keyed by topic Ids. + * @return The assigned partitions keyed by topic Ids. */ - public Map> targetPartitions() { - return this.targetPartitions; + @Override + public Map> partitions() { + return this.partitions; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - MemberAssignment that = (MemberAssignment) o; - return targetPartitions.equals(that.targetPartitions); + MemberAssignmentImpl that = (MemberAssignmentImpl) o; + return partitions.equals(that.partitions); } @Override public int hashCode() { - return targetPartitions.hashCode(); + return partitions.hashCode(); } @Override public String toString() { - return "MemberAssignment(targetPartitions=" + targetPartitions + ')'; + return "MemberAssignment(partitions=" + partitions + ')'; } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpecImpl.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/MemberSubscriptionAndAssignmentImpl.java similarity index 77% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpecImpl.java rename to group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/MemberSubscriptionAndAssignmentImpl.java index 00d86c367e5..b59ae51f6c2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpecImpl.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/MemberSubscriptionAndAssignmentImpl.java @@ -14,10 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.coordinator.group.assignor; +package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; -import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.MemberSubscription; import java.util.Map; import java.util.Objects; @@ -25,21 +26,21 @@ import java.util.Optional; import java.util.Set; /** - * Implementation of the {@link MemberSubscriptionSpec} interface. + * Implementation of the {@link MemberSubscription} and the {@link MemberAssignment} interfaces. */ -public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec { +public class MemberSubscriptionAndAssignmentImpl implements MemberSubscription, MemberAssignment { private final Optional rackId; private final Set subscribedTopicIds; private final Assignment memberAssignment; /** - * Constructs a new {@code MemberSubscriptionSpecImpl}. + * Constructs a new {@code MemberSubscriptionAndAssignmentImpl}. * * @param rackId The rack Id. * @param subscribedTopicIds The set of subscribed topic Ids. * @param memberAssignment The current member assignment. */ - public MemberSubscriptionSpecImpl( + public MemberSubscriptionAndAssignmentImpl( Optional rackId, Set subscribedTopicIds, Assignment memberAssignment @@ -59,7 +60,8 @@ public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec { return subscribedTopicIds; } - public Map> memberAssignment() { + @Override + public Map> partitions() { return memberAssignment.partitions(); } @@ -67,7 +69,7 @@ public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec { public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - MemberSubscriptionSpecImpl that = (MemberSubscriptionSpecImpl) o; + MemberSubscriptionAndAssignmentImpl that = (MemberSubscriptionAndAssignmentImpl) o; return rackId.equals(that.rackId) && subscribedTopicIds.equals(that.subscribedTopicIds) && memberAssignment.equals(that.memberAssignment); @@ -83,7 +85,7 @@ public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec { @Override public String toString() { - return "MemberSubscriptionSpecImpl(rackId=" + rackId.orElse("N/A") + + return "MemberSubscriptionAndAssignmentImpl(rackId=" + rackId.orElse("N/A") + ", subscribedTopicIds=" + subscribedTopicIds + ", memberAssignment=" + memberAssignment + ')'; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicDescriberImpl.java similarity index 89% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java rename to group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicDescriberImpl.java index c5a6e134ff2..08e693a48ef 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicDescriberImpl.java @@ -17,8 +17,8 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; -import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; import java.util.Collections; import java.util.Map; @@ -29,14 +29,14 @@ import java.util.Set; * The subscribed topic metadata class is used by the {@link PartitionAssignor} to obtain * topic and partition metadata for the topics that the consumer group is subscribed to. */ -public class SubscribedTopicMetadata implements SubscribedTopicDescriber { +public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber { /** * The topic Ids mapped to their corresponding {@link TopicMetadata} * object, which contains topic and partition metadata. */ private final Map topicMetadata; - public SubscribedTopicMetadata(Map topicMetadata) { + public SubscribedTopicDescriberImpl(Map topicMetadata) { this.topicMetadata = Objects.requireNonNull(topicMetadata); } @@ -80,7 +80,7 @@ public class SubscribedTopicMetadata implements SubscribedTopicDescriber { public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - SubscribedTopicMetadata that = (SubscribedTopicMetadata) o; + SubscribedTopicDescriberImpl that = (SubscribedTopicDescriberImpl) o; return topicMetadata.equals(that.topicMetadata); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java index 38e6999383f..acb4372b101 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java @@ -18,13 +18,11 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.CoordinatorRecord; -import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl; -import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl; -import org.apache.kafka.coordinator.group.assignor.SubscriptionType; -import org.apache.kafka.coordinator.group.assignor.GroupAssignment; -import org.apache.kafka.coordinator.group.assignor.MemberAssignment; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; import org.apache.kafka.image.TopicsImage; import java.util.ArrayList; @@ -293,11 +291,11 @@ public class TargetAssignmentBuilder { * @throws PartitionAssignorException if the target assignment cannot be computed. */ public TargetAssignmentResult build() throws PartitionAssignorException { - Map memberSpecs = new HashMap<>(); + Map memberSpecs = new HashMap<>(); // Prepare the member spec for all members. members.forEach((memberId, member) -> - memberSpecs.put(memberId, createMemberSubscriptionSpecImpl( + memberSpecs.put(memberId, createMemberSubscriptionAndAssignment( member, targetAssignment.getOrDefault(memberId, Assignment.EMPTY), topicsImage @@ -319,7 +317,7 @@ public class TargetAssignmentBuilder { } } - memberSpecs.put(memberId, createMemberSubscriptionSpecImpl( + memberSpecs.put(memberId, createMemberSubscriptionAndAssignment( updatedMemberOrNull, assignment, topicsImage @@ -343,7 +341,7 @@ public class TargetAssignmentBuilder { subscriptionType, invertedTargetAssignment ), - new SubscribedTopicMetadata(topicMetadataMap) + new SubscribedTopicDescriberImpl(topicMetadataMap) ); // Compute delta from previous to new target assignment and create the @@ -377,19 +375,19 @@ public class TargetAssignmentBuilder { ) { MemberAssignment newMemberAssignment = newGroupAssignment.members().get(memberId); if (newMemberAssignment != null) { - return new Assignment(newMemberAssignment.targetPartitions()); + return new Assignment(newMemberAssignment.partitions()); } else { return Assignment.EMPTY; } } // private for testing - static MemberSubscriptionSpecImpl createMemberSubscriptionSpecImpl( + static MemberSubscriptionAndAssignmentImpl createMemberSubscriptionAndAssignment( ConsumerGroupMember member, Assignment memberAssignment, TopicsImage topicsImage ) { - return new MemberSubscriptionSpecImpl( + return new MemberSubscriptionAndAssignmentImpl( Optional.ofNullable(member.rackId()), new TopicIds(member.subscribedTopicNames(), topicsImage), memberAssignment diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java index 5042c8a7590..a7ab186cfa1 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java @@ -17,8 +17,8 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.Uuid; -import org.apache.kafka.coordinator.group.assignor.GroupAssignment; -import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.consumer.MemberSubscriptionAndAssignmentImpl; import java.util.AbstractMap; import java.util.Arrays; @@ -80,7 +80,7 @@ public class AssignmentTestUtil { ) { assertEquals(expectedAssignment.size(), computedGroupAssignment.members().size()); computedGroupAssignment.members().forEach((memberId, memberAssignment) -> { - Map> computedAssignmentForMember = memberAssignment.targetPartitions(); + Map> computedAssignmentForMember = memberAssignment.partitions(); assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember); }); } @@ -92,12 +92,12 @@ public class AssignmentTestUtil { * @return Map of topic partition to member assignments. */ public static Map> invertedTargetAssignment( - Map members + Map members ) { Map> invertedTargetAssignment = new HashMap<>(); - for (Map.Entry memberEntry : members.entrySet()) { + for (Map.Entry memberEntry : members.entrySet()) { String memberId = memberEntry.getKey(); - Map> memberAssignment = memberEntry.getValue().memberAssignment(); + Map> memberAssignment = memberEntry.getValue().partitions(); for (Map.Entry> topicEntry : memberAssignment.entrySet()) { Uuid topicId = topicEntry.getKey(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index e8906c4418a..03306c90407 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.junit.jupiter.api.Test; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index abf48fd6415..ad68c96aaab 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -61,10 +61,11 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.MockCoordinatorTimer.ExpiredTimeout; import org.apache.kafka.coordinator.group.MockCoordinatorTimer.ScheduledTimeout; -import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; -import org.apache.kafka.coordinator.group.assignor.GroupAssignment; -import org.apache.kafka.coordinator.group.assignor.MemberAssignment; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.consumer.MemberAssignmentImpl; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; import org.apache.kafka.coordinator.group.classic.ClassicGroupState; import org.apache.kafka.coordinator.group.consumer.Assignment; import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; @@ -435,7 +436,7 @@ public class GroupMetadataManagerTest { .build(); assignor.prepareGroupAssignment(new GroupAssignment( - Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), mkTopicAssignment(barTopicId, 0, 1, 2) ))) @@ -540,7 +541,7 @@ public class GroupMetadataManagerTest { .build(); assignor.prepareGroupAssignment(new GroupAssignment( - Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), mkTopicAssignment(barTopicId, 0, 1, 2) ))) @@ -663,15 +664,15 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1), mkTopicAssignment(barTopicId, 0) ))); - put(memberId2, new MemberAssignment(mkAssignment( + put(memberId2, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 2, 3), mkTopicAssignment(barTopicId, 1) ))); - put(memberId3, new MemberAssignment(mkAssignment( + put(memberId3, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 4, 5), mkTopicAssignment(barTopicId, 2) ))); @@ -897,15 +898,15 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1), mkTopicAssignment(barTopicId, 0) ))); - put(memberId2, new MemberAssignment(mkAssignment( + put(memberId2, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 2, 3), mkTopicAssignment(barTopicId, 1) ))); - put(memberId3, new MemberAssignment(mkAssignment( + put(memberId3, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 4, 5), mkTopicAssignment(barTopicId, 2) ))); @@ -1047,12 +1048,12 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2), mkTopicAssignment(barTopicId, 0, 1) ))); // When the member rejoins, it gets the same assignments. - put(member2RejoinId, new MemberAssignment(mkAssignment( + put(member2RejoinId, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 2) ))); @@ -1680,7 +1681,7 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId, new MemberAssignment(mkAssignment( + put(memberId, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1) ))); } @@ -1816,15 +1817,15 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1), mkTopicAssignment(barTopicId, 0) ))); - put(memberId2, new MemberAssignment(mkAssignment( + put(memberId2, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 2, 3), mkTopicAssignment(barTopicId, 2) ))); - put(memberId3, new MemberAssignment(mkAssignment( + put(memberId3, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 4, 5), mkTopicAssignment(barTopicId, 1) ))); @@ -2395,7 +2396,7 @@ public class GroupMetadataManagerTest { // Prepare the assignment result. assignor.prepareGroupAssignment(new GroupAssignment( - Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) ))) )); @@ -2506,7 +2507,7 @@ public class GroupMetadataManagerTest { // Prepare the assignment result. assignor.prepareGroupAssignment(new GroupAssignment( - Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) ))) )); @@ -2808,7 +2809,7 @@ public class GroupMetadataManagerTest { .build(); assignor.prepareGroupAssignment(new GroupAssignment( - Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) ))) )); @@ -2883,7 +2884,7 @@ public class GroupMetadataManagerTest { .build(); assignor.prepareGroupAssignment(new GroupAssignment( - Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) ))) )); @@ -2947,7 +2948,7 @@ public class GroupMetadataManagerTest { .build(); assignor.prepareGroupAssignment(new GroupAssignment( - Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) ))) )); @@ -3031,7 +3032,7 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2) ))); } @@ -3071,10 +3072,10 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1) ))); - put(memberId2, new MemberAssignment(mkAssignment( + put(memberId2, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 2) ))); } @@ -3186,7 +3187,7 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2) ))); } @@ -3226,10 +3227,10 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1) ))); - put(memberId2, new MemberAssignment(mkAssignment( + put(memberId2, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 2) ))); } @@ -9233,7 +9234,7 @@ public class GroupMetadataManagerTest { .build(); assignor.prepareGroupAssignment(new GroupAssignment( - Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), mkTopicAssignment(barTopicId, 0, 1, 2) ))) @@ -9476,10 +9477,10 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0) ))); - put(memberId2, new MemberAssignment(mkAssignment( + put(memberId2, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(barTopicId, 0) ))); } @@ -9652,13 +9653,13 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0) ))); - put(memberId2, new MemberAssignment(mkAssignment( + put(memberId2, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(barTopicId, 0) ))); - put(memberId3, new MemberAssignment(mkAssignment( + put(memberId3, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 1) ))); } @@ -9899,13 +9900,13 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0) ))); - put(memberId2, new MemberAssignment(mkAssignment( + put(memberId2, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(barTopicId, 0) ))); - put(memberId3, new MemberAssignment(mkAssignment( + put(memberId3, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 1) ))); } @@ -10730,11 +10731,11 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2), mkTopicAssignment(barTopicId, 0, 1) ))); - put(memberId2, new MemberAssignment(mkAssignment( + put(memberId2, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5) ))); } @@ -10970,10 +10971,10 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId, new MemberAssignment(mkAssignment( + put(memberId, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0) ))); - put(newMemberId, new MemberAssignment(mkAssignment( + put(newMemberId, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(barTopicId, 0) ))); } @@ -11059,10 +11060,10 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId, new MemberAssignment(mkAssignment( + put(memberId, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0) ))); - put(newMemberId, new MemberAssignment(mkAssignment( + put(newMemberId, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 1) ))); } @@ -11389,11 +11390,11 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0), mkTopicAssignment(zarTopicId, 0) ))); - put(memberId2, new MemberAssignment(mkAssignment( + put(memberId2, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(barTopicId, 0), mkTopicAssignment(fooTopicId, 1) ))); @@ -11619,11 +11620,11 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1), mkTopicAssignment(zarTopicId, 0) ))); - put(memberId2, new MemberAssignment(mkAssignment( + put(memberId2, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(barTopicId, 0) ))); } @@ -11861,11 +11862,11 @@ public class GroupMetadataManagerTest { assignor.prepareGroupAssignment(new GroupAssignment( new HashMap() { { - put(memberId1, new MemberAssignment(mkAssignment( + put(memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1), mkTopicAssignment(zarTopicId, 0) ))); - put(memberId2, new MemberAssignment(mkAssignment( + put(memberId2, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(barTopicId, 0) ))); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index e6268edc342..d352949346f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -46,7 +46,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.classic.ClassicGroup; import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.consumer.ConsumerGroupBuilder; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java index f084bb86e1c..5137101af70 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java @@ -17,11 +17,11 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.Uuid; -import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; -import org.apache.kafka.coordinator.group.assignor.GroupAssignment; -import org.apache.kafka.coordinator.group.assignor.GroupSpec; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; -import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; import java.util.Map; import java.util.Objects; @@ -51,6 +51,6 @@ public class MockPartitionAssignor implements ConsumerGroupPartitionAssignor { public Map> targetPartitions(String memberId) { Objects.requireNonNull(prepareGroupAssignment); - return prepareGroupAssignment.members().get(memberId).targetPartitions(); + return prepareGroupAssignment.members().get(memberId).partitions(); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java index 460929506d2..52089595681 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java @@ -16,12 +16,12 @@ */ package org.apache.kafka.coordinator.group; -import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; -import org.apache.kafka.coordinator.group.assignor.GroupAssignment; -import org.apache.kafka.coordinator.group.assignor.GroupSpec; -import org.apache.kafka.coordinator.group.assignor.MemberAssignment; -import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; +import java.util.function.Function; import java.util.stream.Collectors; public class NoOpPartitionAssignor implements ConsumerGroupPartitionAssignor { @@ -36,9 +36,6 @@ public class NoOpPartitionAssignor implements ConsumerGroupPartitionAssignor { public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { return new GroupAssignment(groupSpec.memberIds() .stream() - .collect(Collectors.toMap( - memberId -> memberId, - memberId -> new MemberAssignment(groupSpec.memberAssignment(memberId)) - ))); + .collect(Collectors.toMap(Function.identity(), groupSpec::memberAssignment))); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java index c78a3f78a95..d0275f8615a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java @@ -17,8 +17,13 @@ package org.apache.kafka.coordinator.group.assignor; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; import org.apache.kafka.coordinator.group.consumer.Assignment; -import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.GroupSpecImpl; +import org.apache.kafka.coordinator.group.consumer.MemberSubscriptionAndAssignmentImpl; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.consumer.TopicMetadata; import org.junit.jupiter.api.Test; @@ -35,7 +40,7 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment; import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -55,7 +60,7 @@ public class GeneralUniformAssignmentBuilderTest { @Test public void testTwoMembersNoTopicSubscription() { - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata( + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( Collections.singletonMap( topic1Uuid, new TopicMetadata( @@ -67,13 +72,13 @@ public class GeneralUniformAssignmentBuilderTest { ) ); - Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + Map members = new TreeMap<>(); + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.emptySet(), Assignment.EMPTY )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.emptySet(), Assignment.EMPTY @@ -95,7 +100,7 @@ public class GeneralUniformAssignmentBuilderTest { @Test public void testTwoMembersSubscribedToNonexistentTopics() { - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata( + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( Collections.singletonMap( topic1Uuid, new TopicMetadata( @@ -107,13 +112,13 @@ public class GeneralUniformAssignmentBuilderTest { ) ); - Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + Map members = new TreeMap<>(); + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic3Uuid), Assignment.EMPTY )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic2Uuid), Assignment.EMPTY @@ -125,7 +130,8 @@ public class GeneralUniformAssignmentBuilderTest { Collections.emptyMap() ); - assertThrows(PartitionAssignorException.class, + assertThrows( + PartitionAssignorException.class, () -> assignor.assign(groupSpec, subscribedTopicMetadata)); } @@ -145,15 +151,15 @@ public class GeneralUniformAssignmentBuilderTest { mkMapOfPartitionRacks(6) )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic3Uuid), Assignment.EMPTY @@ -164,7 +170,7 @@ public class GeneralUniformAssignmentBuilderTest { HETEROGENEOUS, Collections.emptyMap() ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -199,21 +205,21 @@ public class GeneralUniformAssignmentBuilderTest { mkMapOfPartitionRacks(2) )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic3Uuid), Assignment.EMPTY )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic3Uuid), Assignment.EMPTY )); - members.put(memberC, new MemberSubscriptionSpecImpl( + members.put(memberC, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic1Uuid), Assignment.EMPTY @@ -224,7 +230,7 @@ public class GeneralUniformAssignmentBuilderTest { HETEROGENEOUS, Collections.emptyMap() ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -268,9 +274,9 @@ public class GeneralUniformAssignmentBuilderTest { mkMapOfPartitionRacks(4) )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic1Uuid), new Assignment(mkAssignment( @@ -278,7 +284,7 @@ public class GeneralUniformAssignmentBuilderTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -287,7 +293,7 @@ public class GeneralUniformAssignmentBuilderTest { )) )); - members.put(memberC, new MemberSubscriptionSpecImpl( + members.put(memberC, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid, topic3Uuid), new Assignment(mkAssignment( @@ -302,7 +308,7 @@ public class GeneralUniformAssignmentBuilderTest { HETEROGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -353,9 +359,9 @@ public class GeneralUniformAssignmentBuilderTest { mkMapOfPartitionRacks(3) )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic3Uuid), new Assignment(mkAssignment( @@ -364,7 +370,7 @@ public class GeneralUniformAssignmentBuilderTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid, topic3Uuid, topic4Uuid), new Assignment(mkAssignment( @@ -378,7 +384,7 @@ public class GeneralUniformAssignmentBuilderTest { HETEROGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -415,9 +421,9 @@ public class GeneralUniformAssignmentBuilderTest { mkMapOfPartitionRacks(7) )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic1Uuid), new Assignment(mkAssignment( @@ -426,7 +432,7 @@ public class GeneralUniformAssignmentBuilderTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -436,7 +442,7 @@ public class GeneralUniformAssignmentBuilderTest { )); // Add a new member to trigger a re-assignment. - members.put(memberC, new MemberSubscriptionSpecImpl( + members.put(memberC, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), Assignment.EMPTY @@ -447,7 +453,7 @@ public class GeneralUniformAssignmentBuilderTest { HETEROGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -491,9 +497,9 @@ public class GeneralUniformAssignmentBuilderTest { mkMapOfPartitionRacks(3) )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic3Uuid), new Assignment(mkAssignment( @@ -502,7 +508,7 @@ public class GeneralUniformAssignmentBuilderTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic2Uuid), new Assignment(mkAssignment( @@ -517,7 +523,7 @@ public class GeneralUniformAssignmentBuilderTest { HETEROGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -553,9 +559,9 @@ public class GeneralUniformAssignmentBuilderTest { )); // Initial subscriptions were [T1, T2] - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic1Uuid), new Assignment(mkAssignment( @@ -564,7 +570,7 @@ public class GeneralUniformAssignmentBuilderTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -578,7 +584,7 @@ public class GeneralUniformAssignmentBuilderTest { HETEROGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java index db1860c5049..f166998a588 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java @@ -17,7 +17,10 @@ package org.apache.kafka.coordinator.group.assignor; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.GroupSpecImpl; +import org.apache.kafka.coordinator.group.consumer.MemberSubscriptionAndAssignmentImpl; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -36,7 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class GroupSpecImplTest { private static final String TEST_MEMBER = "test-member"; - private Map members; + private Map members; private SubscriptionType subscriptionType; private Map> invertedTargetAssignment; private GroupSpecImpl groupSpec; @@ -49,7 +52,7 @@ public class GroupSpecImplTest { invertedTargetAssignment = new HashMap<>(); topicId = Uuid.randomUuid(); - members.put(TEST_MEMBER, new MemberSubscriptionSpecImpl( + members.put(TEST_MEMBER, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topicId), Assignment.EMPTY @@ -96,13 +99,13 @@ public class GroupSpecImplTest { topicId, mkSet(0, 1) ); - members.put(TEST_MEMBER, new MemberSubscriptionSpecImpl( + members.put(TEST_MEMBER, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topicId), new Assignment(topicPartitions) )); - assertEquals(topicPartitions, groupSpec.memberAssignment(TEST_MEMBER)); - assertEquals(Collections.emptyMap(), groupSpec.memberAssignment("unknown-member")); + assertEquals(topicPartitions, groupSpec.memberAssignment(TEST_MEMBER).partitions()); + assertEquals(Collections.emptyMap(), groupSpec.memberAssignment("unknown-member").partitions()); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java index 878e142cb81..823eb94f8a3 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java @@ -17,8 +17,13 @@ package org.apache.kafka.coordinator.group.assignor; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; import org.apache.kafka.coordinator.group.consumer.Assignment; -import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.GroupSpecImpl; +import org.apache.kafka.coordinator.group.consumer.MemberSubscriptionAndAssignmentImpl; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.consumer.TopicMetadata; import org.junit.jupiter.api.Test; @@ -39,7 +44,7 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAss import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment; import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -58,7 +63,7 @@ public class OptimizedUniformAssignmentBuilderTest { @Test public void testOneMemberNoTopicSubscription() { - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata( + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( Collections.singletonMap( topic1Uuid, new TopicMetadata( @@ -70,9 +75,9 @@ public class OptimizedUniformAssignmentBuilderTest { ) ); - Map members = Collections.singletonMap( + Map members = Collections.singletonMap( memberA, - new MemberSubscriptionSpecImpl( + new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.emptySet(), Assignment.EMPTY @@ -95,7 +100,7 @@ public class OptimizedUniformAssignmentBuilderTest { @Test public void testOneMemberSubscribedToNonexistentTopic() { - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata( + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( Collections.singletonMap( topic1Uuid, new TopicMetadata( @@ -107,9 +112,9 @@ public class OptimizedUniformAssignmentBuilderTest { ) ); - Map members = Collections.singletonMap( + Map members = Collections.singletonMap( memberA, - new MemberSubscriptionSpecImpl( + new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic2Uuid), Assignment.EMPTY @@ -142,15 +147,15 @@ public class OptimizedUniformAssignmentBuilderTest { mkMapOfPartitionRacks(2) )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY @@ -170,7 +175,7 @@ public class OptimizedUniformAssignmentBuilderTest { HOMOGENEOUS, Collections.emptyMap() ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -191,21 +196,21 @@ public class OptimizedUniformAssignmentBuilderTest { mkMapOfPartitionRacks(2) )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic3Uuid), Assignment.EMPTY )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic3Uuid), Assignment.EMPTY )); - members.put(memberC, new MemberSubscriptionSpecImpl( + members.put(memberC, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic3Uuid), Assignment.EMPTY @@ -228,7 +233,7 @@ public class OptimizedUniformAssignmentBuilderTest { HOMOGENEOUS, Collections.emptyMap() ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -252,9 +257,9 @@ public class OptimizedUniformAssignmentBuilderTest { )); } - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); for (int i = 1; i < 50; i++) { - members.put("member" + i, new MemberSubscriptionSpecImpl( + members.put("member" + i, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), topicMetadata.keySet(), Assignment.EMPTY @@ -266,7 +271,7 @@ public class OptimizedUniformAssignmentBuilderTest { HOMOGENEOUS, Collections.emptyMap() ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -292,9 +297,9 @@ public class OptimizedUniformAssignmentBuilderTest { mkMapOfPartitionRacks(3) )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkOrderedAssignment( @@ -303,7 +308,7 @@ public class OptimizedUniformAssignmentBuilderTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkOrderedAssignment( @@ -327,7 +332,7 @@ public class OptimizedUniformAssignmentBuilderTest { HOMOGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -355,9 +360,9 @@ public class OptimizedUniformAssignmentBuilderTest { mkMapOfPartitionRacks(5) )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkOrderedAssignment( @@ -366,7 +371,7 @@ public class OptimizedUniformAssignmentBuilderTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkOrderedAssignment( @@ -390,7 +395,7 @@ public class OptimizedUniformAssignmentBuilderTest { HOMOGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -417,9 +422,9 @@ public class OptimizedUniformAssignmentBuilderTest { mkMapOfPartitionRacks(3) )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkOrderedAssignment( @@ -428,7 +433,7 @@ public class OptimizedUniformAssignmentBuilderTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkOrderedAssignment( @@ -438,7 +443,7 @@ public class OptimizedUniformAssignmentBuilderTest { )); // Add a new member to trigger a re-assignment. - members.put(memberC, new MemberSubscriptionSpecImpl( + members.put(memberC, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), Assignment.EMPTY @@ -461,7 +466,7 @@ public class OptimizedUniformAssignmentBuilderTest { HOMOGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -488,9 +493,9 @@ public class OptimizedUniformAssignmentBuilderTest { mkMapOfPartitionRacks(3) )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -499,7 +504,7 @@ public class OptimizedUniformAssignmentBuilderTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -525,7 +530,7 @@ public class OptimizedUniformAssignmentBuilderTest { HOMOGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -553,9 +558,9 @@ public class OptimizedUniformAssignmentBuilderTest { )); // Initial subscriptions were [T1, T2] - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic2Uuid), new Assignment(mkAssignment( @@ -564,7 +569,7 @@ public class OptimizedUniformAssignmentBuilderTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.singleton(topic2Uuid), new Assignment(mkAssignment( @@ -586,7 +591,7 @@ public class OptimizedUniformAssignmentBuilderTest { HOMOGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -610,7 +615,7 @@ public class OptimizedUniformAssignmentBuilderTest { * @param computedGroupAssignment Assignment computed by the uniform assignor. */ private void checkValidityAndBalance( - Map memberSubscriptionSpec, + Map memberSubscriptionSpec, GroupAssignment computedGroupAssignment ) { List membersList = new ArrayList<>(computedGroupAssignment.members().keySet()); @@ -618,7 +623,7 @@ public class OptimizedUniformAssignmentBuilderTest { List totalAssignmentSizesOfAllMembers = new ArrayList<>(membersList.size()); membersList.forEach(member -> { Map> computedAssignmentForMember = computedGroupAssignment - .members().get(member).targetPartitions(); + .members().get(member).partitions(); int sum = computedAssignmentForMember.values().stream().mapToInt(Set::size).sum(); totalAssignmentSizesOfAllMembers.add(sum); }); @@ -626,7 +631,7 @@ public class OptimizedUniformAssignmentBuilderTest { for (int i = 0; i < numMembers; i++) { String memberId = membersList.get(i); Map> computedAssignmentForMember = - computedGroupAssignment.members().get(memberId).targetPartitions(); + computedGroupAssignment.members().get(memberId).partitions(); // Each member is subscribed to topics of all the partitions assigned to it. computedAssignmentForMember.keySet().forEach(topicId -> { // Check if the topic exists in the subscription. @@ -638,7 +643,7 @@ public class OptimizedUniformAssignmentBuilderTest { for (int j = i + 1; j < numMembers; j++) { String otherMemberId = membersList.get(j); Map> computedAssignmentForOtherMember = computedGroupAssignment - .members().get(otherMemberId).targetPartitions(); + .members().get(otherMemberId).partitions(); // Each partition should be assigned to at most one member computedAssignmentForMember.keySet().forEach(topicId -> { Set intersection = new HashSet<>(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java index 20b97490c2b..13b31c97016 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java @@ -17,8 +17,13 @@ package org.apache.kafka.coordinator.group.assignor; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; import org.apache.kafka.coordinator.group.consumer.Assignment; -import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.GroupSpecImpl; +import org.apache.kafka.coordinator.group.consumer.MemberSubscriptionAndAssignmentImpl; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.consumer.TopicMetadata; import org.junit.jupiter.api.Test; @@ -33,8 +38,8 @@ import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -53,7 +58,7 @@ public class RangeAssignorTest { @Test public void testOneConsumerNoTopic() { - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata( + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( Collections.singletonMap( topic1Uuid, new TopicMetadata( @@ -65,9 +70,9 @@ public class RangeAssignorTest { ) ); - Map members = Collections.singletonMap( + Map members = Collections.singletonMap( memberA, - new MemberSubscriptionSpecImpl( + new MemberSubscriptionAndAssignmentImpl( Optional.empty(), Collections.emptySet(), Assignment.EMPTY @@ -90,7 +95,7 @@ public class RangeAssignorTest { @Test public void testOneConsumerSubscribedToNonExistentTopic() { - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata( + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( Collections.singletonMap( topic1Uuid, new TopicMetadata( @@ -102,9 +107,9 @@ public class RangeAssignorTest { ) ); - Map members = Collections.singletonMap( + Map members = Collections.singletonMap( memberA, - new MemberSubscriptionSpecImpl( + new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic2Uuid), Assignment.EMPTY @@ -137,15 +142,15 @@ public class RangeAssignorTest { Collections.emptyMap() )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY @@ -156,7 +161,7 @@ public class RangeAssignorTest { HOMOGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -198,21 +203,21 @@ public class RangeAssignorTest { Collections.emptyMap() )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), Assignment.EMPTY )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic3Uuid), Assignment.EMPTY )); - members.put(memberC, new MemberSubscriptionSpecImpl( + members.put(memberC, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic2Uuid, topic3Uuid), Assignment.EMPTY @@ -223,7 +228,7 @@ public class RangeAssignorTest { HETEROGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -262,21 +267,21 @@ public class RangeAssignorTest { Collections.emptyMap() )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY )); - members.put(memberC, new MemberSubscriptionSpecImpl( + members.put(memberC, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic3Uuid), Assignment.EMPTY @@ -287,7 +292,7 @@ public class RangeAssignorTest { HOMOGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -327,9 +332,9 @@ public class RangeAssignorTest { Collections.emptyMap() )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -338,7 +343,7 @@ public class RangeAssignorTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -348,7 +353,7 @@ public class RangeAssignorTest { )); // Add a new consumer to trigger a re-assignment - members.put(memberC, new MemberSubscriptionSpecImpl( + members.put(memberC, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), Assignment.EMPTY @@ -359,7 +364,7 @@ public class RangeAssignorTest { HOMOGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -398,9 +403,9 @@ public class RangeAssignorTest { Collections.emptyMap() )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -409,7 +414,7 @@ public class RangeAssignorTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -423,7 +428,7 @@ public class RangeAssignorTest { HOMOGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -459,9 +464,9 @@ public class RangeAssignorTest { Collections.emptyMap() )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -470,7 +475,7 @@ public class RangeAssignorTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -480,7 +485,7 @@ public class RangeAssignorTest { )); // Add a new consumer to trigger a re-assignment - members.put(memberC, new MemberSubscriptionSpecImpl( + members.put(memberC, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), Assignment.EMPTY @@ -491,7 +496,7 @@ public class RangeAssignorTest { HOMOGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -532,9 +537,9 @@ public class RangeAssignorTest { Collections.emptyMap() )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -543,7 +548,7 @@ public class RangeAssignorTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -553,7 +558,7 @@ public class RangeAssignorTest { )); // Add a new consumer to trigger a re-assignment - members.put(memberC, new MemberSubscriptionSpecImpl( + members.put(memberC, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid), Assignment.EMPTY @@ -564,7 +569,7 @@ public class RangeAssignorTest { HETEROGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -603,11 +608,11 @@ public class RangeAssignorTest { Collections.emptyMap() )); - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); // Consumer A was removed - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid), new Assignment(mkAssignment( @@ -621,7 +626,7 @@ public class RangeAssignorTest { HOMOGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -661,9 +666,9 @@ public class RangeAssignorTest { // Let initial subscriptions be A -> T1, T2 // B -> T2 // C -> T2, T3 // Change the subscriptions to A -> T1 // B -> T1, T2, T3 // C -> T2 - Map members = new TreeMap<>(); + Map members = new TreeMap<>(); - members.put(memberA, new MemberSubscriptionSpecImpl( + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid), new Assignment(mkAssignment( @@ -672,7 +677,7 @@ public class RangeAssignorTest { )) )); - members.put(memberB, new MemberSubscriptionSpecImpl( + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic1Uuid, topic2Uuid, topic3Uuid), new Assignment(mkAssignment( @@ -680,7 +685,7 @@ public class RangeAssignorTest { )) )); - members.put(memberC, new MemberSubscriptionSpecImpl( + members.put(memberC, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), mkSet(topic2Uuid), new Assignment(mkAssignment( @@ -694,7 +699,7 @@ public class RangeAssignorTest { HETEROGENEOUS, invertedTargetAssignment(members) ); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); GroupAssignment computedAssignment = assignor.assign( groupSpec, @@ -723,7 +728,7 @@ public class RangeAssignorTest { ) { assertEquals(expectedAssignment.size(), computedGroupAssignment.members().size()); for (String memberId : computedGroupAssignment.members().keySet()) { - Map> computedAssignmentForMember = computedGroupAssignment.members().get(memberId).targetPartitions(); + Map> computedAssignmentForMember = computedGroupAssignment.members().get(memberId).partitions(); assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index a67a5b098b9..0adeb0d289c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -55,8 +55,8 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java index 956e446d9e1..0efad07488e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java @@ -33,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; public class SubscribedTopicMetadataTest { private Map topicMetadataMap; - private SubscribedTopicMetadata subscribedTopicMetadata; + private SubscribedTopicDescriberImpl subscribedTopicMetadata; @BeforeEach public void setUp() { @@ -47,7 +47,7 @@ public class SubscribedTopicMetadataTest { new TopicMetadata(topicId, topicName, 5, partitionRacks) ); } - subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap); + subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadataMap); } @Test @@ -57,7 +57,7 @@ public class SubscribedTopicMetadataTest { @Test public void testTopicMetadataCannotBeNull() { - assertThrows(NullPointerException.class, () -> new SubscribedTopicMetadata(null)); + assertThrows(NullPointerException.class, () -> new SubscribedTopicDescriberImpl(null)); } @Test @@ -100,11 +100,11 @@ public class SubscribedTopicMetadataTest { @Test public void testEquals() { - assertEquals(new SubscribedTopicMetadata(topicMetadataMap), subscribedTopicMetadata); + assertEquals(new SubscribedTopicDescriberImpl(topicMetadataMap), subscribedTopicMetadata); Map topicMetadataMap2 = new HashMap<>(); Uuid topicId = Uuid.randomUuid(); topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 5, Collections.emptyMap())); - assertNotEquals(new SubscribedTopicMetadata(topicMetadataMap2), subscribedTopicMetadata); + assertNotEquals(new SubscribedTopicDescriberImpl(topicMetadataMap2), subscribedTopicMetadata); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java index 5c7557cf5ec..5bc049bc2a9 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java @@ -19,13 +19,11 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.AssignmentTestUtil; import org.apache.kafka.coordinator.group.MetadataImageBuilder; -import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl; -import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpec; -import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl; -import org.apache.kafka.coordinator.group.assignor.SubscriptionType; -import org.apache.kafka.coordinator.group.assignor.GroupAssignment; -import org.apache.kafka.coordinator.group.assignor.MemberAssignment; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.MemberSubscription; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor; import org.apache.kafka.image.TopicsImage; import org.junit.jupiter.api.Test; @@ -44,8 +42,8 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssig import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTargetAssignmentEpochRecord; import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTargetAssignmentRecord; import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; -import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createMemberSubscriptionSpecImpl; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; +import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createMemberSubscriptionAndAssignment; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -159,17 +157,17 @@ public class TargetAssignmentBuilderTest { String memberId, Map> assignment ) { - memberAssignments.put(memberId, new MemberAssignment(assignment)); + memberAssignments.put(memberId, new MemberAssignmentImpl(assignment)); } public TargetAssignmentBuilder.TargetAssignmentResult build() { TopicsImage topicsImage = topicsImageBuilder.build().topics(); // Prepare expected member specs. - Map memberSubscriptions = new HashMap<>(); + Map memberSubscriptions = new HashMap<>(); // All the existing members are prepared. members.forEach((memberId, member) -> - memberSubscriptions.put(memberId, createMemberSubscriptionSpecImpl( + memberSubscriptions.put(memberId, createMemberSubscriptionAndAssignment( member, targetAssignment.getOrDefault(memberId, Assignment.EMPTY), topicsImage @@ -192,7 +190,7 @@ public class TargetAssignmentBuilderTest { } } - memberSubscriptions.put(memberId, createMemberSubscriptionSpecImpl( + memberSubscriptions.put(memberId, createMemberSubscriptionAndAssignment( updatedMemberOrNull, assignment, topicsImage @@ -206,7 +204,7 @@ public class TargetAssignmentBuilderTest { topicMetadataMap.put(topicMetadata.id(), topicMetadata)); // Prepare the expected subscription topic metadata. - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadataMap); SubscriptionType subscriptionType = HOMOGENEOUS; // Prepare the member assignments per topic partition. @@ -277,13 +275,13 @@ public class TargetAssignmentBuilderTest { mkTopicAssignment(barTopicId, 1, 2, 3) )); - MemberSubscriptionSpec subscriptionSpec = createMemberSubscriptionSpecImpl( + MemberSubscription subscriptionSpec = createMemberSubscriptionAndAssignment( member, assignment, topicsImage ); - assertEquals(new MemberSubscriptionSpecImpl( + assertEquals(new MemberSubscriptionAndAssignmentImpl( Optional.of("rackId"), new TopicIds(mkSet("bar", "foo", "zar"), topicsImage), assignment @@ -343,11 +341,11 @@ public class TargetAssignmentBuilderTest { )), result.records()); Map expectedAssignment = new HashMap<>(); - expectedAssignment.put("member-1", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), mkTopicAssignment(barTopicId, 1, 2, 3) ))); - expectedAssignment.put("member-2", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 4, 5, 6), mkTopicAssignment(barTopicId, 4, 5, 6) ))); @@ -406,11 +404,11 @@ public class TargetAssignmentBuilderTest { ), result.records().get(2)); Map expectedAssignment = new HashMap<>(); - expectedAssignment.put("member-2", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), mkTopicAssignment(barTopicId, 1, 2, 3) ))); - expectedAssignment.put("member-1", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 4, 5, 6), mkTopicAssignment(barTopicId, 4, 5, 6) ))); @@ -480,15 +478,15 @@ public class TargetAssignmentBuilderTest { ), result.records().get(3)); Map expectedAssignment = new HashMap<>(); - expectedAssignment.put("member-1", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), mkTopicAssignment(barTopicId, 1, 2) ))); - expectedAssignment.put("member-2", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 3, 4), mkTopicAssignment(barTopicId, 3, 4) ))); - expectedAssignment.put("member-3", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-3", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 5, 6), mkTopicAssignment(barTopicId, 5, 6) ))); @@ -567,15 +565,15 @@ public class TargetAssignmentBuilderTest { ), result.records().get(3)); Map expectedAssignment = new HashMap<>(); - expectedAssignment.put("member-1", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), mkTopicAssignment(barTopicId, 1, 2) ))); - expectedAssignment.put("member-2", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 3, 4), mkTopicAssignment(barTopicId, 3, 4) ))); - expectedAssignment.put("member-3", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-3", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 5, 6), mkTopicAssignment(barTopicId, 5, 6) ))); @@ -645,15 +643,15 @@ public class TargetAssignmentBuilderTest { ), result.records().get(2)); Map expectedAssignment = new HashMap<>(); - expectedAssignment.put("member-1", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), mkTopicAssignment(barTopicId, 1, 2) ))); - expectedAssignment.put("member-2", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 3, 4, 5) ))); - expectedAssignment.put("member-3", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-3", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 6), mkTopicAssignment(barTopicId, 6) ))); @@ -719,11 +717,11 @@ public class TargetAssignmentBuilderTest { ), result.records().get(2)); Map expectedAssignment = new HashMap<>(); - expectedAssignment.put("member-1", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), mkTopicAssignment(barTopicId, 1, 2, 3) ))); - expectedAssignment.put("member-2", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 4, 5, 6), mkTopicAssignment(barTopicId, 4, 5, 6) ))); @@ -794,16 +792,16 @@ public class TargetAssignmentBuilderTest { ), result.records().get(1)); Map expectedAssignment = new HashMap<>(); - expectedAssignment.put("member-1", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), mkTopicAssignment(barTopicId, 1, 2) ))); - expectedAssignment.put("member-2", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 3, 4), mkTopicAssignment(barTopicId, 3, 4) ))); - expectedAssignment.put("member-3-a", new MemberAssignment(mkAssignment( + expectedAssignment.put("member-3-a", new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 5, 6), mkTopicAssignment(barTopicId, 5, 6) ))); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java index 2d2edf222f9..4bfcd727852 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java @@ -19,8 +19,8 @@ package org.apache.kafka.jmh.assignor; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.TopicRecord; -import org.apache.kafka.coordinator.group.assignor.GroupAssignment; -import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; import org.apache.kafka.image.MetadataDelta; import java.util.Arrays; @@ -41,7 +41,7 @@ public class AssignorBenchmarkUtils { Map> invertedTargetAssignment = new HashMap<>(); for (Map.Entry memberEntry : groupAssignment.members().entrySet()) { String memberId = memberEntry.getKey(); - Map> topicsAndPartitions = memberEntry.getValue().targetPartitions(); + Map> topicsAndPartitions = memberEntry.getValue().partitions(); for (Map.Entry> topicEntry : topicsAndPartitions.entrySet()) { Uuid topicId = topicEntry.getKey(); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java index f4c81fccb41..636f2a9e8a7 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.coordinator.group.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -47,8 +47,8 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; @State(Scope.Benchmark) @Fork(value = 1) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java index 0b1681e0d18..56d5142a329 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java @@ -17,18 +17,19 @@ package org.apache.kafka.jmh.assignor; import org.apache.kafka.common.Uuid; -import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl; -import org.apache.kafka.coordinator.group.assignor.GroupAssignment; -import org.apache.kafka.coordinator.group.assignor.MemberAssignment; -import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.consumer.MemberAssignmentImpl; +import org.apache.kafka.coordinator.group.consumer.GroupSpecImpl; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.consumer.MemberSubscriptionAndAssignmentImpl; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; -import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; -import org.apache.kafka.coordinator.group.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; import org.apache.kafka.coordinator.group.consumer.Assignment; import org.apache.kafka.coordinator.group.consumer.TopicIds; import org.apache.kafka.coordinator.group.assignor.UniformAssignor; -import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.consumer.TopicMetadata; import org.apache.kafka.image.MetadataDelta; @@ -61,8 +62,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; @State(Scope.Benchmark) @Fork(value = 1) @@ -133,7 +134,7 @@ public class ServerSideAssignorBenchmark { @Setup(Level.Trial) public void setup() { Map topicMetadata = createTopicMetadata(); - subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata); createGroupSpec(); @@ -177,7 +178,7 @@ public class ServerSideAssignorBenchmark { } private void createGroupSpec() { - Map members = new HashMap<>(); + Map members = new HashMap<>(); // In the rebalance case, we will add the last member as a trigger. // This is done to keep the total members count consistent with the input. @@ -228,14 +229,14 @@ public class ServerSideAssignorBenchmark { } private void addMemberSpec( - Map members, + Map members, int memberIndex, Set subscribedTopicIds ) { String memberId = "member" + memberIndex; Optional rackId = rackId(memberIndex); - members.put(memberId, new MemberSubscriptionSpecImpl( + members.put(memberId, new MemberSubscriptionAndAssignmentImpl( rackId, subscribedTopicIds, Assignment.EMPTY @@ -260,18 +261,18 @@ public class ServerSideAssignorBenchmark { Map> invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(initialAssignment); - Map updatedMemberSpec = new HashMap<>(); + Map updatedMemberSpec = new HashMap<>(); for (String memberId : groupSpec.memberIds()) { MemberAssignment memberAssignment = members.getOrDefault( memberId, - new MemberAssignment(Collections.emptyMap()) + new MemberAssignmentImpl(Collections.emptyMap()) ); - updatedMemberSpec.put(memberId, new MemberSubscriptionSpecImpl( + updatedMemberSpec.put(memberId, new MemberSubscriptionAndAssignmentImpl( groupSpec.memberSubscription(memberId).rackId(), groupSpec.memberSubscription(memberId).subscribedTopicIds(), - new Assignment(Collections.unmodifiableMap(memberAssignment.targetPartitions())) + new Assignment(Collections.unmodifiableMap(memberAssignment.partitions())) )); } @@ -283,7 +284,7 @@ public class ServerSideAssignorBenchmark { } Optional rackId = rackId(memberCount - 1); - updatedMemberSpec.put("newMember", new MemberSubscriptionSpecImpl( + updatedMemberSpec.put("newMember", new MemberSubscriptionAndAssignmentImpl( rackId, subscribedTopicIdsForNewMember, Assignment.EMPTY diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java index 97508ef9f57..2731437771c 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java @@ -17,16 +17,16 @@ package org.apache.kafka.jmh.assignor; import org.apache.kafka.common.Uuid; -import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl; -import org.apache.kafka.coordinator.group.assignor.GroupAssignment; -import org.apache.kafka.coordinator.group.assignor.MemberAssignment; -import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.consumer.GroupSpecImpl; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.consumer.MemberSubscriptionAndAssignmentImpl; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.consumer.TopicIds; import org.apache.kafka.coordinator.group.assignor.UniformAssignor; import org.apache.kafka.coordinator.group.consumer.Assignment; import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; -import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; import org.apache.kafka.coordinator.group.consumer.TopicMetadata; @@ -59,7 +59,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; @State(Scope.Benchmark) @Fork(value = 1) @@ -175,7 +175,7 @@ public class TargetAssignmentBuilderBenchmark { GroupAssignment groupAssignment = partitionAssignor.assign( groupSpec, - new SubscribedTopicMetadata(topicMetadataMap) + new SubscribedTopicDescriberImpl(topicMetadataMap) ); invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(groupAssignment); @@ -183,7 +183,7 @@ public class TargetAssignmentBuilderBenchmark { for (Map.Entry entry : groupAssignment.members().entrySet()) { String memberId = entry.getKey(); - Map> topicPartitions = entry.getValue().targetPartitions(); + Map> topicPartitions = entry.getValue().partitions(); initialTargetAssignment.put(memberId, new Assignment(topicPartitions)); } @@ -191,12 +191,12 @@ public class TargetAssignmentBuilderBenchmark { } private void createAssignmentSpec() { - Map members = new HashMap<>(); + Map members = new HashMap<>(); for (int i = 0; i < memberCount - 1; i++) { String memberId = "member" + i; - members.put(memberId, new MemberSubscriptionSpecImpl( + members.put(memberId, new MemberSubscriptionAndAssignmentImpl( Optional.empty(), new TopicIds(new HashSet<>(allTopicNames), topicsImage), Assignment.EMPTY diff --git a/settings.gradle b/settings.gradle index c2d66475ad4..09bc70605db 100644 --- a/settings.gradle +++ b/settings.gradle @@ -61,6 +61,7 @@ include 'clients', 'examples', 'generator', 'group-coordinator', + 'group-coordinator:group-coordinator-api', 'jmh-benchmarks', 'log4j-appender', 'metadata',