KAFKA-19383: Handle the deleted topics when applying ClearElrRecord (#20034)
CI / build (push) Waiting to run Details

https://issues.apache.org/jira/browse/KAFKA-19383 When applying the
ClearElrRecord, it may pick up the topicId in the image without checking
if the topic has been deleted. This can cause the creation of a new
TopicRecord with an old topic ID.

Reviewers: Alyssa Huang <ahuang@confluent.io>, Artem Livshits <alivshits@confluent.io>, Colin P. McCabe <cmccabe@apache.org>

No conflicts.
This commit is contained in:
Calvin Liu 2025-06-24 19:59:50 -07:00 committed by GitHub
parent 7e51a2a43b
commit 46e843da9f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 248 additions and 8 deletions

View File

@ -96,26 +96,43 @@ public final class TopicsDelta {
topicDelta.replay(record); topicDelta.replay(record);
} }
private void maybeReplayClearElrRecord(Uuid topicId, ClearElrRecord record) {
// Only apply the record if the topic is not deleted.
if (!deletedTopicIds.contains(topicId)) {
TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
topicDelta.replay(record);
}
}
// When replaying the ClearElrRecord, we need to first find the latest topic ID associated with the topic(s) because
// multiple topic IDs for the same topic in a TopicsDelta is possible in the event of topic deletion and recreation.
// Second, we should not add the topicDelta if the given topic ID has been deleted. So that we don't leak the
// deleted topic ID.
public void replay(ClearElrRecord record) { public void replay(ClearElrRecord record) {
if (!record.topicName().isEmpty()) { if (!record.topicName().isEmpty()) {
Uuid topicId; Uuid topicId = null;
if (image.getTopic(record.topicName()) != null) { // CreatedTopics contains the latest topic IDs. It should be checked first in case the topic is deleted and
topicId = image.getTopic(record.topicName()).id(); // created in the same batch.
} else { if (createdTopics.containsKey(record.topicName())) {
topicId = createdTopics.get(record.topicName()); topicId = createdTopics.get(record.topicName());
} else if (image.getTopic(record.topicName()) != null) {
topicId = image.getTopic(record.topicName()).id();
} }
if (topicId == null) { if (topicId == null) {
throw new RuntimeException("Unable to clear elr for topic with name " + throw new RuntimeException("Unable to clear elr for topic with name " +
record.topicName() + ": no such topic found."); record.topicName() + ": no such topic found.");
} }
TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
topicDelta.replay(record); maybeReplayClearElrRecord(topicId, record);
} else { } else {
// Update all the existing topics // Update all the existing topics
image.topicsById().forEach((topicId, image) -> { image.topicsById().forEach((topicId, image) -> {
TopicDelta topicDelta = getOrCreateTopicDelta(topicId); maybeReplayClearElrRecord(topicId, record);
topicDelta.replay(record);
}); });
createdTopicIds().forEach((topicId -> {
maybeReplayClearElrRecord(topicId, record);
}));
} }
} }

View File

