mirror of https://github.com/apache/kafka.git
KAFKA-14462; [10/N] Add TargetAssignmentBuilder (#13637)
This patch adds TargetAssignmentBuilder. It is responsible for computing a target assignment for a given group. Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This commit is contained in:
parent
f44ee4fab7
commit
16fc8e1cff
|
@ -38,8 +38,8 @@ import java.util.stream.Collectors;
|
||||||
*/
|
*/
|
||||||
public class ConsumerGroupMember {
|
public class ConsumerGroupMember {
|
||||||
/**
|
/**
|
||||||
* A builder allowing to create a new member or update an
|
* A builder that facilitates the creation of a new member or the update of
|
||||||
* existing one.
|
* an existing one.
|
||||||
*
|
*
|
||||||
* Please refer to the javadoc of {{@link ConsumerGroupMember}} for the
|
* Please refer to the javadoc of {{@link ConsumerGroupMember}} for the
|
||||||
* definition of the fields.
|
* definition of the fields.
|
||||||
|
@ -521,7 +521,7 @@ public class ConsumerGroupMember {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return The set of partitions awaiting assigning to the member.
|
* @return The set of partitions awaiting assignment to the member.
|
||||||
*/
|
*/
|
||||||
public Map<Uuid, Set<Integer>> partitionsPendingAssignment() {
|
public Map<Uuid, Set<Integer>> partitionsPendingAssignment() {
|
||||||
return partitionsPendingAssignment;
|
return partitionsPendingAssignment;
|
||||||
|
|
|
@ -0,0 +1,327 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.coordinator.group.consumer;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.Uuid;
|
||||||
|
import org.apache.kafka.coordinator.group.Record;
|
||||||
|
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
|
||||||
|
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
|
||||||
|
import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
|
||||||
|
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
|
||||||
|
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
|
||||||
|
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
|
||||||
|
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
|
||||||
|
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build a new Target Assignment based on the provided parameters. As a result,
|
||||||
|
* it yields the records that must be persisted to the log and the new member
|
||||||
|
* assignments as a map.
|
||||||
|
*
|
||||||
|
* Records are only created for members which have a new target assignment. If
|
||||||
|
* their assignment did not change, no new record is needed.
|
||||||
|
*
|
||||||
|
* When a member is deleted, it is assumed that its target assignment record
|
||||||
|
* is deleted as part of the member deletion process. In other words, this class
|
||||||
|
* does not yield a tombstone for removed members.
|
||||||
|
*/
|
||||||
|
public class TargetAssignmentBuilder {
|
||||||
|
/**
|
||||||
|
* The assignment result returned by {{@link TargetAssignmentBuilder#build()}}.
|
||||||
|
*/
|
||||||
|
public static class TargetAssignmentResult {
|
||||||
|
/**
|
||||||
|
* The records that must be applied to the __consumer_offsets
|
||||||
|
* topics to persist the new target assignment.
|
||||||
|
*/
|
||||||
|
private final List<Record> records;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The new target assignment for the group.
|
||||||
|
*/
|
||||||
|
private final Map<String, Assignment> targetAssignment;
|
||||||
|
|
||||||
|
TargetAssignmentResult(
|
||||||
|
List<org.apache.kafka.coordinator.group.Record> records,
|
||||||
|
Map<String, Assignment> targetAssignment
|
||||||
|
) {
|
||||||
|
Objects.requireNonNull(records);
|
||||||
|
Objects.requireNonNull(targetAssignment);
|
||||||
|
this.records = records;
|
||||||
|
this.targetAssignment = targetAssignment;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The records.
|
||||||
|
*/
|
||||||
|
public List<Record> records() {
|
||||||
|
return records;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The target assignment.
|
||||||
|
*/
|
||||||
|
public Map<String, Assignment> targetAssignment() {
|
||||||
|
return targetAssignment;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The group id.
|
||||||
|
*/
|
||||||
|
private final String groupId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The group epoch.
|
||||||
|
*/
|
||||||
|
private final int groupEpoch;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The partition assignor used to compute the assignment.
|
||||||
|
*/
|
||||||
|
private final PartitionAssignor assignor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The members in the group.
|
||||||
|
*/
|
||||||
|
private Map<String, ConsumerGroupMember> members = Collections.emptyMap();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The subscription metadata.
|
||||||
|
*/
|
||||||
|
private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The existing target assignment.
|
||||||
|
*/
|
||||||
|
private Map<String, Assignment> targetAssignment = Collections.emptyMap();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The members which have been updated or deleted. Deleted members
|
||||||
|
* are signaled by a null value.
|
||||||
|
*/
|
||||||
|
private final Map<String, ConsumerGroupMember> updatedMembers = new HashMap<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs the object.
|
||||||
|
*
|
||||||
|
* @param groupId The group id.
|
||||||
|
* @param groupEpoch The group epoch to compute a target assignment for.
|
||||||
|
* @param assignor The assignor to use to compute the target assignment.
|
||||||
|
*/
|
||||||
|
public TargetAssignmentBuilder(
|
||||||
|
String groupId,
|
||||||
|
int groupEpoch,
|
||||||
|
PartitionAssignor assignor
|
||||||
|
) {
|
||||||
|
this.groupId = Objects.requireNonNull(groupId);
|
||||||
|
this.groupEpoch = groupEpoch;
|
||||||
|
this.assignor = Objects.requireNonNull(assignor);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds all the existing members.
|
||||||
|
*
|
||||||
|
* @param members The existing members in the consumer group.
|
||||||
|
* @return This object.
|
||||||
|
*/
|
||||||
|
public TargetAssignmentBuilder withMembers(
|
||||||
|
Map<String, ConsumerGroupMember> members
|
||||||
|
) {
|
||||||
|
this.members = members;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the subscription metadata to use.
|
||||||
|
*
|
||||||
|
* @param subscriptionMetadata The subscription metadata.
|
||||||
|
* @return This object.
|
||||||
|
*/
|
||||||
|
public TargetAssignmentBuilder withSubscriptionMetadata(
|
||||||
|
Map<String, TopicMetadata> subscriptionMetadata
|
||||||
|
) {
|
||||||
|
this.subscriptionMetadata = subscriptionMetadata;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the existing target assignment.
|
||||||
|
*
|
||||||
|
* @param targetAssignment The existing target assignment.
|
||||||
|
* @return This object.
|
||||||
|
*/
|
||||||
|
public TargetAssignmentBuilder withTargetAssignment(
|
||||||
|
Map<String, Assignment> targetAssignment
|
||||||
|
) {
|
||||||
|
this.targetAssignment = targetAssignment;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds or updates a member. This is useful when the updated member is
|
||||||
|
* not yet materialized in memory.
|
||||||
|
*
|
||||||
|
* @param memberId The member id.
|
||||||
|
* @param member The member to add or update.
|
||||||
|
* @return This object.
|
||||||
|
*/
|
||||||
|
public TargetAssignmentBuilder addOrUpdateMember(
|
||||||
|
String memberId,
|
||||||
|
ConsumerGroupMember member
|
||||||
|
) {
|
||||||
|
this.updatedMembers.put(memberId, member);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes a member. This is useful when the removed member
|
||||||
|
* is not yet materialized in memory.
|
||||||
|
*
|
||||||
|
* @param memberId The member id.
|
||||||
|
* @return This object.
|
||||||
|
*/
|
||||||
|
public TargetAssignmentBuilder removeMember(
|
||||||
|
String memberId
|
||||||
|
) {
|
||||||
|
return addOrUpdateMember(memberId, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builds the new target assignment.
|
||||||
|
*
|
||||||
|
* @return A TargetAssignmentResult which contains the records to update
|
||||||
|
* the existing target assignment.
|
||||||
|
* @throws PartitionAssignorException if the target assignment cannot be computed.
|
||||||
|
*/
|
||||||
|
public TargetAssignmentResult build() throws PartitionAssignorException {
|
||||||
|
Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
|
||||||
|
|
||||||
|
// Prepare the member spec for all members.
|
||||||
|
members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec(
|
||||||
|
member,
|
||||||
|
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
|
||||||
|
subscriptionMetadata
|
||||||
|
)));
|
||||||
|
|
||||||
|
// Update the member spec if updated or deleted members.
|
||||||
|
updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
|
||||||
|
if (updatedMemberOrNull == null) {
|
||||||
|
memberSpecs.remove(memberId);
|
||||||
|
} else {
|
||||||
|
memberSpecs.put(memberId, createAssignmentMemberSpec(
|
||||||
|
updatedMemberOrNull,
|
||||||
|
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
|
||||||
|
subscriptionMetadata
|
||||||
|
));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Prepare the topic metadata.
|
||||||
|
Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
|
||||||
|
subscriptionMetadata.forEach((topicName, topicMetadata) ->
|
||||||
|
topics.put(topicMetadata.id(), new AssignmentTopicMetadata(topicMetadata.numPartitions()))
|
||||||
|
);
|
||||||
|
|
||||||
|
// Compute the assignment.
|
||||||
|
GroupAssignment newGroupAssignment = assignor.assign(new AssignmentSpec(
|
||||||
|
Collections.unmodifiableMap(memberSpecs),
|
||||||
|
Collections.unmodifiableMap(topics)
|
||||||
|
));
|
||||||
|
|
||||||
|
// Compute delta from previous to new target assignment and create the
|
||||||
|
// relevant records.
|
||||||
|
List<Record> records = new ArrayList<>();
|
||||||
|
Map<String, Assignment> newTargetAssignment = new HashMap<>();
|
||||||
|
|
||||||
|
memberSpecs.keySet().forEach(memberId -> {
|
||||||
|
Assignment oldMemberAssignment = targetAssignment.get(memberId);
|
||||||
|
Assignment newMemberAssignment = newMemberAssignment(newGroupAssignment, memberId);
|
||||||
|
|
||||||
|
newTargetAssignment.put(memberId, newMemberAssignment);
|
||||||
|
|
||||||
|
if (oldMemberAssignment == null) {
|
||||||
|
// If the member had no assignment, we always create a record for it.
|
||||||
|
records.add(newTargetAssignmentRecord(
|
||||||
|
groupId,
|
||||||
|
memberId,
|
||||||
|
newMemberAssignment.partitions()
|
||||||
|
));
|
||||||
|
} else {
|
||||||
|
// If the member had an assignment, we only create a record if the
|
||||||
|
// new assignment is different.
|
||||||
|
if (!newMemberAssignment.equals(oldMemberAssignment)) {
|
||||||
|
records.add(newTargetAssignmentRecord(
|
||||||
|
groupId,
|
||||||
|
memberId,
|
||||||
|
newMemberAssignment.partitions()
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Bump the target assignment epoch.
|
||||||
|
records.add(newTargetAssignmentEpochRecord(groupId, groupEpoch));
|
||||||
|
|
||||||
|
return new TargetAssignmentResult(records, newTargetAssignment);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Assignment newMemberAssignment(
|
||||||
|
GroupAssignment newGroupAssignment,
|
||||||
|
String memberId
|
||||||
|
) {
|
||||||
|
MemberAssignment newMemberAssignment = newGroupAssignment.members().get(memberId);
|
||||||
|
if (newMemberAssignment != null) {
|
||||||
|
return new Assignment(newMemberAssignment.targetPartitions());
|
||||||
|
} else {
|
||||||
|
return Assignment.EMPTY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static AssignmentMemberSpec createAssignmentMemberSpec(
|
||||||
|
ConsumerGroupMember member,
|
||||||
|
Assignment targetAssignment,
|
||||||
|
Map<String, TopicMetadata> subscriptionMetadata
|
||||||
|
) {
|
||||||
|
Set<Uuid> subscribedTopics = new HashSet<>();
|
||||||
|
member.subscribedTopicNames().forEach(topicName -> {
|
||||||
|
TopicMetadata topicMetadata = subscriptionMetadata.get(topicName);
|
||||||
|
if (topicMetadata != null) {
|
||||||
|
subscribedTopics.add(topicMetadata.id());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return new AssignmentMemberSpec(
|
||||||
|
Optional.ofNullable(member.instanceId()),
|
||||||
|
Optional.ofNullable(member.rackId()),
|
||||||
|
subscribedTopics,
|
||||||
|
targetAssignment.partitions()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,697 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.coordinator.group.consumer;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.Uuid;
|
||||||
|
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
|
||||||
|
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
|
||||||
|
import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
|
||||||
|
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
|
||||||
|
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
|
||||||
|
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
|
||||||
|
import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
|
||||||
|
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
|
||||||
|
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
|
||||||
|
import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public class TargetAssignmentBuilderTest {
|
||||||
|
|
||||||
|
public static class TargetAssignmentBuilderTestContext {
|
||||||
|
private final String groupId;
|
||||||
|
private final int groupEpoch;
|
||||||
|
private final PartitionAssignor assignor = mock(PartitionAssignor.class);
|
||||||
|
private final Map<String, ConsumerGroupMember> members = new HashMap<>();
|
||||||
|
private final Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
|
||||||
|
private final Map<String, ConsumerGroupMember> updatedMembers = new HashMap<>();
|
||||||
|
private final Map<String, Assignment> targetAssignment = new HashMap<>();
|
||||||
|
private final Map<String, MemberAssignment> memberAssignments = new HashMap<>();
|
||||||
|
|
||||||
|
public TargetAssignmentBuilderTestContext(
|
||||||
|
String groupId,
|
||||||
|
int groupEpoch
|
||||||
|
) {
|
||||||
|
this.groupId = groupId;
|
||||||
|
this.groupEpoch = groupEpoch;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addGroupMember(
|
||||||
|
String memberId,
|
||||||
|
List<String> subscriptions,
|
||||||
|
Map<Uuid, Set<Integer>> targetPartitions
|
||||||
|
) {
|
||||||
|
members.put(memberId, new ConsumerGroupMember.Builder(memberId)
|
||||||
|
.setSubscribedTopicNames(subscriptions)
|
||||||
|
.build());
|
||||||
|
|
||||||
|
targetAssignment.put(memberId, new Assignment(
|
||||||
|
(byte) 0,
|
||||||
|
targetPartitions,
|
||||||
|
VersionedMetadata.EMPTY
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Uuid addTopicMetadata(
|
||||||
|
String topicName,
|
||||||
|
int numPartitions
|
||||||
|
) {
|
||||||
|
Uuid topicId = Uuid.randomUuid();
|
||||||
|
subscriptionMetadata.put(topicName, new TopicMetadata(
|
||||||
|
topicId,
|
||||||
|
topicName,
|
||||||
|
numPartitions
|
||||||
|
));
|
||||||
|
return topicId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateMemberSubscription(
|
||||||
|
String memberId,
|
||||||
|
List<String> subscriptions
|
||||||
|
) {
|
||||||
|
updateMemberSubscription(
|
||||||
|
memberId,
|
||||||
|
subscriptions,
|
||||||
|
Optional.empty(),
|
||||||
|
Optional.empty()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateMemberSubscription(
|
||||||
|
String memberId,
|
||||||
|
List<String> subscriptions,
|
||||||
|
Optional<String> instanceId,
|
||||||
|
Optional<String> rackId
|
||||||
|
) {
|
||||||
|
ConsumerGroupMember existingMember = members.get(memberId);
|
||||||
|
ConsumerGroupMember.Builder builder;
|
||||||
|
if (existingMember != null) {
|
||||||
|
builder = new ConsumerGroupMember.Builder(existingMember);
|
||||||
|
} else {
|
||||||
|
builder = new ConsumerGroupMember.Builder(memberId);
|
||||||
|
}
|
||||||
|
updatedMembers.put(memberId, builder
|
||||||
|
.setSubscribedTopicNames(subscriptions)
|
||||||
|
.maybeUpdateInstanceId(instanceId)
|
||||||
|
.maybeUpdateRackId(rackId)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeMemberSubscription(
|
||||||
|
String memberId
|
||||||
|
) {
|
||||||
|
this.updatedMembers.put(memberId, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void prepareMemberAssignment(
|
||||||
|
String memberId,
|
||||||
|
Map<Uuid, Set<Integer>> assignment
|
||||||
|
) {
|
||||||
|
memberAssignments.put(memberId, new MemberAssignment(assignment));
|
||||||
|
}
|
||||||
|
|
||||||
|
public TargetAssignmentBuilder.TargetAssignmentResult build() {
|
||||||
|
// Prepare expected member specs.
|
||||||
|
Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
|
||||||
|
|
||||||
|
// All the existing members are prepared.
|
||||||
|
members.forEach((memberId, member) -> {
|
||||||
|
memberSpecs.put(memberId, createAssignmentMemberSpec(
|
||||||
|
member,
|
||||||
|
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
|
||||||
|
subscriptionMetadata
|
||||||
|
));
|
||||||
|
});
|
||||||
|
|
||||||
|
// All the updated are added and all the deleted
|
||||||
|
// members are removed.
|
||||||
|
updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
|
||||||
|
if (updatedMemberOrNull == null) {
|
||||||
|
memberSpecs.remove(memberId);
|
||||||
|
} else {
|
||||||
|
memberSpecs.put(memberId, createAssignmentMemberSpec(
|
||||||
|
updatedMemberOrNull,
|
||||||
|
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
|
||||||
|
subscriptionMetadata
|
||||||
|
));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Prepare the expected topic metadata.
|
||||||
|
Map<Uuid, AssignmentTopicMetadata> topicMetadata = new HashMap<>();
|
||||||
|
subscriptionMetadata.forEach((topicName, metadata) -> {
|
||||||
|
topicMetadata.put(metadata.id(), new AssignmentTopicMetadata(metadata.numPartitions()));
|
||||||
|
});
|
||||||
|
|
||||||
|
// Prepare the expected assignment spec.
|
||||||
|
AssignmentSpec assignmentSpec = new AssignmentSpec(
|
||||||
|
memberSpecs,
|
||||||
|
topicMetadata
|
||||||
|
);
|
||||||
|
|
||||||
|
// We use `any` here to always return an assignment but use `verify` later on
|
||||||
|
// to ensure that the input was correct.
|
||||||
|
when(assignor.assign(any())).thenReturn(new GroupAssignment(memberAssignments));
|
||||||
|
|
||||||
|
// Create and populate the assignment builder.
|
||||||
|
TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId, groupEpoch, assignor)
|
||||||
|
.withMembers(members)
|
||||||
|
.withSubscriptionMetadata(subscriptionMetadata)
|
||||||
|
.withTargetAssignment(targetAssignment);
|
||||||
|
|
||||||
|
// Add the updated members or delete the deleted members.
|
||||||
|
updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
|
||||||
|
if (updatedMemberOrNull != null) {
|
||||||
|
builder.addOrUpdateMember(memberId, updatedMemberOrNull);
|
||||||
|
} else {
|
||||||
|
builder.removeMember(memberId);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Execute the builder.
|
||||||
|
TargetAssignmentBuilder.TargetAssignmentResult result = builder.build();
|
||||||
|
|
||||||
|
// Verify that the assignor was called once with the expected
|
||||||
|
// assignment spec.
|
||||||
|
verify(assignor, times(1)).assign(assignmentSpec);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateAssignmentMemberSpec() {
|
||||||
|
Uuid fooTopicId = Uuid.randomUuid();
|
||||||
|
Uuid barTopicId = Uuid.randomUuid();
|
||||||
|
|
||||||
|
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id")
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
|
||||||
|
.setRackId("rackId")
|
||||||
|
.setInstanceId("instanceId")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Map<String, TopicMetadata> subscriptionMetadata = new HashMap<String, TopicMetadata>() {
|
||||||
|
{
|
||||||
|
put("foo", new TopicMetadata(fooTopicId, "foo", 5));
|
||||||
|
put("bar", new TopicMetadata(barTopicId, "bar", 5));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Assignment assignment = new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2, 3),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2, 3)
|
||||||
|
));
|
||||||
|
|
||||||
|
AssignmentMemberSpec assignmentMemberSpec = createAssignmentMemberSpec(
|
||||||
|
member,
|
||||||
|
assignment,
|
||||||
|
subscriptionMetadata
|
||||||
|
);
|
||||||
|
|
||||||
|
assertEquals(new AssignmentMemberSpec(
|
||||||
|
Optional.of("instanceId"),
|
||||||
|
Optional.of("rackId"),
|
||||||
|
new HashSet<>(Arrays.asList(fooTopicId, barTopicId)),
|
||||||
|
assignment.partitions()
|
||||||
|
), assignmentMemberSpec);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmpty() {
|
||||||
|
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
|
||||||
|
"my-group",
|
||||||
|
20
|
||||||
|
);
|
||||||
|
|
||||||
|
TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
|
||||||
|
assertEquals(Collections.singletonList(newTargetAssignmentEpochRecord(
|
||||||
|
"my-group",
|
||||||
|
20
|
||||||
|
)), result.records());
|
||||||
|
assertEquals(Collections.emptyMap(), result.targetAssignment());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAssignmentHasNotChanged() {
|
||||||
|
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
|
||||||
|
"my-group",
|
||||||
|
20
|
||||||
|
);
|
||||||
|
|
||||||
|
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
|
||||||
|
Uuid barTopicId = context.addTopicMetadata("bar", 6);
|
||||||
|
|
||||||
|
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2, 3),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2, 3)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 4, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 4, 5, 6)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-1", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2, 3),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2, 3)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-2", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 4, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 4, 5, 6)
|
||||||
|
));
|
||||||
|
|
||||||
|
TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
|
||||||
|
|
||||||
|
assertEquals(Collections.singletonList(newTargetAssignmentEpochRecord(
|
||||||
|
"my-group",
|
||||||
|
20
|
||||||
|
)), result.records());
|
||||||
|
|
||||||
|
Map<String, Assignment> expectedAssignment = new HashMap<>();
|
||||||
|
expectedAssignment.put("member-1", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2, 3),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2, 3)
|
||||||
|
)));
|
||||||
|
expectedAssignment.put("member-2", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 4, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 4, 5, 6)
|
||||||
|
)));
|
||||||
|
|
||||||
|
assertEquals(expectedAssignment, result.targetAssignment());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAssignmentSwapped() {
|
||||||
|
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
|
||||||
|
"my-group",
|
||||||
|
20
|
||||||
|
);
|
||||||
|
|
||||||
|
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
|
||||||
|
Uuid barTopicId = context.addTopicMetadata("bar", 6);
|
||||||
|
|
||||||
|
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2, 3),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2, 3)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 4, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 4, 5, 6)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-2", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2, 3),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2, 3)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-1", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 4, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 4, 5, 6)
|
||||||
|
));
|
||||||
|
|
||||||
|
TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
|
||||||
|
|
||||||
|
assertEquals(3, result.records().size());
|
||||||
|
|
||||||
|
assertUnorderedList(Arrays.asList(
|
||||||
|
newTargetAssignmentRecord("my-group", "member-1", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 4, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 4, 5, 6)
|
||||||
|
)),
|
||||||
|
newTargetAssignmentRecord("my-group", "member-2", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2, 3),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2, 3)
|
||||||
|
))
|
||||||
|
), result.records().subList(0, 2));
|
||||||
|
|
||||||
|
assertEquals(newTargetAssignmentEpochRecord(
|
||||||
|
"my-group",
|
||||||
|
20
|
||||||
|
), result.records().get(2));
|
||||||
|
|
||||||
|
Map<String, Assignment> expectedAssignment = new HashMap<>();
|
||||||
|
expectedAssignment.put("member-2", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2, 3),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2, 3)
|
||||||
|
)));
|
||||||
|
expectedAssignment.put("member-1", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 4, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 4, 5, 6)
|
||||||
|
)));
|
||||||
|
|
||||||
|
assertEquals(expectedAssignment, result.targetAssignment());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNewMember() {
|
||||||
|
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
|
||||||
|
"my-group",
|
||||||
|
20
|
||||||
|
);
|
||||||
|
|
||||||
|
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
|
||||||
|
Uuid barTopicId = context.addTopicMetadata("bar", 6);
|
||||||
|
|
||||||
|
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2, 3),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2, 3)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 4, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 4, 5, 6)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.updateMemberSubscription("member-3", Arrays.asList("foo", "bar", "zar"));
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-1", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-2", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 3, 4),
|
||||||
|
mkTopicAssignment(barTopicId, 3, 4)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-3", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 5, 6)
|
||||||
|
));
|
||||||
|
|
||||||
|
TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
|
||||||
|
|
||||||
|
assertEquals(4, result.records().size());
|
||||||
|
|
||||||
|
assertUnorderedList(Arrays.asList(
|
||||||
|
newTargetAssignmentRecord("my-group", "member-1", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2)
|
||||||
|
)),
|
||||||
|
newTargetAssignmentRecord("my-group", "member-2", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 3, 4),
|
||||||
|
mkTopicAssignment(barTopicId, 3, 4)
|
||||||
|
)),
|
||||||
|
newTargetAssignmentRecord("my-group", "member-3", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 5, 6)
|
||||||
|
))
|
||||||
|
), result.records().subList(0, 3));
|
||||||
|
|
||||||
|
assertEquals(newTargetAssignmentEpochRecord(
|
||||||
|
"my-group",
|
||||||
|
20
|
||||||
|
), result.records().get(3));
|
||||||
|
|
||||||
|
Map<String, Assignment> expectedAssignment = new HashMap<>();
|
||||||
|
expectedAssignment.put("member-1", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2)
|
||||||
|
)));
|
||||||
|
expectedAssignment.put("member-2", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 3, 4),
|
||||||
|
mkTopicAssignment(barTopicId, 3, 4)
|
||||||
|
)));
|
||||||
|
expectedAssignment.put("member-3", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 5, 6)
|
||||||
|
)));
|
||||||
|
|
||||||
|
assertEquals(expectedAssignment, result.targetAssignment());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateMember() {
|
||||||
|
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
|
||||||
|
"my-group",
|
||||||
|
20
|
||||||
|
);
|
||||||
|
|
||||||
|
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
|
||||||
|
Uuid barTopicId = context.addTopicMetadata("bar", 6);
|
||||||
|
|
||||||
|
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2, 3),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 4, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 3, 4)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.addGroupMember("member-3", Arrays.asList("bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(barTopicId, 5, 6)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.updateMemberSubscription(
|
||||||
|
"member-3",
|
||||||
|
Arrays.asList("foo", "bar", "zar"),
|
||||||
|
Optional.of("instance-id-3"),
|
||||||
|
Optional.of("rack-0")
|
||||||
|
);
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-1", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-2", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 3, 4),
|
||||||
|
mkTopicAssignment(barTopicId, 3, 4)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-3", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 5, 6)
|
||||||
|
));
|
||||||
|
|
||||||
|
TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
|
||||||
|
|
||||||
|
assertEquals(4, result.records().size());
|
||||||
|
|
||||||
|
assertUnorderedList(Arrays.asList(
|
||||||
|
newTargetAssignmentRecord("my-group", "member-1", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2)
|
||||||
|
)),
|
||||||
|
newTargetAssignmentRecord("my-group", "member-2", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 3, 4),
|
||||||
|
mkTopicAssignment(barTopicId, 3, 4)
|
||||||
|
)),
|
||||||
|
newTargetAssignmentRecord("my-group", "member-3", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 5, 6)
|
||||||
|
))
|
||||||
|
), result.records().subList(0, 3));
|
||||||
|
|
||||||
|
assertEquals(newTargetAssignmentEpochRecord(
|
||||||
|
"my-group",
|
||||||
|
20
|
||||||
|
), result.records().get(3));
|
||||||
|
|
||||||
|
Map<String, Assignment> expectedAssignment = new HashMap<>();
|
||||||
|
expectedAssignment.put("member-1", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2)
|
||||||
|
)));
|
||||||
|
expectedAssignment.put("member-2", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 3, 4),
|
||||||
|
mkTopicAssignment(barTopicId, 3, 4)
|
||||||
|
)));
|
||||||
|
expectedAssignment.put("member-3", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 5, 6)
|
||||||
|
)));
|
||||||
|
|
||||||
|
assertEquals(expectedAssignment, result.targetAssignment());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPartialAssignmentUpdate() {
|
||||||
|
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
|
||||||
|
"my-group",
|
||||||
|
20
|
||||||
|
);
|
||||||
|
|
||||||
|
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
|
||||||
|
Uuid barTopicId = context.addTopicMetadata("bar", 6);
|
||||||
|
|
||||||
|
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 3, 4),
|
||||||
|
mkTopicAssignment(barTopicId, 3, 4)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.addGroupMember("member-3", Arrays.asList("bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 5, 6)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-1", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-2", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 3, 4, 5),
|
||||||
|
mkTopicAssignment(barTopicId, 3, 4, 5)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-3", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 6)
|
||||||
|
));
|
||||||
|
|
||||||
|
TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
|
||||||
|
|
||||||
|
assertEquals(3, result.records().size());
|
||||||
|
|
||||||
|
// Member 1 has no record because its assignment did not change.
|
||||||
|
assertUnorderedList(Arrays.asList(
|
||||||
|
newTargetAssignmentRecord("my-group", "member-2", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 3, 4, 5),
|
||||||
|
mkTopicAssignment(barTopicId, 3, 4, 5)
|
||||||
|
)),
|
||||||
|
newTargetAssignmentRecord("my-group", "member-3", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 6)
|
||||||
|
))
|
||||||
|
), result.records().subList(0, 2));
|
||||||
|
|
||||||
|
assertEquals(newTargetAssignmentEpochRecord(
|
||||||
|
"my-group",
|
||||||
|
20
|
||||||
|
), result.records().get(2));
|
||||||
|
|
||||||
|
Map<String, Assignment> expectedAssignment = new HashMap<>();
|
||||||
|
expectedAssignment.put("member-1", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2)
|
||||||
|
)));
|
||||||
|
expectedAssignment.put("member-2", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 3, 4, 5),
|
||||||
|
mkTopicAssignment(barTopicId, 3, 4, 5)
|
||||||
|
)));
|
||||||
|
expectedAssignment.put("member-3", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 6)
|
||||||
|
)));
|
||||||
|
|
||||||
|
assertEquals(expectedAssignment, result.targetAssignment());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteMember() {
|
||||||
|
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
|
||||||
|
"my-group",
|
||||||
|
20
|
||||||
|
);
|
||||||
|
|
||||||
|
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
|
||||||
|
Uuid barTopicId = context.addTopicMetadata("bar", 6);
|
||||||
|
|
||||||
|
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 3, 4),
|
||||||
|
mkTopicAssignment(barTopicId, 3, 4)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.addGroupMember("member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 5, 6)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.removeMemberSubscription("member-3");
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-1", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2, 3),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2, 3)
|
||||||
|
));
|
||||||
|
|
||||||
|
context.prepareMemberAssignment("member-2", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 4, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 4, 5, 6)
|
||||||
|
));
|
||||||
|
|
||||||
|
TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
|
||||||
|
|
||||||
|
assertEquals(3, result.records().size());
|
||||||
|
|
||||||
|
assertUnorderedList(Arrays.asList(
|
||||||
|
newTargetAssignmentRecord("my-group", "member-1", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2, 3),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2, 3)
|
||||||
|
)),
|
||||||
|
newTargetAssignmentRecord("my-group", "member-2", mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 4, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 4, 5, 6)
|
||||||
|
))
|
||||||
|
), result.records().subList(0, 2));
|
||||||
|
|
||||||
|
assertEquals(newTargetAssignmentEpochRecord(
|
||||||
|
"my-group",
|
||||||
|
20
|
||||||
|
), result.records().get(2));
|
||||||
|
|
||||||
|
Map<String, Assignment> expectedAssignment = new HashMap<>();
|
||||||
|
expectedAssignment.put("member-1", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 1, 2, 3),
|
||||||
|
mkTopicAssignment(barTopicId, 1, 2, 3)
|
||||||
|
)));
|
||||||
|
expectedAssignment.put("member-2", new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 4, 5, 6),
|
||||||
|
mkTopicAssignment(barTopicId, 4, 5, 6)
|
||||||
|
)));
|
||||||
|
|
||||||
|
assertEquals(expectedAssignment, result.targetAssignment());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> void assertUnorderedList(
|
||||||
|
List<T> expected,
|
||||||
|
List<T> actual
|
||||||
|
) {
|
||||||
|
assertEquals(expected.size(), actual.size());
|
||||||
|
assertEquals(new HashSet<>(expected), new HashSet<>(actual));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue