KAFKA-14701; Move `PartitionAssignor` to new `group-coordinator-api` module (#16198)

This patch moves the `PartitionAssignor` interface and all the related classes to a newly created `group-coordinator/api` module, following the pattern used by the storage and tools modules.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
David Jacot 2024-06-06 21:19:20 +02:00
parent 25ca963980
commit 1b0edf4f8c
43 changed files with 561 additions and 405 deletions

View File

@ -943,6 +943,7 @@ project(':core') {
api libs.scalaLibrary
implementation project(':server-common')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':group-coordinator')
implementation project(':transaction-coordinator')
implementation project(':metadata')
@ -1326,6 +1327,66 @@ project(':metadata') {
}
}
project(':group-coordinator:group-coordinator-api') {
base {
archivesName = "kafka-group-coordinator-api"
}
dependencies {
implementation project(':clients')
}
task createVersionFile() {
def receiptFile = file("$buildDir/kafka/$buildVersionFileName")
inputs.property "commitId", commitId
inputs.property "version", version
outputs.file receiptFile
doLast {
def data = [
commitId: commitId,
version: version,
]
receiptFile.parentFile.mkdirs()
def content = data.entrySet().collect { "$it.key=$it.value" }.sort().join("\n")
receiptFile.setText(content, "ISO-8859-1")
}
}
sourceSets {
main {
java {
srcDirs = ["src/main/java"]
}
}
test {
java {
srcDirs = ["src/test/java"]
}
}
}
jar {
dependsOn createVersionFile
from("$buildDir") {
include "kafka/$buildVersionFileName"
}
}
clean.doFirst {
delete "$buildDir/kafka/"
}
javadoc {
include "**/org/apache/kafka/coordinator/group/api/**"
}
checkstyle {
configProperties = checkstyleConfigProperties("import-control-group-coordinator.xml")
}
}
project(':group-coordinator') {
base {
archivesName = "kafka-group-coordinator"
@ -1339,6 +1400,7 @@ project(':group-coordinator') {
implementation project(':server-common')
implementation project(':clients')
implementation project(':metadata')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':storage')
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
@ -2883,6 +2945,7 @@ project(':jmh-benchmarks') {
implementation project(':raft')
implementation project(':clients')
implementation project(':group-coordinator')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':metadata')
implementation project(':storage')
implementation project(':streams')

View File

@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig

View File

@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Server-side partition assignor for consumer groups used by the GroupCoordinator.
*
* The interface is kept in an internal module until KIP-848 is fully
* implemented and ready to be released.
* The new consumer group protocol is in preview so this interface is considered
* unstable until Apache Kafka 4.0.
*/
@InterfaceStability.Unstable
public interface ConsumerGroupPartitionAssignor extends PartitionAssignor {

View File

@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Map;
import java.util.Objects;
@ -22,6 +24,7 @@ import java.util.Objects;
/**
* The partition assignment for a consumer group.
*/
@InterfaceStability.Unstable
public class GroupAssignment {
/**
* The member assignments keyed by member id.
@ -31,8 +34,7 @@ public class GroupAssignment {
public GroupAssignment(
Map<String, MemberAssignment> members
) {
Objects.requireNonNull(members);
this.members = members;
this.members = Objects.requireNonNull(members);
}
/**

View File

@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
/**
* The group metadata specifications required to compute the target assignment.
*/
@InterfaceStability.Unstable
public interface GroupSpec {
/**
* @return All the member Ids of the consumer group.
@ -45,18 +45,18 @@ public interface GroupSpec {
/**
* Gets the member subscription specification for a member.
*
* @param memberId The member Id.
* @param memberId The member Id.
* @return The member's subscription metadata.
* @throws IllegalArgumentException If the member Id isn't found.
*/
MemberSubscriptionSpec memberSubscription(String memberId);
MemberSubscription memberSubscription(String memberId);
/**
* Gets the current assignment of the member.
*
* @param memberId The member Id.
* @return A map of topic Ids to sets of partition numbers.
* An empty map is returned if the member Id isn't found.
* @param memberId The member Id.
* @return The member's assignment or an empty assignment if the
* member does not have one.
*/
Map<Uuid, Set<Integer>> memberAssignment(String memberId);
MemberAssignment memberAssignment(String memberId);
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.api.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Map;
import java.util.Set;
/**
* The partition assignment for a consumer group member.
*/
@InterfaceStability.Unstable
public interface MemberAssignment {
/**
* @return The assigned partitions keyed by topic Ids.
*/
Map<Uuid, Set<Integer>> partitions();
}

View File

@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Optional;
import java.util.Set;
@ -24,7 +25,8 @@ import java.util.Set;
/**
* Interface representing the subscription metadata for a group member.
*/
public interface MemberSubscriptionSpec {
@InterfaceStability.Unstable
public interface MemberSubscription {
/**
* Gets the rack Id if present.
*

View File

@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Server-side partition assignor used by the GroupCoordinator.
*
* The interface is kept in an internal module until KIP-848 is fully
* implemented and ready to be released.
* The new consumer group protocol is in preview so this interface is considered
* unstable until Apache Kafka 4.0.
*/
@InterfaceStability.Unstable
public interface PartitionAssignor {

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;
import org.apache.kafka.common.errors.ApiException;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;

View File

@ -14,11 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.api.assignor;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* The subscription type followed by a consumer group.
*/
@InterfaceStability.Unstable
public enum SubscriptionType {
/**
* A homogeneous subscription type means that all the members

View File

@ -17,7 +17,7 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;

View File

@ -58,10 +58,10 @@ import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
@ -1926,7 +1926,7 @@ public class GroupMetadataManager {
MemberAssignment newMemberAssignment = assignmentResult.targetAssignment().get(updatedMember.memberId());
if (newMemberAssignment != null) {
return new Assignment(newMemberAssignment.targetPartitions());
return new Assignment(newMemberAssignment.partitions());
} else {
return Assignment.EMPTY;
}

View File

@ -17,6 +17,9 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.server.common.TopicIdPartition;
import java.util.Collection;
@ -42,7 +45,7 @@ public abstract class AbstractUniformAssignmentBuilder {
int partition
) {
memberAssignments.get(memberId)
.targetPartitions()
.partitions()
.computeIfAbsent(topicId, __ -> new HashSet<>())
.add(partition);
}

View File

@ -17,6 +17,12 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.consumer.MemberAssignmentImpl;
import org.apache.kafka.server.common.TopicIdPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -124,7 +130,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
}
subscribedTopicIds.add(topicId);
membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId);
targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()));
targetAssignment.put(memberId, new MemberAssignmentImpl(new HashMap<>()));
})
);
this.unassignedPartitions = new HashSet<>(topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber));
@ -191,7 +197,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
*/
private void assignStickyPartitions() {
groupSpec.memberIds().forEach(memberId ->
groupSpec.memberAssignment(memberId).forEach((topicId, currentAssignment) -> {
groupSpec.memberAssignment(memberId).partitions().forEach((topicId, currentAssignment) -> {
if (groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicId)) {
currentAssignment.forEach(partition -> {
TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition);
@ -244,7 +250,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
* @return true if the member can participate in reassignment, false otherwise.
*/
private boolean canMemberParticipateInReassignment(String memberId) {
Set<Uuid> assignedTopicIds = targetAssignment.get(memberId).targetPartitions().keySet();
Set<Uuid> assignedTopicIds = targetAssignment.get(memberId).partitions().keySet();
int currentAssignmentSize = assignmentManager.targetAssignmentSize(memberId);
int maxAssignmentSize = assignmentManager.maxAssignmentSize(memberId);
@ -292,7 +298,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
// Otherwise make sure it cannot get any more partitions.
for (Uuid topicId : groupSpec.memberSubscription(member).subscribedTopicIds()) {
Set<Integer> assignedPartitions = targetAssignment.get(member).targetPartitions().get(topicId);
Set<Integer> assignedPartitions = targetAssignment.get(member).partitions().get(topicId);
for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) {
TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, i);
if (assignedPartitions == null || !assignedPartitions.contains(i)) {
@ -784,7 +790,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
* @param memberId Member that the partition needs to be revoked from.
*/
private void removePartitionFromTargetAssignment(TopicIdPartition topicIdPartition, String memberId) {
Map<Uuid, Set<Integer>> targetPartitionsMap = targetAssignment.get(memberId).targetPartitions();
Map<Uuid, Set<Integer>> targetPartitionsMap = targetAssignment.get(memberId).partitions();
Set<Integer> partitionsSet = targetPartitionsMap.get(topicIdPartition.topicId());
// Remove the partition from the assignment, if there are no more partitions from a particular topic,
// remove the topic from the assignment as well.

View File

@ -17,6 +17,12 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.consumer.MemberAssignmentImpl;
import org.apache.kafka.server.common.TopicIdPartition;
import java.util.ArrayList;
@ -160,7 +166,7 @@ public class OptimizedUniformAssignmentBuilder {
*/
private void maybeRevokePartitions() {
for (String memberId : groupSpec.memberIds()) {
Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId);
Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId).partitions();
Map<Uuid, Set<Integer>> newAssignment = null;
// The assignor expects to receive the assignment as an immutable map. It leverages
@ -219,9 +225,9 @@ public class OptimizedUniformAssignmentBuilder {
}
if (newAssignment == null) {
targetAssignment.put(memberId, new MemberAssignment(oldAssignment));
targetAssignment.put(memberId, new MemberAssignmentImpl(oldAssignment));
} else {
targetAssignment.put(memberId, new MemberAssignment(newAssignment));
targetAssignment.put(memberId, new MemberAssignmentImpl(newAssignment));
}
}
}
@ -236,12 +242,12 @@ public class OptimizedUniformAssignmentBuilder {
String memberId = unfilledMember.memberId;
int remainingQuota = unfilledMember.remainingQuota;
Map<Uuid, Set<Integer>> newAssignment = targetAssignment.get(memberId).targetPartitions();
Map<Uuid, Set<Integer>> newAssignment = targetAssignment.get(memberId).partitions();
if (isImmutableMap(newAssignment)) {
// If the new assignment is immutable, we must create a deep copy of it
// before altering it.
newAssignment = deepCopy(newAssignment);
targetAssignment.put(memberId, new MemberAssignment(newAssignment));
targetAssignment.put(memberId, new MemberAssignmentImpl(newAssignment));
}
for (int i = 0; i < remainingQuota && unassignedPartitionIndex < unassignedPartitions.size(); i++) {

View File

@ -17,6 +17,13 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.consumer.MemberAssignmentImpl;
import java.util.ArrayList;
import java.util.Collection;
@ -28,7 +35,7 @@ import java.util.Map;
import java.util.Set;
import static java.lang.Math.min;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
/**
* This Range Assignor inherits properties of both the range assignor and the sticky assignor.
@ -162,7 +169,9 @@ public class RangeAssignor implements ConsumerGroupPartitionAssignor {
List<MemberWithRemainingAssignments> potentiallyUnfilledMembers = new ArrayList<>();
for (String memberId : membersForTopic) {
Set<Integer> assignedPartitionsForTopic = groupSpec.memberAssignment(memberId)
Set<Integer> assignedPartitionsForTopic = groupSpec
.memberAssignment(memberId)
.partitions()
.getOrDefault(topicId, Collections.emptySet());
int currentAssignmentSize = assignedPartitionsForTopic.size();
@ -177,8 +186,8 @@ public class RangeAssignor implements ConsumerGroupPartitionAssignor {
for (int i = 0; i < retainedPartitionsCount; i++) {
assignedStickyPartitionsForTopic
.add(currentAssignmentListForTopic.get(i));
newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignment(new HashMap<>()))
.targetPartitions()
newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignmentImpl(new HashMap<>()))
.partitions()
.computeIfAbsent(topicId, k -> new HashSet<>())
.add(currentAssignmentListForTopic.get(i));
}
@ -198,8 +207,8 @@ public class RangeAssignor implements ConsumerGroupPartitionAssignor {
// add the extra partition that will be present at the index right after min quota was satisfied.
assignedStickyPartitionsForTopic
.add(currentAssignmentListForTopic.get(minRequiredQuota));
newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignment(new HashMap<>()))
.targetPartitions()
newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignmentImpl(new HashMap<>()))
.partitions()
.computeIfAbsent(topicId, k -> new HashSet<>())
.add(currentAssignmentListForTopic.get(minRequiredQuota));
} else {
@ -233,8 +242,8 @@ public class RangeAssignor implements ConsumerGroupPartitionAssignor {
List<Integer> partitionsToAssign = unassignedPartitionsForTopic
.subList(unassignedPartitionsListStartPointer, unassignedPartitionsListStartPointer + remaining);
unassignedPartitionsListStartPointer += remaining;
newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignment(new HashMap<>()))
.targetPartitions()
newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignmentImpl(new HashMap<>()))
.partitions()
.computeIfAbsent(topicId, k -> new HashSet<>())
.addAll(partitionsToAssign);
}

View File

@ -16,12 +16,17 @@
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
/**
* The Uniform Assignor distributes topic partitions among group members for a

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import java.util.Collections;
@ -29,7 +30,7 @@ import java.util.stream.Collectors;
/**
* An immutable assignment for a member.
*/
public class Assignment {
public class Assignment implements MemberAssignment {
public static final Assignment EMPTY = new Assignment(Collections.emptyMap());
/**
@ -46,6 +47,7 @@ public class Assignment {
/**
* @return The assigned partitions.
*/
@Override
public Map<Uuid, Set<Integer>> partitions() {
return partitions;
}

View File

@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.CoordinatorRecord;
import org.apache.kafka.coordinator.group.CoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
@ -56,8 +56,8 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.ASSIGNING;
import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.EMPTY;
import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.RECONCILING;

View File

@ -14,15 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberSubscription;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* The assignment specification for a consumer group.
@ -31,7 +34,7 @@ public class GroupSpecImpl implements GroupSpec {
/**
* Member subscription metadata keyed by member Id.
*/
private final Map<String, MemberSubscriptionSpecImpl> memberSubscriptions;
private final Map<String, MemberSubscriptionAndAssignmentImpl> members;
/**
* The subscription type of the group.
@ -45,11 +48,11 @@ public class GroupSpecImpl implements GroupSpec {
private final Map<Uuid, Map<Integer, String>> invertedMemberAssignment;
public GroupSpecImpl(
Map<String, MemberSubscriptionSpecImpl> memberSubscriptions,
Map<String, MemberSubscriptionAndAssignmentImpl> members,
SubscriptionType subscriptionType,
Map<Uuid, Map<Integer, String>> invertedMemberAssignment
) {
this.memberSubscriptions = Objects.requireNonNull(memberSubscriptions);
this.members = Objects.requireNonNull(members);
this.subscriptionType = Objects.requireNonNull(subscriptionType);
this.invertedMemberAssignment = Objects.requireNonNull(invertedMemberAssignment);
}
@ -59,7 +62,7 @@ public class GroupSpecImpl implements GroupSpec {
*/
@Override
public Collection<String> memberIds() {
return memberSubscriptions.keySet();
return members.keySet();
}
/**
@ -86,8 +89,8 @@ public class GroupSpecImpl implements GroupSpec {
* {@inheritDoc}
*/
@Override
public MemberSubscriptionSpec memberSubscription(String memberId) {
MemberSubscriptionSpec memberSubscription = memberSubscriptions.get(memberId);
public MemberSubscription memberSubscription(String memberId) {
MemberSubscription memberSubscription = members.get(memberId);
if (memberSubscription == null) {
throw new IllegalArgumentException("Member Id " + memberId + " not found.");
}
@ -98,12 +101,12 @@ public class GroupSpecImpl implements GroupSpec {
* {@inheritDoc}
*/
@Override
public Map<Uuid, Set<Integer>> memberAssignment(String memberId) {
MemberSubscriptionSpecImpl memberSubscription = memberSubscriptions.get(memberId);
if (memberSubscription == null) {
return Collections.emptyMap();
public MemberAssignment memberAssignment(String memberId) {
MemberSubscriptionAndAssignmentImpl member = members.get(memberId);
if (member == null) {
return new MemberAssignmentImpl(Collections.emptyMap());
}
return memberSubscription.memberAssignment();
return member;
}
@Override
@ -112,13 +115,13 @@ public class GroupSpecImpl implements GroupSpec {
if (o == null || getClass() != o.getClass()) return false;
GroupSpecImpl that = (GroupSpecImpl) o;
return subscriptionType == that.subscriptionType &&
memberSubscriptions.equals(that.memberSubscriptions) &&
members.equals(that.members) &&
invertedMemberAssignment.equals(that.invertedMemberAssignment);
}
@Override
public int hashCode() {
int result = memberSubscriptions.hashCode();
int result = members.hashCode();
result = 31 * result + subscriptionType.hashCode();
result = 31 * result + invertedMemberAssignment.hashCode();
return result;
@ -126,7 +129,7 @@ public class GroupSpecImpl implements GroupSpec {
@Override
public String toString() {
return "GroupSpecImpl(memberSubscriptions=" + memberSubscriptions +
return "GroupSpecImpl(members=" + members +
", subscriptionType=" + subscriptionType +
", invertedMemberAssignment=" + invertedMemberAssignment +
')';

View File

@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import java.util.Map;
import java.util.Objects;
@ -25,39 +26,39 @@ import java.util.Set;
/**
* The partition assignment for a consumer group member.
*/
public class MemberAssignment {
public class MemberAssignmentImpl implements MemberAssignment {
/**
* The target partitions assigned to this member keyed by topicId.
* The partitions assigned to this member keyed by topicId.
*/
private final Map<Uuid, Set<Integer>> targetPartitions;
private final Map<Uuid, Set<Integer>> partitions;
public MemberAssignment(Map<Uuid, Set<Integer>> targetPartitions) {
Objects.requireNonNull(targetPartitions);
this.targetPartitions = targetPartitions;
public MemberAssignmentImpl(Map<Uuid, Set<Integer>> partitions) {
this.partitions = Objects.requireNonNull(partitions);
}
/**
* @return Target partitions keyed by topic Ids.
* @return The assigned partitions keyed by topic Ids.
*/
public Map<Uuid, Set<Integer>> targetPartitions() {
return this.targetPartitions;
@Override
public Map<Uuid, Set<Integer>> partitions() {
return this.partitions;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MemberAssignment that = (MemberAssignment) o;
return targetPartitions.equals(that.targetPartitions);
MemberAssignmentImpl that = (MemberAssignmentImpl) o;
return partitions.equals(that.partitions);
}
@Override
public int hashCode() {
return targetPartitions.hashCode();
return partitions.hashCode();
}
@Override
public String toString() {
return "MemberAssignment(targetPartitions=" + targetPartitions + ')';
return "MemberAssignment(partitions=" + partitions + ')';
}
}

View File

@ -14,10 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberSubscription;
import java.util.Map;
import java.util.Objects;
@ -25,21 +26,21 @@ import java.util.Optional;
import java.util.Set;
/**
* Implementation of the {@link MemberSubscriptionSpec} interface.
* Implementation of the {@link MemberSubscription} and the {@link MemberAssignment} interfaces.
*/
public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec {
public class MemberSubscriptionAndAssignmentImpl implements MemberSubscription, MemberAssignment {
private final Optional<String> rackId;
private final Set<Uuid> subscribedTopicIds;
private final Assignment memberAssignment;
/**
* Constructs a new {@code MemberSubscriptionSpecImpl}.
* Constructs a new {@code MemberSubscriptionAndAssignmentImpl}.
*
* @param rackId The rack Id.
* @param subscribedTopicIds The set of subscribed topic Ids.
* @param memberAssignment The current member assignment.
*/
public MemberSubscriptionSpecImpl(
public MemberSubscriptionAndAssignmentImpl(
Optional<String> rackId,
Set<Uuid> subscribedTopicIds,
Assignment memberAssignment
@ -59,7 +60,8 @@ public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec {
return subscribedTopicIds;
}
public Map<Uuid, Set<Integer>> memberAssignment() {
@Override
public Map<Uuid, Set<Integer>> partitions() {
return memberAssignment.partitions();
}
@ -67,7 +69,7 @@ public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MemberSubscriptionSpecImpl that = (MemberSubscriptionSpecImpl) o;
MemberSubscriptionAndAssignmentImpl that = (MemberSubscriptionAndAssignmentImpl) o;
return rackId.equals(that.rackId) &&
subscribedTopicIds.equals(that.subscribedTopicIds) &&
memberAssignment.equals(that.memberAssignment);
@ -83,7 +85,7 @@ public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec {
@Override
public String toString() {
return "MemberSubscriptionSpecImpl(rackId=" + rackId.orElse("N/A") +
return "MemberSubscriptionAndAssignmentImpl(rackId=" + rackId.orElse("N/A") +
", subscribedTopicIds=" + subscribedTopicIds +
", memberAssignment=" + memberAssignment +
')';

View File

@ -17,8 +17,8 @@
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import java.util.Collections;
import java.util.Map;
@ -29,14 +29,14 @@ import java.util.Set;
* The subscribed topic metadata class is used by the {@link PartitionAssignor} to obtain
* topic and partition metadata for the topics that the consumer group is subscribed to.
*/
public class SubscribedTopicMetadata implements SubscribedTopicDescriber {
public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber {
/**
* The topic Ids mapped to their corresponding {@link TopicMetadata}
* object, which contains topic and partition metadata.
*/
private final Map<Uuid, TopicMetadata> topicMetadata;
public SubscribedTopicMetadata(Map<Uuid, TopicMetadata> topicMetadata) {
public SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata> topicMetadata) {
this.topicMetadata = Objects.requireNonNull(topicMetadata);
}
@ -80,7 +80,7 @@ public class SubscribedTopicMetadata implements SubscribedTopicDescriber {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SubscribedTopicMetadata that = (SubscribedTopicMetadata) o;
SubscribedTopicDescriberImpl that = (SubscribedTopicDescriberImpl) o;
return topicMetadata.equals(that.topicMetadata);
}

View File

@ -18,13 +18,11 @@ package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.CoordinatorRecord;
import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.image.TopicsImage;
import java.util.ArrayList;
@ -293,11 +291,11 @@ public class TargetAssignmentBuilder {
* @throws PartitionAssignorException if the target assignment cannot be computed.
*/
public TargetAssignmentResult build() throws PartitionAssignorException {
Map<String, MemberSubscriptionSpecImpl> memberSpecs = new HashMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new HashMap<>();
// Prepare the member spec for all members.
members.forEach((memberId, member) ->
memberSpecs.put(memberId, createMemberSubscriptionSpecImpl(
memberSpecs.put(memberId, createMemberSubscriptionAndAssignment(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
topicsImage
@ -319,7 +317,7 @@ public class TargetAssignmentBuilder {
}
}
memberSpecs.put(memberId, createMemberSubscriptionSpecImpl(
memberSpecs.put(memberId, createMemberSubscriptionAndAssignment(
updatedMemberOrNull,
assignment,
topicsImage
@ -343,7 +341,7 @@ public class TargetAssignmentBuilder {
subscriptionType,
invertedTargetAssignment
),
new SubscribedTopicMetadata(topicMetadataMap)
new SubscribedTopicDescriberImpl(topicMetadataMap)
);
// Compute delta from previous to new target assignment and create the
@ -377,19 +375,19 @@ public class TargetAssignmentBuilder {
) {
MemberAssignment newMemberAssignment = newGroupAssignment.members().get(memberId);
if (newMemberAssignment != null) {
return new Assignment(newMemberAssignment.targetPartitions());
return new Assignment(newMemberAssignment.partitions());
} else {
return Assignment.EMPTY;
}
}
// private for testing
static MemberSubscriptionSpecImpl createMemberSubscriptionSpecImpl(
static MemberSubscriptionAndAssignmentImpl createMemberSubscriptionAndAssignment(
ConsumerGroupMember member,
Assignment memberAssignment,
TopicsImage topicsImage
) {
return new MemberSubscriptionSpecImpl(
return new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
new TopicIds(member.subscribedTopicNames(), topicsImage),
memberAssignment

View File

@ -17,8 +17,8 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.consumer.MemberSubscriptionAndAssignmentImpl;
import java.util.AbstractMap;
import java.util.Arrays;
@ -80,7 +80,7 @@ public class AssignmentTestUtil {
) {
assertEquals(expectedAssignment.size(), computedGroupAssignment.members().size());
computedGroupAssignment.members().forEach((memberId, memberAssignment) -> {
Map<Uuid, Set<Integer>> computedAssignmentForMember = memberAssignment.targetPartitions();
Map<Uuid, Set<Integer>> computedAssignmentForMember = memberAssignment.partitions();
assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember);
});
}
@ -92,12 +92,12 @@ public class AssignmentTestUtil {
* @return Map of topic partition to member assignments.
*/
public static Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
Map<String, MemberSubscriptionSpecImpl> members
Map<String, MemberSubscriptionAndAssignmentImpl> members
) {
Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();
for (Map.Entry<String, MemberSubscriptionSpecImpl> memberEntry : members.entrySet()) {
for (Map.Entry<String, MemberSubscriptionAndAssignmentImpl> memberEntry : members.entrySet()) {
String memberId = memberEntry.getKey();
Map<Uuid, Set<Integer>> memberAssignment = memberEntry.getValue().memberAssignment();
Map<Uuid, Set<Integer>> memberAssignment = memberEntry.getValue().partitions();
for (Map.Entry<Uuid, Set<Integer>> topicEntry : memberAssignment.entrySet()) {
Uuid topicId = topicEntry.getKey();

View File

@ -17,7 +17,7 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.junit.jupiter.api.Test;

View File

@ -61,10 +61,11 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.MockCoordinatorTimer.ExpiredTimeout;
import org.apache.kafka.coordinator.group.MockCoordinatorTimer.ScheduledTimeout;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.consumer.MemberAssignmentImpl;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
@ -435,7 +436,7 @@ public class GroupMetadataManagerTest {
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)))
@ -540,7 +541,7 @@ public class GroupMetadataManagerTest {
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)))
@ -663,15 +664,15 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1),
mkTopicAssignment(barTopicId, 0)
)));
put(memberId2, new MemberAssignment(mkAssignment(
put(memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 2, 3),
mkTopicAssignment(barTopicId, 1)
)));
put(memberId3, new MemberAssignment(mkAssignment(
put(memberId3, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 4, 5),
mkTopicAssignment(barTopicId, 2)
)));
@ -897,15 +898,15 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1),
mkTopicAssignment(barTopicId, 0)
)));
put(memberId2, new MemberAssignment(mkAssignment(
put(memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 2, 3),
mkTopicAssignment(barTopicId, 1)
)));
put(memberId3, new MemberAssignment(mkAssignment(
put(memberId3, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 4, 5),
mkTopicAssignment(barTopicId, 2)
)));
@ -1047,12 +1048,12 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2),
mkTopicAssignment(barTopicId, 0, 1)
)));
// When the member rejoins, it gets the same assignments.
put(member2RejoinId, new MemberAssignment(mkAssignment(
put(member2RejoinId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 2)
)));
@ -1680,7 +1681,7 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId, new MemberAssignment(mkAssignment(
put(memberId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1)
)));
}
@ -1816,15 +1817,15 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1),
mkTopicAssignment(barTopicId, 0)
)));
put(memberId2, new MemberAssignment(mkAssignment(
put(memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 2, 3),
mkTopicAssignment(barTopicId, 2)
)));
put(memberId3, new MemberAssignment(mkAssignment(
put(memberId3, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 4, 5),
mkTopicAssignment(barTopicId, 1)
)));
@ -2395,7 +2396,7 @@ public class GroupMetadataManagerTest {
// Prepare the assignment result.
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)))
));
@ -2506,7 +2507,7 @@ public class GroupMetadataManagerTest {
// Prepare the assignment result.
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)))
));
@ -2808,7 +2809,7 @@ public class GroupMetadataManagerTest {
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)))
));
@ -2883,7 +2884,7 @@ public class GroupMetadataManagerTest {
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)))
));
@ -2947,7 +2948,7 @@ public class GroupMetadataManagerTest {
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)))
));
@ -3031,7 +3032,7 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)
)));
}
@ -3071,10 +3072,10 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1)
)));
put(memberId2, new MemberAssignment(mkAssignment(
put(memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 2)
)));
}
@ -3186,7 +3187,7 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)
)));
}
@ -3226,10 +3227,10 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1)
)));
put(memberId2, new MemberAssignment(mkAssignment(
put(memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 2)
)));
}
@ -9233,7 +9234,7 @@ public class GroupMetadataManagerTest {
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)))
@ -9476,10 +9477,10 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0)
)));
put(memberId2, new MemberAssignment(mkAssignment(
put(memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(barTopicId, 0)
)));
}
@ -9652,13 +9653,13 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0)
)));
put(memberId2, new MemberAssignment(mkAssignment(
put(memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(barTopicId, 0)
)));
put(memberId3, new MemberAssignment(mkAssignment(
put(memberId3, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 1)
)));
}
@ -9899,13 +9900,13 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0)
)));
put(memberId2, new MemberAssignment(mkAssignment(
put(memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(barTopicId, 0)
)));
put(memberId3, new MemberAssignment(mkAssignment(
put(memberId3, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 1)
)));
}
@ -10730,11 +10731,11 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2),
mkTopicAssignment(barTopicId, 0, 1)
)));
put(memberId2, new MemberAssignment(mkAssignment(
put(memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)
)));
}
@ -10970,10 +10971,10 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId, new MemberAssignment(mkAssignment(
put(memberId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0)
)));
put(newMemberId, new MemberAssignment(mkAssignment(
put(newMemberId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(barTopicId, 0)
)));
}
@ -11059,10 +11060,10 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId, new MemberAssignment(mkAssignment(
put(memberId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0)
)));
put(newMemberId, new MemberAssignment(mkAssignment(
put(newMemberId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 1)
)));
}
@ -11389,11 +11390,11 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0),
mkTopicAssignment(zarTopicId, 0)
)));
put(memberId2, new MemberAssignment(mkAssignment(
put(memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(barTopicId, 0),
mkTopicAssignment(fooTopicId, 1)
)));
@ -11619,11 +11620,11 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1),
mkTopicAssignment(zarTopicId, 0)
)));
put(memberId2, new MemberAssignment(mkAssignment(
put(memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(barTopicId, 0)
)));
}
@ -11861,11 +11862,11 @@ public class GroupMetadataManagerTest {
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
put(memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1),
mkTopicAssignment(zarTopicId, 0)
)));
put(memberId2, new MemberAssignment(mkAssignment(
put(memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(barTopicId, 0)
)));
}

