diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java index 823368e65e0..b2e4fcb782e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java @@ -38,8 +38,8 @@ import java.util.stream.Collectors; */ public class ConsumerGroupMember { /** - * A builder allowing to create a new member or update an - * existing one. + * A builder that facilitates the creation of a new member or the update of + * an existing one. * * Please refer to the javadoc of {{@link ConsumerGroupMember}} for the * definition of the fields. @@ -521,7 +521,7 @@ public class ConsumerGroupMember { } /** - * @return The set of partitions awaiting assigning to the member. + * @return The set of partitions awaiting assignment to the member. */ public Map> partitionsPendingAssignment() { return partitionsPendingAssignment; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java new file mode 100644 index 00000000000..02b120db1ef --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.Record; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; + +/** + * Build a new Target Assignment based on the provided parameters. As a result, + * it yields the records that must be persisted to the log and the new member + * assignments as a map. + * + * Records are only created for members which have a new target assignment. If + * their assignment did not change, no new record is needed. + * + * When a member is deleted, it is assumed that its target assignment record + * is deleted as part of the member deletion process. In other words, this class + * does not yield a tombstone for removed members. + */ +public class TargetAssignmentBuilder { + /** + * The assignment result returned by {{@link TargetAssignmentBuilder#build()}}. + */ + public static class TargetAssignmentResult { + /** + * The records that must be applied to the __consumer_offsets + * topics to persist the new target assignment. + */ + private final List records; + + /** + * The new target assignment for the group. + */ + private final Map targetAssignment; + + TargetAssignmentResult( + List records, + Map targetAssignment + ) { + Objects.requireNonNull(records); + Objects.requireNonNull(targetAssignment); + this.records = records; + this.targetAssignment = targetAssignment; + } + + /** + * @return The records. + */ + public List records() { + return records; + } + + /** + * @return The target assignment. + */ + public Map targetAssignment() { + return targetAssignment; + } + } + + /** + * The group id. + */ + private final String groupId; + + /** + * The group epoch. + */ + private final int groupEpoch; + + /** + * The partition assignor used to compute the assignment. + */ + private final PartitionAssignor assignor; + + /** + * The members in the group. + */ + private Map members = Collections.emptyMap(); + + /** + * The subscription metadata. + */ + private Map subscriptionMetadata = Collections.emptyMap(); + + /** + * The existing target assignment. + */ + private Map targetAssignment = Collections.emptyMap(); + + /** + * The members which have been updated or deleted. Deleted members + * are signaled by a null value. + */ + private final Map updatedMembers = new HashMap<>(); + + /** + * Constructs the object. + * + * @param groupId The group id. + * @param groupEpoch The group epoch to compute a target assignment for. + * @param assignor The assignor to use to compute the target assignment. + */ + public TargetAssignmentBuilder( + String groupId, + int groupEpoch, + PartitionAssignor assignor + ) { + this.groupId = Objects.requireNonNull(groupId); + this.groupEpoch = groupEpoch; + this.assignor = Objects.requireNonNull(assignor); + } + + /** + * Adds all the existing members. + * + * @param members The existing members in the consumer group. + * @return This object. + */ + public TargetAssignmentBuilder withMembers( + Map members + ) { + this.members = members; + return this; + } + + /** + * Adds the subscription metadata to use. + * + * @param subscriptionMetadata The subscription metadata. + * @return This object. + */ + public TargetAssignmentBuilder withSubscriptionMetadata( + Map subscriptionMetadata + ) { + this.subscriptionMetadata = subscriptionMetadata; + return this; + } + + /** + * Adds the existing target assignment. + * + * @param targetAssignment The existing target assignment. + * @return This object. + */ + public TargetAssignmentBuilder withTargetAssignment( + Map targetAssignment + ) { + this.targetAssignment = targetAssignment; + return this; + } + + /** + * Adds or updates a member. This is useful when the updated member is + * not yet materialized in memory. + * + * @param memberId The member id. + * @param member The member to add or update. + * @return This object. + */ + public TargetAssignmentBuilder addOrUpdateMember( + String memberId, + ConsumerGroupMember member + ) { + this.updatedMembers.put(memberId, member); + return this; + } + + /** + * Removes a member. This is useful when the removed member + * is not yet materialized in memory. + * + * @param memberId The member id. + * @return This object. + */ + public TargetAssignmentBuilder removeMember( + String memberId + ) { + return addOrUpdateMember(memberId, null); + } + + /** + * Builds the new target assignment. + * + * @return A TargetAssignmentResult which contains the records to update + * the existing target assignment. + * @throws PartitionAssignorException if the target assignment cannot be computed. + */ + public TargetAssignmentResult build() throws PartitionAssignorException { + Map memberSpecs = new HashMap<>(); + + // Prepare the member spec for all members. + members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec( + member, + targetAssignment.getOrDefault(memberId, Assignment.EMPTY), + subscriptionMetadata + ))); + + // Update the member spec if updated or deleted members. + updatedMembers.forEach((memberId, updatedMemberOrNull) -> { + if (updatedMemberOrNull == null) { + memberSpecs.remove(memberId); + } else { + memberSpecs.put(memberId, createAssignmentMemberSpec( + updatedMemberOrNull, + targetAssignment.getOrDefault(memberId, Assignment.EMPTY), + subscriptionMetadata + )); + } + }); + + // Prepare the topic metadata. + Map topics = new HashMap<>(); + subscriptionMetadata.forEach((topicName, topicMetadata) -> + topics.put(topicMetadata.id(), new AssignmentTopicMetadata(topicMetadata.numPartitions())) + ); + + // Compute the assignment. + GroupAssignment newGroupAssignment = assignor.assign(new AssignmentSpec( + Collections.unmodifiableMap(memberSpecs), + Collections.unmodifiableMap(topics) + )); + + // Compute delta from previous to new target assignment and create the + // relevant records. + List records = new ArrayList<>(); + Map newTargetAssignment = new HashMap<>(); + + memberSpecs.keySet().forEach(memberId -> { + Assignment oldMemberAssignment = targetAssignment.get(memberId); + Assignment newMemberAssignment = newMemberAssignment(newGroupAssignment, memberId); + + newTargetAssignment.put(memberId, newMemberAssignment); + + if (oldMemberAssignment == null) { + // If the member had no assignment, we always create a record for it. + records.add(newTargetAssignmentRecord( + groupId, + memberId, + newMemberAssignment.partitions() + )); + } else { + // If the member had an assignment, we only create a record if the + // new assignment is different. + if (!newMemberAssignment.equals(oldMemberAssignment)) { + records.add(newTargetAssignmentRecord( + groupId, + memberId, + newMemberAssignment.partitions() + )); + } + } + }); + + // Bump the target assignment epoch. + records.add(newTargetAssignmentEpochRecord(groupId, groupEpoch)); + + return new TargetAssignmentResult(records, newTargetAssignment); + } + + private Assignment newMemberAssignment( + GroupAssignment newGroupAssignment, + String memberId + ) { + MemberAssignment newMemberAssignment = newGroupAssignment.members().get(memberId); + if (newMemberAssignment != null) { + return new Assignment(newMemberAssignment.targetPartitions()); + } else { + return Assignment.EMPTY; + } + } + + public static AssignmentMemberSpec createAssignmentMemberSpec( + ConsumerGroupMember member, + Assignment targetAssignment, + Map subscriptionMetadata + ) { + Set subscribedTopics = new HashSet<>(); + member.subscribedTopicNames().forEach(topicName -> { + TopicMetadata topicMetadata = subscriptionMetadata.get(topicName); + if (topicMetadata != null) { + subscribedTopics.add(topicMetadata.id()); + } + }); + + return new AssignmentMemberSpec( + Optional.ofNullable(member.instanceId()), + Optional.ofNullable(member.rackId()), + subscribedTopics, + targetAssignment.partitions() + ); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java new file mode 100644 index 00000000000..a498af6ab8a --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; +import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TargetAssignmentBuilderTest { + + public static class TargetAssignmentBuilderTestContext { + private final String groupId; + private final int groupEpoch; + private final PartitionAssignor assignor = mock(PartitionAssignor.class); + private final Map members = new HashMap<>(); + private final Map subscriptionMetadata = new HashMap<>(); + private final Map updatedMembers = new HashMap<>(); + private final Map targetAssignment = new HashMap<>(); + private final Map memberAssignments = new HashMap<>(); + + public TargetAssignmentBuilderTestContext( + String groupId, + int groupEpoch + ) { + this.groupId = groupId; + this.groupEpoch = groupEpoch; + } + + public void addGroupMember( + String memberId, + List subscriptions, + Map> targetPartitions + ) { + members.put(memberId, new ConsumerGroupMember.Builder(memberId) + .setSubscribedTopicNames(subscriptions) + .build()); + + targetAssignment.put(memberId, new Assignment( + (byte) 0, + targetPartitions, + VersionedMetadata.EMPTY + )); + } + + public Uuid addTopicMetadata( + String topicName, + int numPartitions + ) { + Uuid topicId = Uuid.randomUuid(); + subscriptionMetadata.put(topicName, new TopicMetadata( + topicId, + topicName, + numPartitions + )); + return topicId; + } + + public void updateMemberSubscription( + String memberId, + List subscriptions + ) { + updateMemberSubscription( + memberId, + subscriptions, + Optional.empty(), + Optional.empty() + ); + } + + public void updateMemberSubscription( + String memberId, + List subscriptions, + Optional instanceId, + Optional rackId + ) { + ConsumerGroupMember existingMember = members.get(memberId); + ConsumerGroupMember.Builder builder; + if (existingMember != null) { + builder = new ConsumerGroupMember.Builder(existingMember); + } else { + builder = new ConsumerGroupMember.Builder(memberId); + } + updatedMembers.put(memberId, builder + .setSubscribedTopicNames(subscriptions) + .maybeUpdateInstanceId(instanceId) + .maybeUpdateRackId(rackId) + .build()); + } + + public void removeMemberSubscription( + String memberId + ) { + this.updatedMembers.put(memberId, null); + } + + public void prepareMemberAssignment( + String memberId, + Map> assignment + ) { + memberAssignments.put(memberId, new MemberAssignment(assignment)); + } + + public TargetAssignmentBuilder.TargetAssignmentResult build() { + // Prepare expected member specs. + Map memberSpecs = new HashMap<>(); + + // All the existing members are prepared. + members.forEach((memberId, member) -> { + memberSpecs.put(memberId, createAssignmentMemberSpec( + member, + targetAssignment.getOrDefault(memberId, Assignment.EMPTY), + subscriptionMetadata + )); + }); + + // All the updated are added and all the deleted + // members are removed. + updatedMembers.forEach((memberId, updatedMemberOrNull) -> { + if (updatedMemberOrNull == null) { + memberSpecs.remove(memberId); + } else { + memberSpecs.put(memberId, createAssignmentMemberSpec( + updatedMemberOrNull, + targetAssignment.getOrDefault(memberId, Assignment.EMPTY), + subscriptionMetadata + )); + } + }); + + // Prepare the expected topic metadata. + Map topicMetadata = new HashMap<>(); + subscriptionMetadata.forEach((topicName, metadata) -> { + topicMetadata.put(metadata.id(), new AssignmentTopicMetadata(metadata.numPartitions())); + }); + + // Prepare the expected assignment spec. + AssignmentSpec assignmentSpec = new AssignmentSpec( + memberSpecs, + topicMetadata + ); + + // We use `any` here to always return an assignment but use `verify` later on + // to ensure that the input was correct. + when(assignor.assign(any())).thenReturn(new GroupAssignment(memberAssignments)); + + // Create and populate the assignment builder. + TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId, groupEpoch, assignor) + .withMembers(members) + .withSubscriptionMetadata(subscriptionMetadata) + .withTargetAssignment(targetAssignment); + + // Add the updated members or delete the deleted members. + updatedMembers.forEach((memberId, updatedMemberOrNull) -> { + if (updatedMemberOrNull != null) { + builder.addOrUpdateMember(memberId, updatedMemberOrNull); + } else { + builder.removeMember(memberId); + } + }); + + // Execute the builder. + TargetAssignmentBuilder.TargetAssignmentResult result = builder.build(); + + // Verify that the assignor was called once with the expected + // assignment spec. + verify(assignor, times(1)).assign(assignmentSpec); + + return result; + } + } + + @Test + public void testCreateAssignmentMemberSpec() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id") + .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar")) + .setRackId("rackId") + .setInstanceId("instanceId") + .build(); + + Map subscriptionMetadata = new HashMap() { + { + put("foo", new TopicMetadata(fooTopicId, "foo", 5)); + put("bar", new TopicMetadata(barTopicId, "bar", 5)); + } + }; + + Assignment assignment = new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )); + + AssignmentMemberSpec assignmentMemberSpec = createAssignmentMemberSpec( + member, + assignment, + subscriptionMetadata + ); + + assertEquals(new AssignmentMemberSpec( + Optional.of("instanceId"), + Optional.of("rackId"), + new HashSet<>(Arrays.asList(fooTopicId, barTopicId)), + assignment.partitions() + ), assignmentMemberSpec); + } + + @Test + public void testEmpty() { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + assertEquals(Collections.singletonList(newTargetAssignmentEpochRecord( + "my-group", + 20 + )), result.records()); + assertEquals(Collections.emptyMap(), result.targetAssignment()); + } + + @Test + public void testAssignmentHasNotChanged() { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); + + context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )); + + context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )); + + context.prepareMemberAssignment("member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )); + + context.prepareMemberAssignment("member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertEquals(Collections.singletonList(newTargetAssignmentEpochRecord( + "my-group", + 20 + )), result.records()); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + ))); + expectedAssignment.put("member-2", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + ))); + + assertEquals(expectedAssignment, result.targetAssignment()); + } + + @Test + public void testAssignmentSwapped() { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); + + context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )); + + context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )); + + context.prepareMemberAssignment("member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )); + + context.prepareMemberAssignment("member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertEquals(3, result.records().size()); + + assertUnorderedList(Arrays.asList( + newTargetAssignmentRecord("my-group", "member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )), + newTargetAssignmentRecord("my-group", "member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )) + ), result.records().subList(0, 2)); + + assertEquals(newTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(2)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-2", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + ))); + expectedAssignment.put("member-1", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + ))); + + assertEquals(expectedAssignment, result.targetAssignment()); + } + + @Test + public void testNewMember() { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); + + context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )); + + context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )); + + context.updateMemberSubscription("member-3", Arrays.asList("foo", "bar", "zar")); + + context.prepareMemberAssignment("member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )); + + context.prepareMemberAssignment("member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + )); + + context.prepareMemberAssignment("member-3", mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + )); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertEquals(4, result.records().size()); + + assertUnorderedList(Arrays.asList( + newTargetAssignmentRecord("my-group", "member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )), + newTargetAssignmentRecord("my-group", "member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + )), + newTargetAssignmentRecord("my-group", "member-3", mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + )) + ), result.records().subList(0, 3)); + + assertEquals(newTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(3)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + ))); + expectedAssignment.put("member-2", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + ))); + expectedAssignment.put("member-3", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + ))); + + assertEquals(expectedAssignment, result.targetAssignment()); + } + + @Test + public void testUpdateMember() { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); + + context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2) + )); + + context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 3, 4) + )); + + context.addGroupMember("member-3", Arrays.asList("bar", "zar"), mkAssignment( + mkTopicAssignment(barTopicId, 5, 6) + )); + + context.updateMemberSubscription( + "member-3", + Arrays.asList("foo", "bar", "zar"), + Optional.of("instance-id-3"), + Optional.of("rack-0") + ); + + context.prepareMemberAssignment("member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )); + + context.prepareMemberAssignment("member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + )); + + context.prepareMemberAssignment("member-3", mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + )); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertEquals(4, result.records().size()); + + assertUnorderedList(Arrays.asList( + newTargetAssignmentRecord("my-group", "member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )), + newTargetAssignmentRecord("my-group", "member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + )), + newTargetAssignmentRecord("my-group", "member-3", mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + )) + ), result.records().subList(0, 3)); + + assertEquals(newTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(3)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + ))); + expectedAssignment.put("member-2", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + ))); + expectedAssignment.put("member-3", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + ))); + + assertEquals(expectedAssignment, result.targetAssignment()); + } + + @Test + public void testPartialAssignmentUpdate() { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); + + context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )); + + context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + )); + + context.addGroupMember("member-3", Arrays.asList("bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + )); + + context.prepareMemberAssignment("member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )); + + context.prepareMemberAssignment("member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 3, 4, 5) + )); + + context.prepareMemberAssignment("member-3", mkAssignment( + mkTopicAssignment(fooTopicId, 6), + mkTopicAssignment(barTopicId, 6) + )); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertEquals(3, result.records().size()); + + // Member 1 has no record because its assignment did not change. + assertUnorderedList(Arrays.asList( + newTargetAssignmentRecord("my-group", "member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 3, 4, 5) + )), + newTargetAssignmentRecord("my-group", "member-3", mkAssignment( + mkTopicAssignment(fooTopicId, 6), + mkTopicAssignment(barTopicId, 6) + )) + ), result.records().subList(0, 2)); + + assertEquals(newTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(2)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + ))); + expectedAssignment.put("member-2", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 3, 4, 5) + ))); + expectedAssignment.put("member-3", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 6), + mkTopicAssignment(barTopicId, 6) + ))); + + assertEquals(expectedAssignment, result.targetAssignment()); + } + + @Test + public void testDeleteMember() { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); + + context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )); + + context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + )); + + context.addGroupMember("member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + )); + + context.removeMemberSubscription("member-3"); + + context.prepareMemberAssignment("member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )); + + context.prepareMemberAssignment("member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertEquals(3, result.records().size()); + + assertUnorderedList(Arrays.asList( + newTargetAssignmentRecord("my-group", "member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )), + newTargetAssignmentRecord("my-group", "member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )) + ), result.records().subList(0, 2)); + + assertEquals(newTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(2)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + ))); + expectedAssignment.put("member-2", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + ))); + + assertEquals(expectedAssignment, result.targetAssignment()); + } + + public static void assertUnorderedList( + List expected, + List actual + ) { + assertEquals(expected.size(), actual.size()); + assertEquals(new HashSet<>(expected), new HashSet<>(actual)); + } +}