KAFKA-17288 Removed tracking partition member epoch (KIP-932) (#16828)

Partition epochs are tracked for consumer groups where epoch is the current assigned member epoch. As share groups have partitions shared hence maintaing the partition epochs is not required.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Apoorv Mittal 2024-08-12 00:17:46 +01:00 committed by GitHub
parent 9a85705b56
commit 126b25b51d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 95 additions and 243 deletions

View File

@ -113,13 +113,6 @@ public abstract class ModernGroup<T extends ModernGroupMember> implements Group
*/
private final TimelineHashMap<Uuid, TimelineHashMap<Integer, String>> invertedTargetAssignment;
/**
* The current partition epoch maps each topic-partitions to their current epoch where
* the epoch is the epoch of their owners. When a member revokes a partition, it removes
* its epochs from this map. When a member gets a partition, it adds its epochs to this map.
*/
protected final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch;
/**
* The metadata refresh deadline. It consists of a timestamp in milliseconds together with
* the group epoch at the time of setting it. The metadata refresh time is considered as a
@ -146,7 +139,6 @@ public abstract class ModernGroup<T extends ModernGroupMember> implements Group
this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
this.invertedTargetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
}
/**
@ -356,26 +348,6 @@ public abstract class ModernGroup<T extends ModernGroupMember> implements Group
return Collections.unmodifiableMap(targetAssignment);
}
/**
* Returns the current epoch of a partition or -1 if the partition
* does not have one.
*
* @param topicId The topic id.
* @param partitionId The partition id.
*
* @return The epoch or -1.
*/
public int currentPartitionEpoch(
Uuid topicId, int partitionId
) {
Map<Integer, Integer> partitions = currentPartitionEpoch.get(topicId);
if (partitions == null) {
return -1;
} else {
return partitions.getOrDefault(partitionId, -1);
}
}
/**
* @return An immutable Map of subscription metadata for
* each topic that the consumer group is subscribed to.
@ -591,73 +563,6 @@ public abstract class ModernGroup<T extends ModernGroupMember> implements Group
return HOMOGENEOUS;
}
/**
* Removes the partition epochs based on the provided assignment.
*
* @param assignment The assignment.
* @param expectedEpoch The expected epoch.
* @throws IllegalStateException if the epoch does not match the expected one.
* package-private for testing.
*/
public void removePartitionEpochs(
Map<Uuid, Set<Integer>> assignment,
int expectedEpoch
) {
assignment.forEach((topicId, assignedPartitions) -> {
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.forEach(partitionId -> {
Integer prevValue = partitionsOrNull.remove(partitionId);
if (prevValue != expectedEpoch) {
throw new IllegalStateException(
String.format("Cannot remove the epoch %d from %s-%s because the partition is " +
"still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue));
}
});
if (partitionsOrNull.isEmpty()) {
return null;
} else {
return partitionsOrNull;
}
} else {
throw new IllegalStateException(
String.format("Cannot remove the epoch %d from %s because it does not have any epoch",
expectedEpoch, topicId));
}
});
});
}
/**
* Adds the partitions epoch based on the provided assignment.
*
* @param assignment The assignment.
* @param epoch The new epoch.
* @throws IllegalStateException if the partition already has an epoch assigned.
* package-private for testing.
*/
public void addPartitionEpochs(
Map<Uuid, Set<Integer>> assignment,
int epoch
) {
assignment.forEach((topicId, assignedPartitions) -> {
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull == null) {
partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
}
for (Integer partitionId : assignedPartitions) {
Integer prevValue = partitionsOrNull.put(partitionId, epoch);
if (prevValue != null) {
throw new IllegalStateException(
String.format("Cannot set the epoch of %s-%s to %d because the partition is " +
"still owned at epoch %d", topicId, partitionId, epoch, prevValue));
}
}
return partitionsOrNull;
});
});
}
/**
* Gets the protocol type for the group.
*

View File

@ -123,6 +123,13 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
*/
private final TimelineHashMap<String, Integer> classicProtocolMembersSupportedProtocols;
/**
* The current partition epoch maps each topic-partitions to their current epoch where
* the epoch is the epoch of their owners. When a member revokes a partition, it removes
* its epochs from this map. When a member gets a partition, it adds its epochs to this map.
*/
private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch;
public ConsumerGroup(
SnapshotRegistry snapshotRegistry,
String groupId,
@ -135,6 +142,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
this.metrics = Objects.requireNonNull(metrics);
this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry);
this.classicProtocolMembersSupportedProtocols = new TimelineHashMap<>(snapshotRegistry, 0);
this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
}
/**
@ -346,6 +354,26 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
return Collections.unmodifiableMap(staticMembers);
}
/**
* Returns the current epoch of a partition or -1 if the partition
* does not have one.
*
* @param topicId The topic id.
* @param partitionId The partition id.
*
* @return The epoch or -1.
*/
public int currentPartitionEpoch(
Uuid topicId, int partitionId
) {
Map<Integer, Integer> partitions = currentPartitionEpoch.get(topicId);
if (partitions == null) {
return -1;
} else {
return partitions.getOrDefault(partitionId, -1);
}
}
/**
* Compute the preferred (server side) assignor for the group while
* taking into account the updated member. The computation relies
@ -677,6 +705,73 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
}
}
/**
* Removes the partition epochs based on the provided assignment.
*
* @param assignment The assignment.
* @param expectedEpoch The expected epoch.
* @throws IllegalStateException if the epoch does not match the expected one.
* package-private for testing.
*/
void removePartitionEpochs(
Map<Uuid, Set<Integer>> assignment,
int expectedEpoch
) {
assignment.forEach((topicId, assignedPartitions) -> {
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.forEach(partitionId -> {
Integer prevValue = partitionsOrNull.remove(partitionId);
if (prevValue != expectedEpoch) {
throw new IllegalStateException(
String.format("Cannot remove the epoch %d from %s-%s because the partition is " +
"still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue));
}
});
if (partitionsOrNull.isEmpty()) {
return null;
} else {
return partitionsOrNull;
}
} else {
throw new IllegalStateException(
String.format("Cannot remove the epoch %d from %s because it does not have any epoch",
expectedEpoch, topicId));
}
});
});
}
/**
* Adds the partitions epoch based on the provided assignment.
*
* @param assignment The assignment.
* @param epoch The new epoch.
* @throws IllegalStateException if the partition already has an epoch assigned.
* package-private for testing.
*/
void addPartitionEpochs(
Map<Uuid, Set<Integer>> assignment,
int epoch
) {
assignment.forEach((topicId, assignedPartitions) -> {
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull == null) {
partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
}
for (Integer partitionId : assignedPartitions) {
Integer prevValue = partitionsOrNull.put(partitionId, epoch);
if (prevValue != null) {
throw new IllegalStateException(
String.format("Cannot set the epoch of %s-%s to %d because the partition is " +
"still owned at epoch %d", topicId, partitionId, epoch, prevValue));
}
}
return partitionsOrNull;
});
});
}
public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
long committedOffset,
String defaultAssignor,