View File

@ -46,7 +46,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupBuilder;

View File

@ -17,11 +17,11 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import java.util.Map;
import java.util.Objects;
@ -51,6 +51,6 @@ public class MockPartitionAssignor implements ConsumerGroupPartitionAssignor {
public Map<Uuid, Set<Integer>> targetPartitions(String memberId) {
Objects.requireNonNull(prepareGroupAssignment);
return prepareGroupAssignment.members().get(memberId).targetPartitions();
return prepareGroupAssignment.members().get(memberId).partitions();
}
}

View File

@ -16,12 +16,12 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import java.util.function.Function;
import java.util.stream.Collectors;
public class NoOpPartitionAssignor implements ConsumerGroupPartitionAssignor {
@ -36,9 +36,6 @@ public class NoOpPartitionAssignor implements ConsumerGroupPartitionAssignor {
public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
return new GroupAssignment(groupSpec.memberIds()
.stream()
.collect(Collectors.toMap(
memberId -> memberId,
memberId -> new MemberAssignment(groupSpec.memberAssignment(memberId))
)));
.collect(Collectors.toMap(Function.identity(), groupSpec::memberAssignment)));
}
}

View File

@ -17,8 +17,13 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.GroupSpecImpl;
import org.apache.kafka.coordinator.group.consumer.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.junit.jupiter.api.Test;
@ -35,7 +40,7 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -55,7 +60,7 @@ public class GeneralUniformAssignmentBuilderTest {
@Test
public void testTwoMembersNoTopicSubscription() {
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Collections.singletonMap(
topic1Uuid,
new TopicMetadata(
@ -67,13 +72,13 @@ public class GeneralUniformAssignmentBuilderTest {
)
);
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.emptySet(),
Assignment.EMPTY
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.emptySet(),
Assignment.EMPTY
@ -95,7 +100,7 @@ public class GeneralUniformAssignmentBuilderTest {
@Test
public void testTwoMembersSubscribedToNonexistentTopics() {
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Collections.singletonMap(
topic1Uuid,
new TopicMetadata(
@ -107,13 +112,13 @@ public class GeneralUniformAssignmentBuilderTest {
)
);
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic3Uuid),
Assignment.EMPTY
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic2Uuid),
Assignment.EMPTY
@ -125,7 +130,8 @@ public class GeneralUniformAssignmentBuilderTest {
Collections.emptyMap()
);
assertThrows(PartitionAssignorException.class,
assertThrows(
PartitionAssignorException.class,
() -> assignor.assign(groupSpec, subscribedTopicMetadata));
}
@ -145,15 +151,15 @@ public class GeneralUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(6)
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Assignment.EMPTY
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic3Uuid),
Assignment.EMPTY
@ -164,7 +170,7 @@ public class GeneralUniformAssignmentBuilderTest {
HETEROGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -199,21 +205,21 @@ public class GeneralUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(2)
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic3Uuid),
Assignment.EMPTY
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic3Uuid),
Assignment.EMPTY
));
members.put(memberC, new MemberSubscriptionSpecImpl(
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic1Uuid),
Assignment.EMPTY
@ -224,7 +230,7 @@ public class GeneralUniformAssignmentBuilderTest {
HETEROGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -268,9 +274,9 @@ public class GeneralUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(4)
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic1Uuid),
new Assignment(mkAssignment(
@ -278,7 +284,7 @@ public class GeneralUniformAssignmentBuilderTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
@ -287,7 +293,7 @@ public class GeneralUniformAssignmentBuilderTest {
))
));
members.put(memberC, new MemberSubscriptionSpecImpl(
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid, topic3Uuid),
new Assignment(mkAssignment(
@ -302,7 +308,7 @@ public class GeneralUniformAssignmentBuilderTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -353,9 +359,9 @@ public class GeneralUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(3)
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
new Assignment(mkAssignment(
@ -364,7 +370,7 @@ public class GeneralUniformAssignmentBuilderTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid, topic3Uuid, topic4Uuid),
new Assignment(mkAssignment(
@ -378,7 +384,7 @@ public class GeneralUniformAssignmentBuilderTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -415,9 +421,9 @@ public class GeneralUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(7)
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic1Uuid),
new Assignment(mkAssignment(
@ -426,7 +432,7 @@ public class GeneralUniformAssignmentBuilderTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
@ -436,7 +442,7 @@ public class GeneralUniformAssignmentBuilderTest {
));
// Add a new member to trigger a re-assignment.
members.put(memberC, new MemberSubscriptionSpecImpl(
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
Assignment.EMPTY
@ -447,7 +453,7 @@ public class GeneralUniformAssignmentBuilderTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -491,9 +497,9 @@ public class GeneralUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(3)
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
new Assignment(mkAssignment(
@ -502,7 +508,7 @@ public class GeneralUniformAssignmentBuilderTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic2Uuid),
new Assignment(mkAssignment(
@ -517,7 +523,7 @@ public class GeneralUniformAssignmentBuilderTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -553,9 +559,9 @@ public class GeneralUniformAssignmentBuilderTest {
));
// Initial subscriptions were [T1, T2]
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic1Uuid),
new Assignment(mkAssignment(
@ -564,7 +570,7 @@ public class GeneralUniformAssignmentBuilderTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
@ -578,7 +584,7 @@ public class GeneralUniformAssignmentBuilderTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,

View File

@ -17,7 +17,10 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.GroupSpecImpl;
import org.apache.kafka.coordinator.group.consumer.MemberSubscriptionAndAssignmentImpl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -36,7 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class GroupSpecImplTest {
private static final String TEST_MEMBER = "test-member";
private Map<String, MemberSubscriptionSpecImpl> members;
private Map<String, MemberSubscriptionAndAssignmentImpl> members;
private SubscriptionType subscriptionType;
private Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
private GroupSpecImpl groupSpec;
@ -49,7 +52,7 @@ public class GroupSpecImplTest {
invertedTargetAssignment = new HashMap<>();
topicId = Uuid.randomUuid();
members.put(TEST_MEMBER, new MemberSubscriptionSpecImpl(
members.put(TEST_MEMBER, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topicId),
Assignment.EMPTY
@ -96,13 +99,13 @@ public class GroupSpecImplTest {
topicId,
mkSet(0, 1)
);
members.put(TEST_MEMBER, new MemberSubscriptionSpecImpl(
members.put(TEST_MEMBER, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topicId),
new Assignment(topicPartitions)
));
assertEquals(topicPartitions, groupSpec.memberAssignment(TEST_MEMBER));
assertEquals(Collections.emptyMap(), groupSpec.memberAssignment("unknown-member"));
assertEquals(topicPartitions, groupSpec.memberAssignment(TEST_MEMBER).partitions());
assertEquals(Collections.emptyMap(), groupSpec.memberAssignment("unknown-member").partitions());
}
}

View File

@ -17,8 +17,13 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.GroupSpecImpl;
import org.apache.kafka.coordinator.group.consumer.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.junit.jupiter.api.Test;
@ -39,7 +44,7 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAss
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -58,7 +63,7 @@ public class OptimizedUniformAssignmentBuilderTest {
@Test
public void testOneMemberNoTopicSubscription() {
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Collections.singletonMap(
topic1Uuid,
new TopicMetadata(
@ -70,9 +75,9 @@ public class OptimizedUniformAssignmentBuilderTest {
)
);
Map<String, MemberSubscriptionSpecImpl> members = Collections.singletonMap(
Map<String, MemberSubscriptionAndAssignmentImpl> members = Collections.singletonMap(
memberA,
new MemberSubscriptionSpecImpl(
new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.emptySet(),
Assignment.EMPTY
@ -95,7 +100,7 @@ public class OptimizedUniformAssignmentBuilderTest {
@Test
public void testOneMemberSubscribedToNonexistentTopic() {
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Collections.singletonMap(
topic1Uuid,
new TopicMetadata(
@ -107,9 +112,9 @@ public class OptimizedUniformAssignmentBuilderTest {
)
);
Map<String, MemberSubscriptionSpecImpl> members = Collections.singletonMap(
Map<String, MemberSubscriptionAndAssignmentImpl> members = Collections.singletonMap(
memberA,
new MemberSubscriptionSpecImpl(
new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic2Uuid),
Assignment.EMPTY
@ -142,15 +147,15 @@ public class OptimizedUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(2)
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Assignment.EMPTY
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Assignment.EMPTY
@ -170,7 +175,7 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -191,21 +196,21 @@ public class OptimizedUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(2)
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic3Uuid),
Assignment.EMPTY
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic3Uuid),
Assignment.EMPTY
));
members.put(memberC, new MemberSubscriptionSpecImpl(
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic3Uuid),
Assignment.EMPTY
@ -228,7 +233,7 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -252,9 +257,9 @@ public class OptimizedUniformAssignmentBuilderTest {
));
}
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
for (int i = 1; i < 50; i++) {
members.put("member" + i, new MemberSubscriptionSpecImpl(
members.put("member" + i, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
topicMetadata.keySet(),
Assignment.EMPTY
@ -266,7 +271,7 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
Collections.emptyMap()
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -292,9 +297,9 @@ public class OptimizedUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(3)
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkOrderedAssignment(
@ -303,7 +308,7 @@ public class OptimizedUniformAssignmentBuilderTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkOrderedAssignment(
@ -327,7 +332,7 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -355,9 +360,9 @@ public class OptimizedUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(5)
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkOrderedAssignment(
@ -366,7 +371,7 @@ public class OptimizedUniformAssignmentBuilderTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkOrderedAssignment(
@ -390,7 +395,7 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -417,9 +422,9 @@ public class OptimizedUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(3)
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkOrderedAssignment(
@ -428,7 +433,7 @@ public class OptimizedUniformAssignmentBuilderTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkOrderedAssignment(
@ -438,7 +443,7 @@ public class OptimizedUniformAssignmentBuilderTest {
));
// Add a new member to trigger a re-assignment.
members.put(memberC, new MemberSubscriptionSpecImpl(
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
Assignment.EMPTY
@ -461,7 +466,7 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -488,9 +493,9 @@ public class OptimizedUniformAssignmentBuilderTest {
mkMapOfPartitionRacks(3)
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
@ -499,7 +504,7 @@ public class OptimizedUniformAssignmentBuilderTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
@ -525,7 +530,7 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -553,9 +558,9 @@ public class OptimizedUniformAssignmentBuilderTest {
));
// Initial subscriptions were [T1, T2]
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic2Uuid),
new Assignment(mkAssignment(
@ -564,7 +569,7 @@ public class OptimizedUniformAssignmentBuilderTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.singleton(topic2Uuid),
new Assignment(mkAssignment(
@ -586,7 +591,7 @@ public class OptimizedUniformAssignmentBuilderTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -610,7 +615,7 @@ public class OptimizedUniformAssignmentBuilderTest {
* @param computedGroupAssignment Assignment computed by the uniform assignor.
*/
private void checkValidityAndBalance(
Map<String, MemberSubscriptionSpecImpl> memberSubscriptionSpec,
Map<String, MemberSubscriptionAndAssignmentImpl> memberSubscriptionSpec,
GroupAssignment computedGroupAssignment
) {
List<String> membersList = new ArrayList<>(computedGroupAssignment.members().keySet());
@ -618,7 +623,7 @@ public class OptimizedUniformAssignmentBuilderTest {
List<Integer> totalAssignmentSizesOfAllMembers = new ArrayList<>(membersList.size());
membersList.forEach(member -> {
Map<Uuid, Set<Integer>> computedAssignmentForMember = computedGroupAssignment
.members().get(member).targetPartitions();
.members().get(member).partitions();
int sum = computedAssignmentForMember.values().stream().mapToInt(Set::size).sum();
totalAssignmentSizesOfAllMembers.add(sum);
});
@ -626,7 +631,7 @@ public class OptimizedUniformAssignmentBuilderTest {
for (int i = 0; i < numMembers; i++) {
String memberId = membersList.get(i);
Map<Uuid, Set<Integer>> computedAssignmentForMember =
computedGroupAssignment.members().get(memberId).targetPartitions();
computedGroupAssignment.members().get(memberId).partitions();
// Each member is subscribed to topics of all the partitions assigned to it.
computedAssignmentForMember.keySet().forEach(topicId -> {
// Check if the topic exists in the subscription.
@ -638,7 +643,7 @@ public class OptimizedUniformAssignmentBuilderTest {
for (int j = i + 1; j < numMembers; j++) {
String otherMemberId = membersList.get(j);
Map<Uuid, Set<Integer>> computedAssignmentForOtherMember = computedGroupAssignment
.members().get(otherMemberId).targetPartitions();
.members().get(otherMemberId).partitions();
// Each partition should be assigned to at most one member
computedAssignmentForMember.keySet().forEach(topicId -> {
Set<Integer> intersection = new HashSet<>();

View File

@ -17,8 +17,13 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.GroupSpecImpl;
import org.apache.kafka.coordinator.group.consumer.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.junit.jupiter.api.Test;
@ -33,8 +38,8 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -53,7 +58,7 @@ public class RangeAssignorTest {
@Test
public void testOneConsumerNoTopic() {
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Collections.singletonMap(
topic1Uuid,
new TopicMetadata(
@ -65,9 +70,9 @@ public class RangeAssignorTest {
)
);
Map<String, MemberSubscriptionSpecImpl> members = Collections.singletonMap(
Map<String, MemberSubscriptionAndAssignmentImpl> members = Collections.singletonMap(
memberA,
new MemberSubscriptionSpecImpl(
new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.emptySet(),
Assignment.EMPTY
@ -90,7 +95,7 @@ public class RangeAssignorTest {
@Test
public void testOneConsumerSubscribedToNonExistentTopic() {
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Collections.singletonMap(
topic1Uuid,
new TopicMetadata(
@ -102,9 +107,9 @@ public class RangeAssignorTest {
)
);
Map<String, MemberSubscriptionSpecImpl> members = Collections.singletonMap(
Map<String, MemberSubscriptionAndAssignmentImpl> members = Collections.singletonMap(
memberA,
new MemberSubscriptionSpecImpl(
new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic2Uuid),
Assignment.EMPTY
@ -137,15 +142,15 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Assignment.EMPTY
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Assignment.EMPTY
@ -156,7 +161,7 @@ public class RangeAssignorTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -198,21 +203,21 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
Assignment.EMPTY
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic3Uuid),
Assignment.EMPTY
));
members.put(memberC, new MemberSubscriptionSpecImpl(
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic2Uuid, topic3Uuid),
Assignment.EMPTY
@ -223,7 +228,7 @@ public class RangeAssignorTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -262,21 +267,21 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Assignment.EMPTY
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Assignment.EMPTY
));
members.put(memberC, new MemberSubscriptionSpecImpl(
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic3Uuid),
Assignment.EMPTY
@ -287,7 +292,7 @@ public class RangeAssignorTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -327,9 +332,9 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
@ -338,7 +343,7 @@ public class RangeAssignorTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
@ -348,7 +353,7 @@ public class RangeAssignorTest {
));
// Add a new consumer to trigger a re-assignment
members.put(memberC, new MemberSubscriptionSpecImpl(
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
Assignment.EMPTY
@ -359,7 +364,7 @@ public class RangeAssignorTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -398,9 +403,9 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
@ -409,7 +414,7 @@ public class RangeAssignorTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
@ -423,7 +428,7 @@ public class RangeAssignorTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -459,9 +464,9 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
@ -470,7 +475,7 @@ public class RangeAssignorTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
@ -480,7 +485,7 @@ public class RangeAssignorTest {
));
// Add a new consumer to trigger a re-assignment
members.put(memberC, new MemberSubscriptionSpecImpl(
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
Assignment.EMPTY
@ -491,7 +496,7 @@ public class RangeAssignorTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -532,9 +537,9 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
@ -543,7 +548,7 @@ public class RangeAssignorTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
@ -553,7 +558,7 @@ public class RangeAssignorTest {
));
// Add a new consumer to trigger a re-assignment
members.put(memberC, new MemberSubscriptionSpecImpl(
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid),
Assignment.EMPTY
@ -564,7 +569,7 @@ public class RangeAssignorTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -603,11 +608,11 @@ public class RangeAssignorTest {
Collections.emptyMap()
));
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
// Consumer A was removed
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
@ -621,7 +626,7 @@ public class RangeAssignorTest {
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -661,9 +666,9 @@ public class RangeAssignorTest {
// Let initial subscriptions be A -> T1, T2 // B -> T2 // C -> T2, T3
// Change the subscriptions to A -> T1 // B -> T1, T2, T3 // C -> T2
Map<String, MemberSubscriptionSpecImpl> members = new TreeMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(memberA, new MemberSubscriptionSpecImpl(
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid),
new Assignment(mkAssignment(
@ -672,7 +677,7 @@ public class RangeAssignorTest {
))
));
members.put(memberB, new MemberSubscriptionSpecImpl(
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid, topic3Uuid),
new Assignment(mkAssignment(
@ -680,7 +685,7 @@ public class RangeAssignorTest {
))
));
members.put(memberC, new MemberSubscriptionSpecImpl(
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(topic2Uuid),
new Assignment(mkAssignment(
@ -694,7 +699,7 @@ public class RangeAssignorTest {
HETEROGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
@ -723,7 +728,7 @@ public class RangeAssignorTest {
) {
assertEquals(expectedAssignment.size(), computedGroupAssignment.members().size());
for (String memberId : computedGroupAssignment.members().keySet()) {
Map<Uuid, Set<Integer>> computedAssignmentForMember = computedGroupAssignment.members().get(memberId).targetPartitions();
Map<Uuid, Set<Integer>> computedAssignmentForMember = computedGroupAssignment.members().get(memberId).partitions();
assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember);
}
}

View File

@ -55,8 +55,8 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

View File

@ -33,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class SubscribedTopicMetadataTest {
private Map<Uuid, TopicMetadata> topicMetadataMap;
private SubscribedTopicMetadata subscribedTopicMetadata;
private SubscribedTopicDescriberImpl subscribedTopicMetadata;
@BeforeEach
public void setUp() {
@ -47,7 +47,7 @@ public class SubscribedTopicMetadataTest {
new TopicMetadata(topicId, topicName, 5, partitionRacks)
);
}
subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap);
subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadataMap);
}
@Test
@ -57,7 +57,7 @@ public class SubscribedTopicMetadataTest {
@Test
public void testTopicMetadataCannotBeNull() {
assertThrows(NullPointerException.class, () -> new SubscribedTopicMetadata(null));
assertThrows(NullPointerException.class, () -> new SubscribedTopicDescriberImpl(null));
}
@Test
@ -100,11 +100,11 @@ public class SubscribedTopicMetadataTest {
@Test
public void testEquals() {
assertEquals(new SubscribedTopicMetadata(topicMetadataMap), subscribedTopicMetadata);
assertEquals(new SubscribedTopicDescriberImpl(topicMetadataMap), subscribedTopicMetadata);
Map<Uuid, TopicMetadata> topicMetadataMap2 = new HashMap<>();
Uuid topicId = Uuid.randomUuid();
topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 5, Collections.emptyMap()));
assertNotEquals(new SubscribedTopicMetadata(topicMetadataMap2), subscribedTopicMetadata);
assertNotEquals(new SubscribedTopicDescriberImpl(topicMetadataMap2), subscribedTopicMetadata);
}
}

View File

@ -19,13 +19,11 @@ package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.AssignmentTestUtil;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpec;
import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.MemberSubscription;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.image.TopicsImage;
import org.junit.jupiter.api.Test;
@ -44,8 +42,8 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssig
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTargetAssignmentEpochRecord;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createMemberSubscriptionSpecImpl;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createMemberSubscriptionAndAssignment;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@ -159,17 +157,17 @@ public class TargetAssignmentBuilderTest {
String memberId,
Map<Uuid, Set<Integer>> assignment
) {
memberAssignments.put(memberId, new MemberAssignment(assignment));
memberAssignments.put(memberId, new MemberAssignmentImpl(assignment));
}
public TargetAssignmentBuilder.TargetAssignmentResult build() {
TopicsImage topicsImage = topicsImageBuilder.build().topics();
// Prepare expected member specs.
Map<String, MemberSubscriptionSpecImpl> memberSubscriptions = new HashMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> memberSubscriptions = new HashMap<>();
// All the existing members are prepared.
members.forEach((memberId, member) ->
memberSubscriptions.put(memberId, createMemberSubscriptionSpecImpl(
memberSubscriptions.put(memberId, createMemberSubscriptionAndAssignment(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
topicsImage
@ -192,7 +190,7 @@ public class TargetAssignmentBuilderTest {
}
}
memberSubscriptions.put(memberId, createMemberSubscriptionSpecImpl(
memberSubscriptions.put(memberId, createMemberSubscriptionAndAssignment(
updatedMemberOrNull,
assignment,
topicsImage
@ -206,7 +204,7 @@ public class TargetAssignmentBuilderTest {
topicMetadataMap.put(topicMetadata.id(), topicMetadata));
// Prepare the expected subscription topic metadata.
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadataMap);
SubscriptionType subscriptionType = HOMOGENEOUS;
// Prepare the member assignments per topic partition.
@ -277,13 +275,13 @@ public class TargetAssignmentBuilderTest {
mkTopicAssignment(barTopicId, 1, 2, 3)
));
MemberSubscriptionSpec subscriptionSpec = createMemberSubscriptionSpecImpl(
MemberSubscription subscriptionSpec = createMemberSubscriptionAndAssignment(
member,
assignment,
topicsImage
);
assertEquals(new MemberSubscriptionSpecImpl(
assertEquals(new MemberSubscriptionAndAssignmentImpl(
Optional.of("rackId"),
new TopicIds(mkSet("bar", "foo", "zar"), topicsImage),
assignment
@ -343,11 +341,11 @@ public class TargetAssignmentBuilderTest {
)), result.records());
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 1, 2, 3)
)));
expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 4, 5, 6),
mkTopicAssignment(barTopicId, 4, 5, 6)
)));
@ -406,11 +404,11 @@ public class TargetAssignmentBuilderTest {
), result.records().get(2));
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 1, 2, 3)
)));
expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 4, 5, 6),
mkTopicAssignment(barTopicId, 4, 5, 6)
)));
@ -480,15 +478,15 @@ public class TargetAssignmentBuilderTest {
), result.records().get(3));
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2)
)));
expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4),
mkTopicAssignment(barTopicId, 3, 4)
)));
expectedAssignment.put("member-3", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-3", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 5, 6),
mkTopicAssignment(barTopicId, 5, 6)
)));
@ -567,15 +565,15 @@ public class TargetAssignmentBuilderTest {
), result.records().get(3));
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2)
)));
expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4),
mkTopicAssignment(barTopicId, 3, 4)
)));
expectedAssignment.put("member-3", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-3", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 5, 6),
mkTopicAssignment(barTopicId, 5, 6)
)));
@ -645,15 +643,15 @@ public class TargetAssignmentBuilderTest {
), result.records().get(2));
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2)
)));
expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 3, 4, 5)
)));
expectedAssignment.put("member-3", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-3", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 6),
mkTopicAssignment(barTopicId, 6)
)));
@ -719,11 +717,11 @@ public class TargetAssignmentBuilderTest {
), result.records().get(2));
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 1, 2, 3)
)));
expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 4, 5, 6),
mkTopicAssignment(barTopicId, 4, 5, 6)
)));
@ -794,16 +792,16 @@ public class TargetAssignmentBuilderTest {
), result.records().get(1));
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2)
)));
expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4),
mkTopicAssignment(barTopicId, 3, 4)
)));
expectedAssignment.put("member-3-a", new MemberAssignment(mkAssignment(
expectedAssignment.put("member-3-a", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 5, 6),
mkTopicAssignment(barTopicId, 5, 6)
)));

