From 33c313d8531a2d2265b2457658f3bdad6f826b19 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 19 Aug 2021 08:03:22 +0800 Subject: [PATCH] KAFKA-13081: detect and handle doubly assigned partitions in cooperative assignor (#11068) This is the fix 1 and fix 2 in #10985 for v2.6, including the tests. Uses the generation to invalidate previous assignments that claim partitions but no longer own them, and implements an additional safety net to handle any case in which doubly-claimed partitions slip in to the input anyway Reviewers: Anna Sophie Blee-Goldman --- checkstyle/suppressions.xml | 2 +- .../consumer/CooperativeStickyAssignor.java | 44 +++++++++- .../internals/AbstractStickyAssignor.java | 75 +++++++++++++--- .../CooperativeStickyAssignorTest.java | 88 +++++++++++++++++-- .../clients/consumer/StickyAssignorTest.java | 48 ++++++++-- .../internals/AbstractStickyAssignorTest.java | 65 ++++++++++++++ 6 files changed, 296 insertions(+), 26 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 39c6d4bedab..aaf71dc5478 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -33,7 +33,7 @@ files="AbstractResponse.java"/> + files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor).java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java index c7c0679575a..b2af7a008eb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -25,6 +26,10 @@ import java.util.Optional; import java.util.Set; import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; /** * A cooperative version of the {@link AbstractStickyAssignor AbstractStickyAssignor}. This follows the same (sticky) @@ -43,6 +48,15 @@ import org.apache.kafka.common.TopicPartition; */ public class CooperativeStickyAssignor extends AbstractStickyAssignor { + // these schemas are used for preserving useful metadata for the assignment, such as the last stable generation + private static final String GENERATION_KEY_NAME = "generation"; + private static final Schema COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0 = new Schema( + new Field(GENERATION_KEY_NAME, Type.INT32)); + + private int generation = DEFAULT_GENERATION; // consumer group generation + + + @Override public String name() { return "cooperative-sticky"; @@ -53,9 +67,37 @@ public class CooperativeStickyAssignor extends AbstractStickyAssignor { return Arrays.asList(RebalanceProtocol.COOPERATIVE, RebalanceProtocol.EAGER); } + @Override + public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { + this.generation = metadata.generationId(); + } + + @Override + public ByteBuffer subscriptionUserData(Set topics) { + Struct struct = new Struct(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0); + + struct.set(GENERATION_KEY_NAME, generation); + ByteBuffer buffer = ByteBuffer.allocate(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.sizeOf(struct)); + COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.write(buffer, struct); + buffer.flip(); + return buffer; + } + @Override protected MemberData memberData(Subscription subscription) { - return new MemberData(subscription.ownedPartitions(), Optional.empty()); + ByteBuffer buffer = subscription.userData(); + Optional encodedGeneration; + if (buffer == null) { + encodedGeneration = Optional.empty(); + } else { + try { + Struct struct = COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.read(buffer); + encodedGeneration = Optional.of(struct.getInt(GENERATION_KEY_NAME)); + } catch (Exception e) { + encodedGeneration = Optional.of(DEFAULT_GENERATION); + } + } + return new MemberData(subscription.ownedPartitions(), encodedGeneration); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java index a1af6d959a3..47b78615607 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java @@ -74,14 +74,16 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { Map subscriptions) { partitionMovements = new PartitionMovements(); Map> consumerToOwnedPartitions = new HashMap<>(); - if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions)) { + Set partitionsWithMultiplePreviousOwners = new HashSet<>(); + if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners)) { log.debug("Detected that all consumers were subscribed to same set of topics, invoking the " + "optimized assignment algorithm"); partitionsTransferringOwnership = new HashMap<>(); - return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions); + return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners); } else { log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the " + "general case assignment algorithm"); + // we must set this to null for the general case so the cooperative assignor knows to compute it from scratch partitionsTransferringOwnership = null; return generalAssign(partitionsPerTopic, subscriptions); } @@ -89,17 +91,22 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { /** * Returns true iff all consumers have an identical subscription. Also fills out the passed in - * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions + * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions, + * and the {@code partitionsWithMultiplePreviousOwners} with any partitions claimed by multiple previous owners */ private boolean allSubscriptionsEqual(Set allTopics, Map subscriptions, - Map> consumerToOwnedPartitions) { - Set membersWithOldGeneration = new HashSet<>(); + Map> consumerToOwnedPartitions, + Set partitionsWithMultiplePreviousOwners) { Set membersOfCurrentHighestGeneration = new HashSet<>(); int maxGeneration = DEFAULT_GENERATION; Set subscribedTopics = new HashSet<>(); + // keep track of all previously owned partitions so we can invalidate them if invalid input is + // detected, eg two consumers somehow claiming the same partition in the same/current generation + Map allPreviousPartitionsToOwner = new HashMap<>(); + for (Map.Entry subscriptionEntry : subscriptions.entrySet()) { String consumer = subscriptionEntry.getKey(); Subscription subscription = subscriptionEntry.getValue(); @@ -124,7 +131,12 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { // If the current member's generation is higher, all the previously owned partitions are invalid if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) { - membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration); + allPreviousPartitionsToOwner.clear(); + partitionsWithMultiplePreviousOwners.clear(); + for (String droppedOutConsumer : membersOfCurrentHighestGeneration) { + consumerToOwnedPartitions.get(droppedOutConsumer).clear(); + } + membersOfCurrentHighestGeneration.clear(); maxGeneration = memberData.generation.get(); } @@ -133,20 +145,45 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { for (final TopicPartition tp : memberData.partitions) { // filter out any topics that no longer exist or aren't part of the current subscription if (allTopics.contains(tp.topic())) { - ownedPartitions.add(tp); + if (!allPreviousPartitionsToOwner.containsKey(tp)) { + allPreviousPartitionsToOwner.put(tp, consumer); + ownedPartitions.add(tp); + } else { + String otherConsumer = allPreviousPartitionsToOwner.get(tp); + log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the " + + "same generation {}, this will be invalidated and removed from their previous assignment.", + consumer, otherConsumer, tp, maxGeneration); + consumerToOwnedPartitions.get(otherConsumer).remove(tp); + partitionsWithMultiplePreviousOwners.add(tp); + } } } } } - for (String consumer : membersWithOldGeneration) { - consumerToOwnedPartitions.get(consumer).clear(); - } return true; } + /** + * This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics. + * The method includes the following steps: + * + * 1. Reassign as many previously owned partitions as possible, up to the maxQuota + * 2. Fill remaining members up to minQuota + * 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions + * from the over-full consumers at max capacity + * 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we + * should just distribute one partition each to all consumers at min capacity + * + * @param partitionsPerTopic The number of partitions for each subscribed topic + * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions + * @param partitionsWithMultiplePreviousOwners The partitions being claimed in the previous assignment of multiple consumers + * + * @return Map from each member to the list of partitions assigned to them. + */ private Map> constrainedAssign(Map partitionsPerTopic, - Map> consumerToOwnedPartitions) { + Map> consumerToOwnedPartitions, + Set partitionsWithMultiplePreviousOwners) { SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); Set allRevokedPartitions = new HashSet<>(); @@ -169,6 +206,16 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { List ownedPartitions = consumerEntry.getValue(); List consumerAssignment = assignment.get(consumer); + + for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) { + if (ownedPartitions.contains(doublyClaimedPartition)) { + log.error("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple " + + "consumers already in the same generation. Removing it from the ownedPartitions", + doublyClaimedPartition, consumer); + ownedPartitions.remove(doublyClaimedPartition); + } + } + int i = 0; // assign the first N partitions up to the max quota, and mark the remaining as being revoked for (TopicPartition tp : ownedPartitions) { @@ -207,7 +254,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { consumerAssignment.add(tp); unassignedPartitionsIter.remove(); // We already assigned all possible ownedPartitions, so we know this must be newly to this consumer - if (allRevokedPartitions.contains(tp)) + if (allRevokedPartitions.contains(tp) || partitionsWithMultiplePreviousOwners.contains(tp)) partitionsTransferringOwnership.put(tp, consumer); } else { break; @@ -250,10 +297,12 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { // We can skip the bookkeeping of unassignedPartitions and maxCapacityMembers here since we are at the end assignment.get(underCapacityConsumer).add(unassignedPartition); - if (allRevokedPartitions.contains(unassignedPartition)) + if (allRevokedPartitions.contains(unassignedPartition) || partitionsWithMultiplePreviousOwners.contains(unassignedPartition)) partitionsTransferringOwnership.put(unassignedPartition, underCapacityConsumer); } + log.info("Final assignment of partitions to consumers: \n{}", assignment); + return assignment; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java index aed8c095370..9187014af0c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java @@ -16,16 +16,24 @@ */ package org.apache.kafka.clients.consumer; -import static org.junit.Assert.assertTrue; - -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor; import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest; import org.apache.kafka.common.TopicPartition; +import org.junit.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static java.util.Collections.emptyList; +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest { @@ -39,6 +47,74 @@ public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest { return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions); } + @Override + public Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) { + assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty())); + return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions); + } + + @Test + public void testEncodeAndDecodeGeneration() { + Subscription subscription = new Subscription(topics(topic), assignor.subscriptionUserData(new HashSet<>(topics(topic)))); + + Optional encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation; + assertTrue(encodedGeneration.isPresent()); + assertEquals(encodedGeneration.get().intValue(), DEFAULT_GENERATION); + + int generation = 10; + assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty())); + + subscription = new Subscription(topics(topic), assignor.subscriptionUserData(new HashSet<>(topics(topic)))); + encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation; + + assertTrue(encodedGeneration.isPresent()); + assertEquals(encodedGeneration.get().intValue(), generation); + } + + @Test + public void testDecodeGeneration() { + Subscription subscription = new Subscription(topics(topic)); + assertFalse(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent()); + } + + @Test + public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2)); + // In the cooperative assignor, topic-0 has to be considered "owned" and so it cant be assigned until both have "revoked" it + assertTrue(assignment.get(consumer3).isEmpty()); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 4); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2), tp(topic, 3)), assignment.get(consumer2)); + // In the cooperative assignor, topic-0 has to be considered "owned" and so it cant be assigned until both have "revoked" it + assertTrue(assignment.get(consumer3).isEmpty()); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + /** * The cooperative assignor must do some additional work and verification of some assignments relative to the eager * assignor, since it may or may not need to trigger a second follow-up rebalance. diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java index fb899449037..3e22f35dc34 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer; +import static java.util.Collections.emptyList; import static org.apache.kafka.clients.consumer.StickyAssignor.serializeTopicPartitionAssignment; import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; import static org.junit.Assert.assertEquals; @@ -51,6 +52,48 @@ public class StickyAssignorTest extends AbstractStickyAssignorTest { serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(DEFAULT_GENERATION)))); } + @Override + public Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) { + return new Subscription(topics, + serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(generation)))); + } + + @Test + public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3)); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 4); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2), tp(topic, 3)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3)); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + @Test public void testAssignmentWithMultipleGenerations1() { String consumer1 = "consumer1"; @@ -221,11 +264,6 @@ public class StickyAssignorTest extends AbstractStickyAssignorTest { assertTrue(assignor.isSticky()); } - private Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) { - return new Subscription(topics, - serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(generation)))); - } - private static Subscription buildSubscriptionWithOldSchema(List topics, List partitions) { Struct struct = new Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0); List topicAssignments = new ArrayList<>(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java index c7b45233ca1..22a3e35f317 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Utils; import org.junit.Before; import org.junit.Test; +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.junit.Assert.assertEquals; @@ -42,13 +43,21 @@ import static org.junit.Assert.assertTrue; public abstract class AbstractStickyAssignorTest { protected AbstractStickyAssignor assignor; protected String consumerId = "consumer"; + protected String consumer1 = "consumer1"; + protected String consumer2 = "consumer2"; + protected String consumer3 = "consumer3"; protected Map subscriptions; protected String topic = "topic"; + protected String topic1 = "topic1"; + protected String topic2 = "topic2"; + protected String topic3 = "topic3"; protected abstract AbstractStickyAssignor createAssignor(); protected abstract Subscription buildSubscription(List topics, List partitions); + protected abstract Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation); + @Before public void setUp() { assignor = createAssignor(); @@ -656,6 +665,62 @@ public abstract class AbstractStickyAssignorTest { } } + @Test + public void testAllConsumersReachExpectedQuotaAndAreConsideredFilled() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 4); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), Collections.emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 0), tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic, 3)), assignment.get(consumer3)); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testOwnedPartitionsAreInvalidatedForConsumerWithStaleGeneration() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + partitionsPerTopic.put(topic2, 3); + + int currentGeneration = 10; + + subscriptions.put(consumer1, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration)); + subscriptions.put(consumer2, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration - 1)); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1))), new HashSet<>(assignment.get(consumer1))); + assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0), tp(topic2, 2))), new HashSet<>(assignment.get(consumer2))); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testOwnedPartitionsAreInvalidatedForConsumerWithNoGeneration() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + partitionsPerTopic.put(topic2, 3); + + int currentGeneration = 10; + + subscriptions.put(consumer1, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration)); + subscriptions.put(consumer2, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), DEFAULT_GENERATION)); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1))), new HashSet<>(assignment.get(consumer1))); + assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0), tp(topic2, 2))), new HashSet<>(assignment.get(consumer2))); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + private String getTopicName(int i, int maxNum) { return getCanonicalName("t", i, maxNum); }