@ -429,6 +429,229 @@ public class TopicsImageTest {
assertEquals(0, image.getTopic(barId).partitions().get(0).lastKnownElr.length); assertEquals(0, image.getTopic(barId).partitions().get(0).lastKnownElr.length);
} }
@Test
public void testClearElrRecordOnNonExistingTopic() {
TopicsImage image = TopicsImage.EMPTY;
List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
topicRecords.addAll(List.of(
new ApiMessageAndVersion(
new ClearElrRecord().setTopicName("foo"),
CLEAR_ELR_RECORD.highestSupportedVersion()
))
);
TopicsDelta delta = new TopicsDelta(image);
assertThrows(RuntimeException.class, () -> RecordTestUtils.replayAll(delta, topicRecords));
}
@Test
public void testClearElrRecords_All_ForDeletedTopics() {
Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
Uuid fooId2 = Uuid.randomUuid();
Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
Uuid barId2 = Uuid.randomUuid();
List<TopicImage> topics = new ArrayList<>();
topics.add(
newTopicImage(
"foo",
fooId,
newPartition(new int[] {0, 1, 2, 3})
)
);
TopicsImage image = new TopicsImage(newTopicsByIdMap(topics),
newTopicsByNameMap(topics));
List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
topicRecords.add(
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(fooId).
setPartitionId(0).
setLeader(0).
setIsr(List.of(1, 2, 3)),
PARTITION_RECORD.highestSupportedVersion()
)
);
TopicsDelta delta = new TopicsDelta(image);
RecordTestUtils.replayAll(delta, topicRecords);
image = delta.apply();
topicRecords = new ArrayList<>();
/* Test the following:
1. Topic foo is deleted and created in the same delta, the clear elr applies on the new topic
2. Topic bar is created, deleted, then created in the same delta, the clear elr applies on the new topic
*/
topicRecords.addAll(List.of(
new ApiMessageAndVersion(
new RemoveTopicRecord().setTopicId(fooId),
REMOVE_TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new TopicRecord().setTopicId(fooId2).
setName("foo"),
TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(fooId2).setPartitionId(0).
setIsr(List.of(0, 1)).
setEligibleLeaderReplicas(List.of(2)).
setLastKnownElr(List.of(3)),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new TopicRecord().setTopicId(barId).
setName("bar"),
TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(barId).
setPartitionId(0).
setLeader(0).
setIsr(List.of(1, 2, 3)),
PARTITION_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new RemoveTopicRecord().setTopicId(barId),
REMOVE_TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new TopicRecord().setTopicId(barId2).
setName("bar"),
TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(barId2).
setPartitionId(0).
setLeader(0).
setIsr(List.of(1)).
setEligibleLeaderReplicas(List.of(2)).
setLastKnownElr(List.of(3)),
PARTITION_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new ClearElrRecord(),
CLEAR_ELR_RECORD.highestSupportedVersion()
))
);
delta = new TopicsDelta(image);
RecordTestUtils.replayAll(delta, topicRecords);
image = delta.apply();
assertEquals(2, image.topicsById().size());
assertEquals(2, image.topicsByName().size());
assertEquals(0, image.getTopic(fooId2).partitions().get(0).elr.length);
assertEquals(0, image.getTopic(fooId2).partitions().get(0).lastKnownElr.length);
assertEquals(0, image.getTopic(barId2).partitions().get(0).elr.length);
assertEquals(0, image.getTopic(barId2).partitions().get(0).lastKnownElr.length);
}
@Test
public void testClearElrRecords_Single_ForDeletedTopics() {
Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
Uuid fooId2 = Uuid.randomUuid();
Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
Uuid barId2 = Uuid.randomUuid();
List<TopicImage> topics = new ArrayList<>();
topics.add(
newTopicImage(
"foo",
fooId,
newPartition(new int[] {0, 1, 2, 3})
)
);
TopicsImage image = new TopicsImage(newTopicsByIdMap(topics),
newTopicsByNameMap(topics));
List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
topicRecords.add(
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(fooId).
setPartitionId(0).
setLeader(0).
setIsr(List.of(1, 2, 3)),
PARTITION_RECORD.highestSupportedVersion()
)
);
TopicsDelta delta = new TopicsDelta(image);
RecordTestUtils.replayAll(delta, topicRecords);
image = delta.apply();
topicRecords = new ArrayList<>();
/* Test the following:
1. Topic foo is deleted and created in the same delta, the clear elr applies on the new topic
2. Topic bar is created, deleted, then created in the same delta, the clear elr applies on the new topic
*/
topicRecords.addAll(List.of(
new ApiMessageAndVersion(
new RemoveTopicRecord().setTopicId(fooId),
REMOVE_TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new TopicRecord().setTopicId(fooId2).
setName("foo"),
TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(fooId2).setPartitionId(0).
setIsr(List.of(0, 1)).
setEligibleLeaderReplicas(List.of(2)).
setLastKnownElr(List.of(3)),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new TopicRecord().setTopicId(barId).
setName("bar"),
TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(barId).
setPartitionId(0).
setLeader(0).
setIsr(List.of(1, 2, 3)),
PARTITION_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new RemoveTopicRecord().setTopicId(barId),
REMOVE_TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new TopicRecord().setTopicId(barId2).
setName("bar"),
TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(barId2).
setPartitionId(0).
setLeader(0).
setIsr(List.of(1)).
setEligibleLeaderReplicas(List.of(2)).
setLastKnownElr(List.of(3)),
PARTITION_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new ClearElrRecord().setTopicName("foo"),
CLEAR_ELR_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new ClearElrRecord().setTopicName("bar"),
CLEAR_ELR_RECORD.highestSupportedVersion()
))
);
delta = new TopicsDelta(image);
RecordTestUtils.replayAll(delta, topicRecords);
image = delta.apply();
assertEquals(2, image.topicsById().size());
assertEquals(2, image.topicsByName().size());
assertEquals(0, image.getTopic(fooId2).partitions().get(0).elr.length);
assertEquals(0, image.getTopic(fooId2).partitions().get(0).lastKnownElr.length);
assertEquals(0, image.getTopic(barId2).partitions().get(0).elr.length);
assertEquals(0, image.getTopic(barId2).partitions().get(0).lastKnownElr.length);
}
@Test @Test
public void testClearElrRecordForNonExistTopic() { public void testClearElrRecordForNonExistTopic() {
TopicsImage image = new TopicsImage(newTopicsByIdMap(Collections.emptyList()), TopicsImage image = new TopicsImage(newTopicsByIdMap(Collections.emptyList()),