View File

@ -162,7 +162,6 @@ public class ShareGroup extends ModernGroup<ShareGroupMember> {
ShareGroupMember oldMember = members.put(newMember.memberId(), newMember);
maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, newMember);
maybeUpdatePartitionEpoch(oldMember, newMember);
maybeUpdateGroupState();
}
@ -174,7 +173,6 @@ public class ShareGroup extends ModernGroup<ShareGroupMember> {
public void removeMember(String memberId) {
ShareGroupMember oldMember = members.remove(memberId);
maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, null);
maybeRemovePartitionEpoch(oldMember);
maybeUpdateGroupState();
}
@ -251,33 +249,6 @@ public class ShareGroup extends ModernGroup<ShareGroupMember> {
state.set(newState);
}
/**
* Updates the partition epochs based on the old and the new member.
*
* @param oldMember The old member.
* @param newMember The new member.
*/
private void maybeUpdatePartitionEpoch(
ShareGroupMember oldMember,
ShareGroupMember newMember
) {
maybeRemovePartitionEpoch(oldMember);
addPartitionEpochs(newMember.assignedPartitions(), newMember.memberEpoch());
}
/**
* Removes the partition epochs for the provided member.
*
* @param oldMember The old member.
*/
private void maybeRemovePartitionEpoch(
ShareGroupMember oldMember
) {
if (oldMember != null) {
removePartitionEpochs(oldMember.assignedPartitions(), oldMember.memberEpoch());
}
}
public ShareGroupDescribeResponseData.DescribedGroup asDescribedGroup(
long committedOffset,
String defaultAssignor,

View File

@ -42,8 +42,6 @@ import java.util.HashSet;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
@ -116,123 +114,6 @@ public class ShareGroupTest {
}
@Test
public void testUpdatingMemberUpdatesPartitionEpoch() {
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
ShareGroup shareGroup = createShareGroup("foo");
ShareGroupMember member;
member = new ShareGroupMember.Builder("member")
.setMemberEpoch(10)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3)))
.build();
shareGroup.updateMember(member);
assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 1));
assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 2));
assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 3));
assertEquals(-1, shareGroup.currentPartitionEpoch(barTopicId, 4));
assertEquals(-1, shareGroup.currentPartitionEpoch(barTopicId, 5));
assertEquals(-1, shareGroup.currentPartitionEpoch(barTopicId, 6));
member = new ShareGroupMember.Builder(member)
.setMemberEpoch(11)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(barTopicId, 1, 2, 3)))
.build();
shareGroup.updateMember(member);
assertEquals(11, shareGroup.currentPartitionEpoch(barTopicId, 1));
assertEquals(11, shareGroup.currentPartitionEpoch(barTopicId, 2));
assertEquals(11, shareGroup.currentPartitionEpoch(barTopicId, 3));
assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 1));
assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 2));
assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 3));
}
@Test
public void testRemovePartitionEpochs() {
Uuid fooTopicId = Uuid.randomUuid();
ShareGroup shareGroup = createShareGroup("foo");
// Removing should fail because there is no epoch set.
assertThrows(IllegalStateException.class, () -> shareGroup.removePartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
10
));
ShareGroupMember m1 = new ShareGroupMember.Builder("m1")
.setMemberEpoch(10)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 1)))
.build();
shareGroup.updateMember(m1);
// Removing should fail because the expected epoch is incorrect.
assertThrows(IllegalStateException.class, () -> shareGroup.removePartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
11
));
}
@Test
public void testAddPartitionEpochs() {
Uuid fooTopicId = Uuid.randomUuid();
ShareGroup shareGroup = createShareGroup("foo");
shareGroup.addPartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
10
);
// Changing the epoch should fail because the owner of the partition
// should remove it first.
assertThrows(IllegalStateException.class, () -> shareGroup.addPartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
11
));
}
@Test
public void testDeletingMemberRemovesPartitionEpoch() {
Uuid fooTopicId = Uuid.randomUuid();
ShareGroup shareGroup = createShareGroup("foo");
ShareGroupMember member;
member = new ShareGroupMember.Builder("member")
.setMemberEpoch(10)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3)))
.build();
shareGroup.updateMember(member);
assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 1));
assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 2));
assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 3));
shareGroup.removeMember(member.memberId());
assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 1));
assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 2));
assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 3));
}
@Test
public void testGroupState() {
ShareGroup shareGroup = createShareGroup("foo");