diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java index 537929b4a2a..2aecb6f5964 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java @@ -113,13 +113,6 @@ public abstract class ModernGroup implements Group */ private final TimelineHashMap> invertedTargetAssignment; - /** - * The current partition epoch maps each topic-partitions to their current epoch where - * the epoch is the epoch of their owners. When a member revokes a partition, it removes - * its epochs from this map. When a member gets a partition, it adds its epochs to this map. - */ - protected final TimelineHashMap> currentPartitionEpoch; - /** * The metadata refresh deadline. It consists of a timestamp in milliseconds together with * the group epoch at the time of setting it. The metadata refresh time is considered as a @@ -146,7 +139,6 @@ public abstract class ModernGroup implements Group this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry); this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0); this.invertedTargetAssignment = new TimelineHashMap<>(snapshotRegistry, 0); - this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0); } /** @@ -356,26 +348,6 @@ public abstract class ModernGroup implements Group return Collections.unmodifiableMap(targetAssignment); } - /** - * Returns the current epoch of a partition or -1 if the partition - * does not have one. - * - * @param topicId The topic id. - * @param partitionId The partition id. - * - * @return The epoch or -1. - */ - public int currentPartitionEpoch( - Uuid topicId, int partitionId - ) { - Map partitions = currentPartitionEpoch.get(topicId); - if (partitions == null) { - return -1; - } else { - return partitions.getOrDefault(partitionId, -1); - } - } - /** * @return An immutable Map of subscription metadata for * each topic that the consumer group is subscribed to. @@ -591,73 +563,6 @@ public abstract class ModernGroup implements Group return HOMOGENEOUS; } - /** - * Removes the partition epochs based on the provided assignment. - * - * @param assignment The assignment. - * @param expectedEpoch The expected epoch. - * @throws IllegalStateException if the epoch does not match the expected one. - * package-private for testing. - */ - public void removePartitionEpochs( - Map> assignment, - int expectedEpoch - ) { - assignment.forEach((topicId, assignedPartitions) -> { - currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> { - if (partitionsOrNull != null) { - assignedPartitions.forEach(partitionId -> { - Integer prevValue = partitionsOrNull.remove(partitionId); - if (prevValue != expectedEpoch) { - throw new IllegalStateException( - String.format("Cannot remove the epoch %d from %s-%s because the partition is " + - "still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue)); - } - }); - if (partitionsOrNull.isEmpty()) { - return null; - } else { - return partitionsOrNull; - } - } else { - throw new IllegalStateException( - String.format("Cannot remove the epoch %d from %s because it does not have any epoch", - expectedEpoch, topicId)); - } - }); - }); - } - - /** - * Adds the partitions epoch based on the provided assignment. - * - * @param assignment The assignment. - * @param epoch The new epoch. - * @throws IllegalStateException if the partition already has an epoch assigned. - * package-private for testing. - */ - public void addPartitionEpochs( - Map> assignment, - int epoch - ) { - assignment.forEach((topicId, assignedPartitions) -> { - currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> { - if (partitionsOrNull == null) { - partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size()); - } - for (Integer partitionId : assignedPartitions) { - Integer prevValue = partitionsOrNull.put(partitionId, epoch); - if (prevValue != null) { - throw new IllegalStateException( - String.format("Cannot set the epoch of %s-%s to %d because the partition is " + - "still owned at epoch %d", topicId, partitionId, epoch, prevValue)); - } - } - return partitionsOrNull; - }); - }); - } - /** * Gets the protocol type for the group. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java index 483e1c7e969..a5c36148b41 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java @@ -123,6 +123,13 @@ public class ConsumerGroup extends ModernGroup { */ private final TimelineHashMap classicProtocolMembersSupportedProtocols; + /** + * The current partition epoch maps each topic-partitions to their current epoch where + * the epoch is the epoch of their owners. When a member revokes a partition, it removes + * its epochs from this map. When a member gets a partition, it adds its epochs to this map. + */ + private final TimelineHashMap> currentPartitionEpoch; + public ConsumerGroup( SnapshotRegistry snapshotRegistry, String groupId, @@ -135,6 +142,7 @@ public class ConsumerGroup extends ModernGroup { this.metrics = Objects.requireNonNull(metrics); this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry); this.classicProtocolMembersSupportedProtocols = new TimelineHashMap<>(snapshotRegistry, 0); + this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0); } /** @@ -346,6 +354,26 @@ public class ConsumerGroup extends ModernGroup { return Collections.unmodifiableMap(staticMembers); } + /** + * Returns the current epoch of a partition or -1 if the partition + * does not have one. + * + * @param topicId The topic id. + * @param partitionId The partition id. + * + * @return The epoch or -1. + */ + public int currentPartitionEpoch( + Uuid topicId, int partitionId + ) { + Map partitions = currentPartitionEpoch.get(topicId); + if (partitions == null) { + return -1; + } else { + return partitions.getOrDefault(partitionId, -1); + } + } + /** * Compute the preferred (server side) assignor for the group while * taking into account the updated member. The computation relies @@ -677,6 +705,73 @@ public class ConsumerGroup extends ModernGroup { } } + /** + * Removes the partition epochs based on the provided assignment. + * + * @param assignment The assignment. + * @param expectedEpoch The expected epoch. + * @throws IllegalStateException if the epoch does not match the expected one. + * package-private for testing. + */ + void removePartitionEpochs( + Map> assignment, + int expectedEpoch + ) { + assignment.forEach((topicId, assignedPartitions) -> { + currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> { + if (partitionsOrNull != null) { + assignedPartitions.forEach(partitionId -> { + Integer prevValue = partitionsOrNull.remove(partitionId); + if (prevValue != expectedEpoch) { + throw new IllegalStateException( + String.format("Cannot remove the epoch %d from %s-%s because the partition is " + + "still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue)); + } + }); + if (partitionsOrNull.isEmpty()) { + return null; + } else { + return partitionsOrNull; + } + } else { + throw new IllegalStateException( + String.format("Cannot remove the epoch %d from %s because it does not have any epoch", + expectedEpoch, topicId)); + } + }); + }); + } + + /** + * Adds the partitions epoch based on the provided assignment. + * + * @param assignment The assignment. + * @param epoch The new epoch. + * @throws IllegalStateException if the partition already has an epoch assigned. + * package-private for testing. + */ + void addPartitionEpochs( + Map> assignment, + int epoch + ) { + assignment.forEach((topicId, assignedPartitions) -> { + currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> { + if (partitionsOrNull == null) { + partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size()); + } + for (Integer partitionId : assignedPartitions) { + Integer prevValue = partitionsOrNull.put(partitionId, epoch); + if (prevValue != null) { + throw new IllegalStateException( + String.format("Cannot set the epoch of %s-%s to %d because the partition is " + + "still owned at epoch %d", topicId, partitionId, epoch, prevValue)); + } + } + return partitionsOrNull; + }); + }); + } + public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup( long committedOffset, String defaultAssignor, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java index 5607eba9948..2833f7ef7f5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java @@ -162,7 +162,6 @@ public class ShareGroup extends ModernGroup { ShareGroupMember oldMember = members.put(newMember.memberId(), newMember); maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, newMember); - maybeUpdatePartitionEpoch(oldMember, newMember); maybeUpdateGroupState(); } @@ -174,7 +173,6 @@ public class ShareGroup extends ModernGroup { public void removeMember(String memberId) { ShareGroupMember oldMember = members.remove(memberId); maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, null); - maybeRemovePartitionEpoch(oldMember); maybeUpdateGroupState(); } @@ -251,33 +249,6 @@ public class ShareGroup extends ModernGroup { state.set(newState); } - /** - * Updates the partition epochs based on the old and the new member. - * - * @param oldMember The old member. - * @param newMember The new member. - */ - private void maybeUpdatePartitionEpoch( - ShareGroupMember oldMember, - ShareGroupMember newMember - ) { - maybeRemovePartitionEpoch(oldMember); - addPartitionEpochs(newMember.assignedPartitions(), newMember.memberEpoch()); - } - - /** - * Removes the partition epochs for the provided member. - * - * @param oldMember The old member. - */ - private void maybeRemovePartitionEpoch( - ShareGroupMember oldMember - ) { - if (oldMember != null) { - removePartitionEpochs(oldMember.assignedPartitions(), oldMember.memberEpoch()); - } - } - public ShareGroupDescribeResponseData.DescribedGroup asDescribedGroup( long committedOffset, String defaultAssignor, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java index 45bded5bac7..d3db6b1471c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java @@ -42,8 +42,6 @@ import java.util.HashSet; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; -import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; @@ -116,123 +114,6 @@ public class ShareGroupTest { } - @Test - public void testUpdatingMemberUpdatesPartitionEpoch() { - Uuid fooTopicId = Uuid.randomUuid(); - Uuid barTopicId = Uuid.randomUuid(); - - ShareGroup shareGroup = createShareGroup("foo"); - ShareGroupMember member; - - member = new ShareGroupMember.Builder("member") - .setMemberEpoch(10) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 1, 2, 3))) - .build(); - - shareGroup.updateMember(member); - - assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 1)); - assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 2)); - assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 3)); - assertEquals(-1, shareGroup.currentPartitionEpoch(barTopicId, 4)); - assertEquals(-1, shareGroup.currentPartitionEpoch(barTopicId, 5)); - assertEquals(-1, shareGroup.currentPartitionEpoch(barTopicId, 6)); - - member = new ShareGroupMember.Builder(member) - .setMemberEpoch(11) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(barTopicId, 1, 2, 3))) - .build(); - - shareGroup.updateMember(member); - - assertEquals(11, shareGroup.currentPartitionEpoch(barTopicId, 1)); - assertEquals(11, shareGroup.currentPartitionEpoch(barTopicId, 2)); - assertEquals(11, shareGroup.currentPartitionEpoch(barTopicId, 3)); - assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 1)); - assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 2)); - assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 3)); - } - - @Test - public void testRemovePartitionEpochs() { - Uuid fooTopicId = Uuid.randomUuid(); - ShareGroup shareGroup = createShareGroup("foo"); - - // Removing should fail because there is no epoch set. - assertThrows(IllegalStateException.class, () -> shareGroup.removePartitionEpochs( - mkAssignment( - mkTopicAssignment(fooTopicId, 1) - ), - 10 - )); - - ShareGroupMember m1 = new ShareGroupMember.Builder("m1") - .setMemberEpoch(10) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 1))) - .build(); - - shareGroup.updateMember(m1); - - // Removing should fail because the expected epoch is incorrect. - assertThrows(IllegalStateException.class, () -> shareGroup.removePartitionEpochs( - mkAssignment( - mkTopicAssignment(fooTopicId, 1) - ), - 11 - )); - } - - @Test - public void testAddPartitionEpochs() { - Uuid fooTopicId = Uuid.randomUuid(); - ShareGroup shareGroup = createShareGroup("foo"); - - shareGroup.addPartitionEpochs( - mkAssignment( - mkTopicAssignment(fooTopicId, 1) - ), - 10 - ); - - // Changing the epoch should fail because the owner of the partition - // should remove it first. - assertThrows(IllegalStateException.class, () -> shareGroup.addPartitionEpochs( - mkAssignment( - mkTopicAssignment(fooTopicId, 1) - ), - 11 - )); - } - - @Test - public void testDeletingMemberRemovesPartitionEpoch() { - Uuid fooTopicId = Uuid.randomUuid(); - - ShareGroup shareGroup = createShareGroup("foo"); - ShareGroupMember member; - - member = new ShareGroupMember.Builder("member") - .setMemberEpoch(10) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 1, 2, 3))) - .build(); - - shareGroup.updateMember(member); - - assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 1)); - assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 2)); - assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 3)); - - shareGroup.removeMember(member.memberId()); - - assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 1)); - assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 2)); - assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 3)); - } - @Test public void testGroupState() { ShareGroup shareGroup = createShareGroup("foo");