View File

@ -19,8 +19,8 @@ package org.apache.kafka.jmh.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.image.MetadataDelta;
import java.util.Arrays;
@ -41,7 +41,7 @@ public class AssignorBenchmarkUtils {
Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();
for (Map.Entry<String, MemberAssignment> memberEntry : groupAssignment.members().entrySet()) {
String memberId = memberEntry.getKey();
Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().targetPartitions();
Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().partitions();
for (Map.Entry<Uuid, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) {
Uuid topicId = topicEntry.getKey();

View File

@ -23,7 +23,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@ -47,8 +47,8 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
@State(Scope.Benchmark)
@Fork(value = 1)

View File

@ -17,18 +17,19 @@
package org.apache.kafka.jmh.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.consumer.MemberAssignmentImpl;
import org.apache.kafka.coordinator.group.consumer.GroupSpecImpl;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.consumer.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.TopicIds;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.apache.kafka.image.MetadataDelta;
@ -61,8 +62,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
@State(Scope.Benchmark)
@Fork(value = 1)
@ -133,7 +134,7 @@ public class ServerSideAssignorBenchmark {
@Setup(Level.Trial)
public void setup() {
Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata);
createGroupSpec();
@ -177,7 +178,7 @@ public class ServerSideAssignorBenchmark {
}
private void createGroupSpec() {
Map<String, MemberSubscriptionSpecImpl> members = new HashMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
// In the rebalance case, we will add the last member as a trigger.
// This is done to keep the total members count consistent with the input.
@ -228,14 +229,14 @@ public class ServerSideAssignorBenchmark {
}
private void addMemberSpec(
Map<String, MemberSubscriptionSpecImpl> members,
Map<String, MemberSubscriptionAndAssignmentImpl> members,
int memberIndex,
Set<Uuid> subscribedTopicIds
) {
String memberId = "member" + memberIndex;
Optional<String> rackId = rackId(memberIndex);
members.put(memberId, new MemberSubscriptionSpecImpl(
members.put(memberId, new MemberSubscriptionAndAssignmentImpl(
rackId,
subscribedTopicIds,
Assignment.EMPTY
@ -260,18 +261,18 @@ public class ServerSideAssignorBenchmark {
Map<Uuid, Map<Integer, String>> invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(initialAssignment);
Map<String, MemberSubscriptionSpecImpl> updatedMemberSpec = new HashMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> updatedMemberSpec = new HashMap<>();
for (String memberId : groupSpec.memberIds()) {
MemberAssignment memberAssignment = members.getOrDefault(
memberId,
new MemberAssignment(Collections.emptyMap())
new MemberAssignmentImpl(Collections.emptyMap())
);
updatedMemberSpec.put(memberId, new MemberSubscriptionSpecImpl(
updatedMemberSpec.put(memberId, new MemberSubscriptionAndAssignmentImpl(
groupSpec.memberSubscription(memberId).rackId(),
groupSpec.memberSubscription(memberId).subscribedTopicIds(),
new Assignment(Collections.unmodifiableMap(memberAssignment.targetPartitions()))
new Assignment(Collections.unmodifiableMap(memberAssignment.partitions()))
));
}
@ -283,7 +284,7 @@ public class ServerSideAssignorBenchmark {
}
Optional<String> rackId = rackId(memberCount - 1);
updatedMemberSpec.put("newMember", new MemberSubscriptionSpecImpl(
updatedMemberSpec.put("newMember", new MemberSubscriptionAndAssignmentImpl(
rackId,
subscribedTopicIdsForNewMember,
Assignment.EMPTY

View File

@ -17,16 +17,16 @@
package org.apache.kafka.jmh.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberSubscriptionSpecImpl;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.consumer.GroupSpecImpl;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.consumer.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.consumer.TopicIds;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
@ -59,7 +59,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
@State(Scope.Benchmark)
@Fork(value = 1)
@ -175,7 +175,7 @@ public class TargetAssignmentBuilderBenchmark {
GroupAssignment groupAssignment = partitionAssignor.assign(
groupSpec,
new SubscribedTopicMetadata(topicMetadataMap)
new SubscribedTopicDescriberImpl(topicMetadataMap)
);
invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(groupAssignment);
@ -183,7 +183,7 @@ public class TargetAssignmentBuilderBenchmark {
for (Map.Entry<String, MemberAssignment> entry : groupAssignment.members().entrySet()) {
String memberId = entry.getKey();
Map<Uuid, Set<Integer>> topicPartitions = entry.getValue().targetPartitions();
Map<Uuid, Set<Integer>> topicPartitions = entry.getValue().partitions();
initialTargetAssignment.put(memberId, new Assignment(topicPartitions));
}
@ -191,12 +191,12 @@ public class TargetAssignmentBuilderBenchmark {
}
private void createAssignmentSpec() {
Map<String, MemberSubscriptionSpecImpl> members = new HashMap<>();
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
for (int i = 0; i < memberCount - 1; i++) {
String memberId = "member" + i;
members.put(memberId, new MemberSubscriptionSpecImpl(
members.put(memberId, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
new TopicIds(new HashSet<>(allTopicNames), topicsImage),
Assignment.EMPTY

View File

@ -61,6 +61,7 @@ include 'clients',
'examples',
'generator',
'group-coordinator',
'group-coordinator:group-coordinator-api',
'jmh-benchmarks',
'log4j-appender',
'metadata',