KAFKA-18649: complete ClearElrRecord handling (#18708)

Implement ClearElrRecord handling in the TopicDelta. Also, the ReplicationControlManager should not merge updates if ELR/LastKnownElr are empty, because that would cause an unnecessary partition epoch bump.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Calvin Liu 2025-01-29 15:07:44 -08:00 committed by GitHub
parent 9dd73d43b0
commit fdbed6c458
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 180 additions and 13 deletions

View File

@ -609,14 +609,13 @@ public class ReplicationControlManager {
List<Integer> partitionIds = new ArrayList<>(topic.parts.keySet());
for (int partitionId : partitionIds) {
PartitionRegistration partition = topic.parts.get(partitionId);
PartitionRegistration nextPartition = partition.merge(
new PartitionChangeRecord().
setPartitionId(partitionId).
setTopicId(topic.id).
setEligibleLeaderReplicas(Collections.emptyList()).
setLastKnownElr(Collections.emptyList()));
if (!nextPartition.equals(partition)) {
topic.parts.put(partitionId, nextPartition);
if (partition.elr.length != 0 || partition.lastKnownElr.length != 0) {
topic.parts.put(partitionId, partition.merge(
new PartitionChangeRecord().
setPartitionId(partitionId).
setTopicId(topic.id).
setEligibleLeaderReplicas(Collections.emptyList()).
setLastKnownElr(Collections.emptyList())));
numRemoved++;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.image;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.DelegationTokenRecord;
@ -231,6 +232,9 @@ public final class MetadataDelta {
case ACCESS_CONTROL_ENTRY_RECORD:
replay((AccessControlEntryRecord) record);
break;
case CLEAR_ELR_RECORD:
replay((ClearElrRecord) record);
break;
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
replay((RemoveAccessControlEntryRecord) record);
break;
@ -278,6 +282,10 @@ public final class MetadataDelta {
getOrCreateConfigsDelta().replay(record);
}
/**
 * Replays a {@link ClearElrRecord} by forwarding it to the topics delta
 * returned by {@code getOrCreateTopicsDelta()}.
 *
 * @param record the clear-ELR record to replay
 */
public void replay(ClearElrRecord record) {
getOrCreateTopicsDelta().replay(record);
}
/**
 * Replays a {@link PartitionChangeRecord} by forwarding it to the topics delta
 * returned by {@code getOrCreateTopicsDelta()}.
 *
 * @param record the partition change record to replay
 */
public void replay(PartitionChangeRecord record) {
getOrCreateTopicsDelta().replay(record);
}

View File

@ -20,11 +20,13 @@ package org.apache.kafka.image;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -83,6 +85,31 @@ public final class TopicDelta {
partitionChanges.put(record.partitionId(), partition.merge(record));
}
/**
 * Replays a {@link ClearElrRecord} against every partition of this topic.
 *
 * <p>Partitions that already have a pending change in this delta are processed
 * from {@code partitionChanges}; the remaining partitions are taken from the
 * base image.
 *
 * @param record the clear-ELR record being replayed (its fields are not read here)
 */
public void replay(ClearElrRecord record) {
    // Pending changes come first: they may describe partitions that are not in the image yet.
    // NOTE(review): maybeClearElr only replaces the value of a key that is already present;
    // this assumes partitionChanges tolerates that during iteration (true for
    // java.util.HashMap) -- confirm against the field declaration.
    for (Entry<Integer, PartitionRegistration> pending : partitionChanges.entrySet()) {
        maybeClearElr(pending.getKey(), pending.getValue());
    }
    // Then the image partitions that this delta has not touched so far.
    for (Entry<Integer, PartitionRegistration> imaged : image.partitions().entrySet()) {
        if (partitionChanges.containsKey(imaged.getKey())) {
            continue;
        }
        maybeClearElr(imaged.getKey(), imaged.getValue());
    }
}
/**
 * Records a change that empties the ELR and last-known ELR of a partition,
 * but only when at least one of the two is currently non-empty.
 *
 * @param partitionId the id of the partition within this topic
 * @param partition   the current registration of that partition
 */
void maybeClearElr(int partitionId, PartitionRegistration partition) {
    boolean nothingToClear = partition.elr.length == 0 && partition.lastKnownElr.length == 0;
    if (nothingToClear) {
        // Leave partitionChanges untouched so no change is recorded for this partition.
        return;
    }
    PartitionChangeRecord clearBoth = new PartitionChangeRecord()
        .setPartitionId(partitionId)
        .setTopicId(image.id())
        .setEligibleLeaderReplicas(Collections.emptyList())
        .setLastKnownElr(Collections.emptyList());
    partitionChanges.put(partitionId, partition.merge(clearBoth));
}
public TopicImage apply() {
Map<Integer, PartitionRegistration> newPartitions = new HashMap<>();
for (Entry<Integer, PartitionRegistration> entry : image.partitions().entrySet()) {

View File

@ -20,6 +20,7 @@ package org.apache.kafka.image;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
@ -28,6 +29,7 @@ import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.immutable.ImmutableMap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -54,7 +56,7 @@ public final class TopicsDelta {
*/
private final Set<Uuid> deletedTopicIds = new HashSet<>();
private final Set<Uuid> createdTopicIds = new HashSet<>();
private final Map<String, Uuid> createdTopics = new HashMap<>();
public TopicsDelta(TopicsImage image) {
this.image = image;
@ -72,7 +74,7 @@ public final class TopicsDelta {
TopicDelta delta = new TopicDelta(
new TopicImage(record.name(), record.topicId(), Collections.emptyMap()));
changedTopics.put(record.topicId(), delta);
createdTopicIds.add(record.topicId());
createdTopics.put(record.name(), record.topicId());
}
TopicDelta getOrCreateTopicDelta(Uuid id) {
@ -94,6 +96,29 @@ public final class TopicsDelta {
topicDelta.replay(record);
}
/**
 * Replays a {@link ClearElrRecord}.
 *
 * <p>A record with a non-empty topic name targets that single topic. A record
 * with an empty topic name targets every topic that is live in this delta:
 * topics from the base image that this delta has not deleted, plus topics
 * created by this delta.
 *
 * <p>NOTE(review): this relies on {@code deletedTopicIds} holding the ids of
 * image topics removed by this delta, and on {@code changedTopics} keeping an
 * entry for every topic created by this delta until it is removed again --
 * confirm against {@code replay(RemoveTopicRecord)}.
 *
 * @param record the clear-ELR record to replay
 * @throws RuntimeException if the record names a topic that is neither a live
 *                          topic of the image nor a live topic created in this delta
 */
public void replay(ClearElrRecord record) {
    String topicName = record.topicName();
    if (!topicName.isEmpty()) {
        TopicImage existing = image.getTopic(topicName);
        TopicDelta topicDelta = null;
        if (existing != null && !deletedTopicIds.contains(existing.id())) {
            topicDelta = getOrCreateTopicDelta(existing.id());
        } else {
            // The name is either new in this delta, or it belonged to an image topic that
            // was deleted (and possibly re-created under a new id) in this delta. Only a
            // topic that still has a pending delta counts as live.
            Uuid createdId = createdTopics.get(topicName);
            if (createdId != null) {
                topicDelta = changedTopics.get(createdId);
            }
        }
        if (topicDelta == null) {
            throw new RuntimeException("Unable to clear elr for topic with name " +
                topicName + ": no such topic found.");
        }
        topicDelta.replay(record);
    } else {
        // Image topics, skipping those deleted in this delta so that they do not
        // get a fresh entry in changedTopics.
        image.topicsById().forEach((topicId, topicImage) -> {
            if (!deletedTopicIds.contains(topicId)) {
                getOrCreateTopicDelta(topicId).replay(record);
            }
        });
        // Topics created earlier in this delta are not in the image, so walk them separately.
        for (Uuid createdId : createdTopics.values()) {
            TopicDelta createdDelta = changedTopics.get(createdId);
            if (createdDelta != null) {
                createdDelta.replay(record);
            }
        }
    }
}
public String replay(RemoveTopicRecord record) {
TopicDelta topicDelta = changedTopics.remove(record.topicId());
String topicName;
@ -172,8 +197,8 @@ public final class TopicsDelta {
return deletedTopicIds;
}
public Set<Uuid> createdTopicIds() {
return createdTopicIds;
public Collection<Uuid> createdTopicIds() {
return createdTopics.values();
}
/**
@ -231,7 +256,7 @@ public final class TopicsDelta {
return "TopicsDelta(" +
"changedTopics=" + changedTopics +
", deletedTopicIds=" + deletedTopicIds +
", createdTopicIds=" + createdTopicIds +
", createdTopics=" + createdTopics +
')';
}
}

View File

@ -62,6 +62,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
@ -131,6 +132,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.config.TopicConfig.SEGMENT_BYTES_CONFIG;
import static org.apache.kafka.common.metadata.MetadataRecordType.CLEAR_ELR_RECORD;
import static org.apache.kafka.common.protocol.Errors.ELECTION_NOT_NEEDED;
import static org.apache.kafka.common.protocol.Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
@ -3287,4 +3289,19 @@ public class ReplicationControlManagerTest {
assertArrayEquals(new int[]{1}, ctx.replicationControl.getPartition(barId, 0).elr);
}
}
/**
 * Replaying a ClearElrRecord with no topic name must leave the partition epoch
 * of foo-0 unchanged; the topic was only just created, so the test expects
 * there to be nothing to clear.
 */
@Test
void testElrsRemovedShouldNotBumpPartitionEpochIfNoChange() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setIsElrEnabled(true)
        .setStaticConfig(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
        .build();
    ctx.registerBrokers(1, 2, 3, 4);
    ctx.unfenceBrokers(1, 2, 3, 4);

    int[][] assignments = new int[][]{
        new int[]{1, 2, 4},
        new int[]{1, 3, 4}
    };
    Uuid fooId = ctx.createTestTopic("foo", assignments).topicId();
    int epochBeforeClear = ctx.replicationControl.getPartition(fooId, 0).partitionEpoch;

    ApiMessageAndVersion clearRecord = new ApiMessageAndVersion(
        new ClearElrRecord(),
        CLEAR_ELR_RECORD.highestSupportedVersion());
    ctx.replay(Arrays.asList(clearRecord));

    assertEquals(epochBeforeClear, ctx.replicationControl.getPartition(fooId, 0).partitionEpoch);
}
}

View File

@ -20,6 +20,7 @@ package org.apache.kafka.image;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
@ -45,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.kafka.common.metadata.MetadataRecordType.CLEAR_ELR_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
@ -352,6 +354,95 @@ public class TopicsImageTest {
assertEquals(Collections.emptyMap(), changes.followers());
}
/**
 * Verifies ClearElrRecord handling in TopicsDelta for a topic already in the
 * image (foo) and for a topic created in the same batch of records (bar).
 */
@Test
public void testClearElrRecords() {
    Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
    Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");

    // Base image: a single topic "foo" with one partition.
    List<TopicImage> initialTopics = new ArrayList<>();
    initialTopics.add(newTopicImage("foo", fooId, newPartition(new int[] {0, 1, 2, 3})));
    TopicsImage image = new TopicsImage(
        newTopicsByIdMap(initialTopics),
        newTopicsByNameMap(initialTopics));

    // Step 1: give foo-0 a non-empty ELR and last-known ELR.
    List<ApiMessageAndVersion> seedRecords = new ArrayList<>();
    seedRecords.add(new ApiMessageAndVersion(
        new PartitionChangeRecord()
            .setTopicId(fooId)
            .setPartitionId(0)
            .setIsr(Arrays.asList(0, 1))
            .setEligibleLeaderReplicas(Arrays.asList(2))
            .setLastKnownElr(Arrays.asList(3)),
        PARTITION_CHANGE_RECORD.highestSupportedVersion()));

    TopicsDelta seedDelta = new TopicsDelta(image);
    RecordTestUtils.replayAll(seedDelta, seedRecords);
    image = seedDelta.apply();

    assertEquals(1, image.getTopic(fooId).partitions().get(0).elr.length);
    assertEquals(1, image.getTopic(fooId).partitions().get(0).lastKnownElr.length);

    // Step 2: in one batch, create "bar" with a non-empty ELR, clear it by name, then
    // issue a clear with no topic name. This checks that:
    //   1. the clear elr record works on all existing topics (foo);
    //   2. the clear elr record works on the new topic (bar) in the same batch.
    List<ApiMessageAndVersion> clearRecords = new ArrayList<>();
    clearRecords.add(new ApiMessageAndVersion(
        new TopicRecord()
            .setTopicId(barId)
            .setName("bar"),
        TOPIC_RECORD.highestSupportedVersion()));
    clearRecords.add(new ApiMessageAndVersion(
        new PartitionRecord()
            .setTopicId(barId)
            .setPartitionId(0)
            .setLeader(0)
            .setIsr(Arrays.asList(1))
            .setEligibleLeaderReplicas(Arrays.asList(2))
            .setLastKnownElr(Arrays.asList(3)),
        PARTITION_RECORD.highestSupportedVersion()));
    clearRecords.add(new ApiMessageAndVersion(
        new ClearElrRecord().setTopicName("bar"),
        CLEAR_ELR_RECORD.highestSupportedVersion()));
    clearRecords.add(new ApiMessageAndVersion(
        new ClearElrRecord(),
        CLEAR_ELR_RECORD.highestSupportedVersion()));

    TopicsDelta clearDelta = new TopicsDelta(image);
    RecordTestUtils.replayAll(clearDelta, clearRecords);
    image = clearDelta.apply();

    assertEquals(0, image.getTopic(fooId).partitions().get(0).elr.length);
    assertEquals(0, image.getTopic(fooId).partitions().get(0).lastKnownElr.length);
    assertEquals(0, image.getTopic(barId).partitions().get(0).elr.length);
    assertEquals(0, image.getTopic(barId).partitions().get(0).lastKnownElr.length);
}
/**
 * Replaying a ClearElrRecord that names a topic absent from an empty image
 * must fail with a RuntimeException.
 */
@Test
public void testClearElrRecordForNonExistTopic() {
    TopicsImage emptyImage = new TopicsImage(
        newTopicsByIdMap(Collections.emptyList()),
        newTopicsByNameMap(Collections.emptyList()));
    TopicsDelta delta = new TopicsDelta(emptyImage);

    List<ApiMessageAndVersion> records = new ArrayList<>();
    records.add(new ApiMessageAndVersion(
        new ClearElrRecord().setTopicName("non-exist"),
        CLEAR_ELR_RECORD.highestSupportedVersion()));

    assertThrows(RuntimeException.class, () -> RecordTestUtils.replayAll(delta, records));
}
@Test
public void testLocalReassignmentChanges() {
int localId = 3;