diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 39c6d4bedab..aaf71dc5478 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -33,7 +33,7 @@
files="AbstractResponse.java"/>
+ files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
index c7c0679575a..b2af7a008eb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -25,6 +26,10 @@ import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
/**
* A cooperative version of the {@link AbstractStickyAssignor AbstractStickyAssignor}. This follows the same (sticky)
@@ -43,6 +48,15 @@ import org.apache.kafka.common.TopicPartition;
*/
public class CooperativeStickyAssignor extends AbstractStickyAssignor {
+ // these schemas are used for preserving useful metadata for the assignment, such as the last stable generation
+ private static final String GENERATION_KEY_NAME = "generation";
+ private static final Schema COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0 = new Schema(
+ new Field(GENERATION_KEY_NAME, Type.INT32));
+
+ private int generation = DEFAULT_GENERATION; // consumer group generation
+
+
+
@Override
public String name() {
return "cooperative-sticky";
@@ -53,9 +67,37 @@ public class CooperativeStickyAssignor extends AbstractStickyAssignor {
return Arrays.asList(RebalanceProtocol.COOPERATIVE, RebalanceProtocol.EAGER);
}
+ @Override
+ public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
+ this.generation = metadata.generationId();
+ }
+
+ @Override
+ public ByteBuffer subscriptionUserData(Set topics) {
+ Struct struct = new Struct(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0);
+
+ struct.set(GENERATION_KEY_NAME, generation);
+ ByteBuffer buffer = ByteBuffer.allocate(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.sizeOf(struct));
+ COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.write(buffer, struct);
+ buffer.flip();
+ return buffer;
+ }
+
@Override
protected MemberData memberData(Subscription subscription) {
- return new MemberData(subscription.ownedPartitions(), Optional.empty());
+ ByteBuffer buffer = subscription.userData();
+ Optional encodedGeneration;
+ if (buffer == null) {
+ encodedGeneration = Optional.empty();
+ } else {
+ try {
+ Struct struct = COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.read(buffer);
+ encodedGeneration = Optional.of(struct.getInt(GENERATION_KEY_NAME));
+ } catch (Exception e) {
+ encodedGeneration = Optional.of(DEFAULT_GENERATION);
+ }
+ }
+ return new MemberData(subscription.ownedPartitions(), encodedGeneration);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index a1af6d959a3..47b78615607 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -74,14 +74,16 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
Map subscriptions) {
partitionMovements = new PartitionMovements();
Map> consumerToOwnedPartitions = new HashMap<>();
- if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions)) {
+ Set partitionsWithMultiplePreviousOwners = new HashSet<>();
+ if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners)) {
log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
+ "optimized assignment algorithm");
partitionsTransferringOwnership = new HashMap<>();
- return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions);
+ return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners);
} else {
log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
+ "general case assignment algorithm");
+ // we must set this to null for the general case so the cooperative assignor knows to compute it from scratch
partitionsTransferringOwnership = null;
return generalAssign(partitionsPerTopic, subscriptions);
}
@@ -89,17 +91,22 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
/**
* Returns true iff all consumers have an identical subscription. Also fills out the passed in
- * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions
+ * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions,
+ * and the {@code partitionsWithMultiplePreviousOwners} with any partitions claimed by multiple previous owners
*/
private boolean allSubscriptionsEqual(Set allTopics,
Map subscriptions,
- Map> consumerToOwnedPartitions) {
- Set membersWithOldGeneration = new HashSet<>();
+ Map> consumerToOwnedPartitions,
+ Set partitionsWithMultiplePreviousOwners) {
Set membersOfCurrentHighestGeneration = new HashSet<>();
int maxGeneration = DEFAULT_GENERATION;
Set subscribedTopics = new HashSet<>();
+ // keep track of all previously owned partitions so we can invalidate them if invalid input is
+ // detected, eg two consumers somehow claiming the same partition in the same/current generation
+ Map allPreviousPartitionsToOwner = new HashMap<>();
+
for (Map.Entry subscriptionEntry : subscriptions.entrySet()) {
String consumer = subscriptionEntry.getKey();
Subscription subscription = subscriptionEntry.getValue();
@@ -124,7 +131,12 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
// If the current member's generation is higher, all the previously owned partitions are invalid
if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
- membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+ allPreviousPartitionsToOwner.clear();
+ partitionsWithMultiplePreviousOwners.clear();
+ for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
+ consumerToOwnedPartitions.get(droppedOutConsumer).clear();
+ }
+
membersOfCurrentHighestGeneration.clear();
maxGeneration = memberData.generation.get();
}
@@ -133,20 +145,45 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
for (final TopicPartition tp : memberData.partitions) {
// filter out any topics that no longer exist or aren't part of the current subscription
if (allTopics.contains(tp.topic())) {
- ownedPartitions.add(tp);
+ if (!allPreviousPartitionsToOwner.containsKey(tp)) {
+ allPreviousPartitionsToOwner.put(tp, consumer);
+ ownedPartitions.add(tp);
+ } else {
+ String otherConsumer = allPreviousPartitionsToOwner.get(tp);
+ log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
+ + "same generation {}, this will be invalidated and removed from their previous assignment.",
+ consumer, otherConsumer, tp, maxGeneration);
+ consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+ partitionsWithMultiplePreviousOwners.add(tp);
+ }
}
}
}
}
- for (String consumer : membersWithOldGeneration) {
- consumerToOwnedPartitions.get(consumer).clear();
- }
return true;
}
+ /**
+ * This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics.
+ * The method includes the following steps:
+ *
+ * 1. Reassign as many previously owned partitions as possible, up to the maxQuota
+ * 2. Fill remaining members up to minQuota
+ * 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
+ * from the over-full consumers at max capacity
+ * 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we
+ * should just distribute one partition each to all consumers at min capacity
+ *
+ * @param partitionsPerTopic The number of partitions for each subscribed topic
+ * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions
+ * @param partitionsWithMultiplePreviousOwners The partitions being claimed in the previous assignment of multiple consumers
+ *
+ * @return Map from each member to the list of partitions assigned to them.
+ */
private Map> constrainedAssign(Map partitionsPerTopic,
- Map> consumerToOwnedPartitions) {
+ Map> consumerToOwnedPartitions,
+ Set partitionsWithMultiplePreviousOwners) {
SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic);
Set allRevokedPartitions = new HashSet<>();
@@ -169,6 +206,16 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
List ownedPartitions = consumerEntry.getValue();
List consumerAssignment = assignment.get(consumer);
+
+ for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) {
+ if (ownedPartitions.contains(doublyClaimedPartition)) {
+ log.error("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple "
+ + "consumers already in the same generation. Removing it from the ownedPartitions",
+ doublyClaimedPartition, consumer);
+ ownedPartitions.remove(doublyClaimedPartition);
+ }
+ }
+
int i = 0;
// assign the first N partitions up to the max quota, and mark the remaining as being revoked
for (TopicPartition tp : ownedPartitions) {
@@ -207,7 +254,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
consumerAssignment.add(tp);
unassignedPartitionsIter.remove();
// We already assigned all possible ownedPartitions, so we know this must be newly to this consumer
- if (allRevokedPartitions.contains(tp))
+ if (allRevokedPartitions.contains(tp) || partitionsWithMultiplePreviousOwners.contains(tp))
partitionsTransferringOwnership.put(tp, consumer);
} else {
break;
@@ -250,10 +297,12 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
// We can skip the bookkeeping of unassignedPartitions and maxCapacityMembers here since we are at the end
assignment.get(underCapacityConsumer).add(unassignedPartition);
- if (allRevokedPartitions.contains(unassignedPartition))
+ if (allRevokedPartitions.contains(unassignedPartition) || partitionsWithMultiplePreviousOwners.contains(unassignedPartition))
partitionsTransferringOwnership.put(unassignedPartition, underCapacityConsumer);
}
+ log.info("Final assignment of partitions to consumers: \n{}", assignment);
+
return assignment;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
index aed8c095370..9187014af0c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
@@ -16,16 +16,24 @@
*/
package org.apache.kafka.clients.consumer;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest;
import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.emptyList;
+import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest {
@@ -39,6 +47,74 @@ public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest {
return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions);
}
+ @Override
+ public Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) {
+ assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty()));
+ return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions);
+ }
+
+ @Test
+ public void testEncodeAndDecodeGeneration() {
+ Subscription subscription = new Subscription(topics(topic), assignor.subscriptionUserData(new HashSet<>(topics(topic))));
+
+ Optional encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation;
+ assertTrue(encodedGeneration.isPresent());
+ assertEquals(encodedGeneration.get().intValue(), DEFAULT_GENERATION);
+
+ int generation = 10;
+ assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty()));
+
+ subscription = new Subscription(topics(topic), assignor.subscriptionUserData(new HashSet<>(topics(topic))));
+ encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation;
+
+ assertTrue(encodedGeneration.isPresent());
+ assertEquals(encodedGeneration.get().intValue(), generation);
+ }
+
+ @Test
+ public void testDecodeGeneration() {
+ Subscription subscription = new Subscription(topics(topic));
+ assertFalse(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent());
+ }
+
+ @Test
+ public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer() {
+ Map partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList()));
+
+ Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+ // In the cooperative assignor, topic-0 has to be considered "owned" and so it cant be assigned until both have "revoked" it
+ assertTrue(assignment.get(consumer3).isEmpty());
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ @Test
+ public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer() {
+ Map partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 4);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList()));
+
+ Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2), tp(topic, 3)), assignment.get(consumer2));
+ // In the cooperative assignor, topic-0 has to be considered "owned" and so it cant be assigned until both have "revoked" it
+ assertTrue(assignment.get(consumer3).isEmpty());
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
/**
* The cooperative assignor must do some additional work and verification of some assignments relative to the eager
* assignor, since it may or may not need to trigger a second follow-up rebalance.
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index fb899449037..3e22f35dc34 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer;
+import static java.util.Collections.emptyList;
import static org.apache.kafka.clients.consumer.StickyAssignor.serializeTopicPartitionAssignment;
import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
import static org.junit.Assert.assertEquals;
@@ -51,6 +52,48 @@ public class StickyAssignorTest extends AbstractStickyAssignorTest {
serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(DEFAULT_GENERATION))));
}
+ @Override
+ public Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) {
+ return new Subscription(topics,
+ serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(generation))));
+ }
+
+ @Test
+ public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer() {
+ Map partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList()));
+
+ Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+ assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3));
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ @Test
+ public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer() {
+ Map partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 4);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList()));
+
+ Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2), tp(topic, 3)), assignment.get(consumer2));
+ assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3));
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
@Test
public void testAssignmentWithMultipleGenerations1() {
String consumer1 = "consumer1";
@@ -221,11 +264,6 @@ public class StickyAssignorTest extends AbstractStickyAssignorTest {
assertTrue(assignor.isSticky());
}
- private Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) {
- return new Subscription(topics,
- serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(generation))));
- }
-
private static Subscription buildSubscriptionWithOldSchema(List topics, List partitions) {
Struct struct = new Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0);
List topicAssignments = new ArrayList<>();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index c7b45233ca1..22a3e35f317 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Utils;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.Assert.assertEquals;
@@ -42,13 +43,21 @@ import static org.junit.Assert.assertTrue;
public abstract class AbstractStickyAssignorTest {
protected AbstractStickyAssignor assignor;
protected String consumerId = "consumer";
+ protected String consumer1 = "consumer1";
+ protected String consumer2 = "consumer2";
+ protected String consumer3 = "consumer3";
protected Map subscriptions;
protected String topic = "topic";
+ protected String topic1 = "topic1";
+ protected String topic2 = "topic2";
+ protected String topic3 = "topic3";
protected abstract AbstractStickyAssignor createAssignor();
protected abstract Subscription buildSubscription(List topics, List partitions);
+ protected abstract Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation);
+
@Before
public void setUp() {
assignor = createAssignor();
@@ -656,6 +665,62 @@ public abstract class AbstractStickyAssignorTest {
}
}
+ @Test
+ public void testAllConsumersReachExpectedQuotaAndAreConsideredFilled() {
+ Map partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 4);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic), Collections.emptyList()));
+
+ Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 0), tp(topic, 1)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+ assertEquals(partitions(tp(topic, 3)), assignment.get(consumer3));
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ @Test
+ public void testOwnedPartitionsAreInvalidatedForConsumerWithStaleGeneration() {
+ Map partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+ partitionsPerTopic.put(topic2, 3);
+
+ int currentGeneration = 10;
+
+ subscriptions.put(consumer1, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration));
+ subscriptions.put(consumer2, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration - 1));
+
+ Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1))), new HashSet<>(assignment.get(consumer1)));
+ assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0), tp(topic2, 2))), new HashSet<>(assignment.get(consumer2)));
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ @Test
+ public void testOwnedPartitionsAreInvalidatedForConsumerWithNoGeneration() {
+ Map partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+ partitionsPerTopic.put(topic2, 3);
+
+ int currentGeneration = 10;
+
+ subscriptions.put(consumer1, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration));
+ subscriptions.put(consumer2, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), DEFAULT_GENERATION));
+
+ Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1))), new HashSet<>(assignment.get(consumer1)));
+ assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0), tp(topic2, 2))), new HashSet<>(assignment.get(consumer2)));
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
private String getTopicName(int i, int maxNum) {
return getCanonicalName("t", i, maxNum);
}