diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index d6eaf089f1e..fc4ae6b38bb 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -609,14 +609,13 @@ public class ReplicationControlManager { List partitionIds = new ArrayList<>(topic.parts.keySet()); for (int partitionId : partitionIds) { PartitionRegistration partition = topic.parts.get(partitionId); - PartitionRegistration nextPartition = partition.merge( - new PartitionChangeRecord(). - setPartitionId(partitionId). - setTopicId(topic.id). - setEligibleLeaderReplicas(Collections.emptyList()). - setLastKnownElr(Collections.emptyList())); - if (!nextPartition.equals(partition)) { - topic.parts.put(partitionId, nextPartition); + if (partition.elr.length != 0 || partition.lastKnownElr.length != 0) { + topic.parts.put(partitionId, partition.merge( + new PartitionChangeRecord(). + setPartitionId(partitionId). + setTopicId(topic.id). + setEligibleLeaderReplicas(Collections.emptyList()). + setLastKnownElr(Collections.emptyList()))); numRemoved++; } } diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java index ae021a6f2fb..b934d10f6d1 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java @@ -19,6 +19,7 @@ package org.apache.kafka.image; import org.apache.kafka.common.metadata.AccessControlEntryRecord; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; +import org.apache.kafka.common.metadata.ClearElrRecord; import org.apache.kafka.common.metadata.ClientQuotaRecord; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.DelegationTokenRecord; @@ -231,6 +232,9 @@ public final class MetadataDelta { case ACCESS_CONTROL_ENTRY_RECORD: replay((AccessControlEntryRecord) record); break; + case CLEAR_ELR_RECORD: + replay((ClearElrRecord) record); + break; case REMOVE_ACCESS_CONTROL_ENTRY_RECORD: replay((RemoveAccessControlEntryRecord) record); break; @@ -278,6 +282,10 @@ public final class MetadataDelta { getOrCreateConfigsDelta().replay(record); } + public void replay(ClearElrRecord record) { + getOrCreateTopicsDelta().replay(record); + } + public void replay(PartitionChangeRecord record) { getOrCreateTopicsDelta().replay(record); } diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java index 6623ae85902..a663cbfd7d9 100644 --- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java @@ -20,11 +20,13 @@ package org.apache.kafka.image; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.ClearElrRecord; import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.Replicas; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -83,6 +85,31 @@ public final class TopicDelta { partitionChanges.put(record.partitionId(), partition.merge(record)); } + public void replay(ClearElrRecord record) { + // Some partitions are not added to the image yet, let's check the partitionChanges first. + partitionChanges.forEach((partitionId, partition) -> { + maybeClearElr(partitionId, partition); + }); + + image.partitions().forEach((partitionId, partition) -> { + if (!partitionChanges.containsKey(partitionId)) { + maybeClearElr(partitionId, partition); + } + }); + } + + void maybeClearElr(int partitionId, PartitionRegistration partition) { + if (partition.elr.length != 0 || partition.lastKnownElr.length != 0) { + partitionChanges.put(partitionId, partition.merge( + new PartitionChangeRecord(). + setPartitionId(partitionId). + setTopicId(image.id()). + setEligibleLeaderReplicas(Collections.emptyList()). + setLastKnownElr(Collections.emptyList()) + )); + } + } + public TopicImage apply() { Map newPartitions = new HashMap<>(); for (Entry entry : image.partitions().entrySet()) { diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java index 2b441776003..ec5cee135b3 100644 --- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java @@ -20,6 +20,7 @@ package org.apache.kafka.image; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.ClearElrRecord; import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; @@ -28,6 +29,7 @@ import org.apache.kafka.metadata.Replicas; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.immutable.ImmutableMap; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -54,7 +56,7 @@ public final class TopicsDelta { */ private final Set deletedTopicIds = new HashSet<>(); - private final Set createdTopicIds = new HashSet<>(); + private final Map createdTopics = new HashMap<>(); public TopicsDelta(TopicsImage image) { this.image = image; @@ -72,7 +74,7 @@ public final class TopicsDelta { TopicDelta delta = new TopicDelta( new TopicImage(record.name(), record.topicId(), Collections.emptyMap())); changedTopics.put(record.topicId(), delta); - createdTopicIds.add(record.topicId()); + createdTopics.put(record.name(), record.topicId()); } TopicDelta getOrCreateTopicDelta(Uuid id) { @@ -94,6 +96,29 @@ public final class TopicsDelta { topicDelta.replay(record); } + public void replay(ClearElrRecord record) { + if (!record.topicName().isEmpty()) { + Uuid topicId; + if (image.getTopic(record.topicName()) != null) { + topicId = image.getTopic(record.topicName()).id(); + } else { + topicId = createdTopics.get(record.topicName()); + } + if (topicId == null) { + throw new RuntimeException("Unable to clear elr for topic with name " + + record.topicName() + ": no such topic found."); + } + TopicDelta topicDelta = getOrCreateTopicDelta(topicId); + topicDelta.replay(record); + } else { + // Update all the existing topics + image.topicsById().forEach((topicId, image) -> { + TopicDelta topicDelta = getOrCreateTopicDelta(topicId); + topicDelta.replay(record); + }); + } + } + public String replay(RemoveTopicRecord record) { TopicDelta topicDelta = changedTopics.remove(record.topicId()); String topicName; @@ -172,8 +197,8 @@ public final class TopicsDelta { return deletedTopicIds; } - public Set createdTopicIds() { - return createdTopicIds; + public Collection createdTopicIds() { + return createdTopics.values(); } /** @@ -231,7 +256,7 @@ public final class TopicsDelta { return "TopicsDelta(" + "changedTopics=" + changedTopics + ", deletedTopicIds=" + deletedTopicIds + - ", createdTopicIds=" + createdTopicIds + + ", createdTopics=" + createdTopics + ')'; } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index bd4cad63fda..a6c81f6510e 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -62,6 +62,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; +import org.apache.kafka.common.metadata.ClearElrRecord; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.metadata.PartitionChangeRecord; @@ -131,6 +132,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.apache.kafka.common.config.TopicConfig.SEGMENT_BYTES_CONFIG; +import static org.apache.kafka.common.metadata.MetadataRecordType.CLEAR_ELR_RECORD; import static org.apache.kafka.common.protocol.Errors.ELECTION_NOT_NEEDED; import static org.apache.kafka.common.protocol.Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE; import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH; @@ -3287,4 +3289,19 @@ public class ReplicationControlManagerTest { assertArrayEquals(new int[]{1}, ctx.replicationControl.getPartition(barId, 0).elr); } } + + @Test + void testElrsRemovedShouldNotBumpPartitionEpochIfNoChange() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder(). + setIsElrEnabled(true). + setStaticConfig(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"). + build(); + ctx.registerBrokers(1, 2, 3, 4); + ctx.unfenceBrokers(1, 2, 3, 4); + Uuid fooId = ctx.createTestTopic("foo", new int[][]{ + new int[]{1, 2, 4}, new int[]{1, 3, 4}}).topicId(); + int partitionEpoch = ctx.replicationControl.getPartition(fooId, 0).partitionEpoch; + ctx.replay(Arrays.asList(new ApiMessageAndVersion(new ClearElrRecord(), CLEAR_ELR_RECORD.highestSupportedVersion()))); + assertEquals(partitionEpoch, ctx.replicationControl.getPartition(fooId, 0).partitionEpoch); + } } diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java index 4805fea058a..398cb84b5aa 100644 --- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java @@ -20,6 +20,7 @@ package org.apache.kafka.image; import org.apache.kafka.common.DirectoryId; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.ClearElrRecord; import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; @@ -45,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import static org.apache.kafka.common.metadata.MetadataRecordType.CLEAR_ELR_RECORD; import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD; import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD; @@ -352,6 +354,95 @@ public class TopicsImageTest { assertEquals(Collections.emptyMap(), changes.followers()); } + @Test + public void testClearElrRecords() { + Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w"); + Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw"); + + List topics = new ArrayList<>(); + topics.add( + newTopicImage( + "foo", + fooId, + newPartition(new int[] {0, 1, 2, 3}) + ) + ); + TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), + newTopicsByNameMap(topics)); + + List topicRecords = new ArrayList<>(); + topicRecords.add( + new ApiMessageAndVersion( + new PartitionChangeRecord().setTopicId(fooId).setPartitionId(0). + setIsr(Arrays.asList(0, 1)). + setEligibleLeaderReplicas(Arrays.asList(2)). + setLastKnownElr(Arrays.asList(3)), + PARTITION_CHANGE_RECORD.highestSupportedVersion() + ) + ); + + TopicsDelta delta = new TopicsDelta(image); + RecordTestUtils.replayAll(delta, topicRecords); + image = delta.apply(); + + assertEquals(1, image.getTopic(fooId).partitions().get(0).elr.length); + assertEquals(1, image.getTopic(fooId).partitions().get(0).lastKnownElr.length); + + topicRecords = new ArrayList<>(); + + /* Test the following: + 1. The clear elr record should work on all existing topics(foo). + 2. The clear elr record should work on the new topic(bar) in the same batch. + */ + topicRecords.addAll(Arrays.asList( + new ApiMessageAndVersion( + new TopicRecord().setTopicId(barId). + setName("bar"), + TOPIC_RECORD.highestSupportedVersion() + ), + new ApiMessageAndVersion( + new PartitionRecord().setTopicId(barId). + setPartitionId(0). + setLeader(0). + setIsr(Arrays.asList(1)). + setEligibleLeaderReplicas(Arrays.asList(2)). + setLastKnownElr(Arrays.asList(3)), + PARTITION_RECORD.highestSupportedVersion() + ), + new ApiMessageAndVersion( + new ClearElrRecord().setTopicName("bar"), + CLEAR_ELR_RECORD.highestSupportedVersion() + ), + new ApiMessageAndVersion( + new ClearElrRecord(), + CLEAR_ELR_RECORD.highestSupportedVersion() + )) + ); + delta = new TopicsDelta(image); + RecordTestUtils.replayAll(delta, topicRecords); + image = delta.apply(); + + assertEquals(0, image.getTopic(fooId).partitions().get(0).elr.length); + assertEquals(0, image.getTopic(fooId).partitions().get(0).lastKnownElr.length); + assertEquals(0, image.getTopic(barId).partitions().get(0).elr.length); + assertEquals(0, image.getTopic(barId).partitions().get(0).lastKnownElr.length); + } + + @Test + public void testClearElrRecordForNonExistTopic() { + TopicsImage image = new TopicsImage(newTopicsByIdMap(Collections.emptyList()), + newTopicsByNameMap(Collections.emptyList())); + TopicsDelta delta = new TopicsDelta(image); + List topicRecords = new ArrayList<>(); + topicRecords.addAll(Collections.singletonList( + new ApiMessageAndVersion( + new ClearElrRecord().setTopicName("non-exist"), + CLEAR_ELR_RECORD.highestSupportedVersion() + )) + ); + assertThrows(RuntimeException.class, () -> RecordTestUtils.replayAll(delta, topicRecords)); + } + @Test public void testLocalReassignmentChanges() { int localId = 3;