KAFKA-19497; Topic replay code does not handle creation and deletion in the same delta (#20242)

There is a small logic bug in topic replay. If a topic is created and
then removed before the TopicsDelta is applied, we end up with the
deleted topic in createdTopics on the delta. Tis issue is fixed by
removing the topicName from createTopics when replaying
RemoveTopicRecord to make sure the deleted topics won't appear in
createTopics.

Reviewers: José Armando García Sancio <jsancio@apache.org>, Kevin Wu
 <kevin.wu2412@gmail.com>, Alyssa Huang <ahuang@confluent.io>
This commit is contained in:
anonymous 2025-08-27 15:34:12 -05:00 committed by GitHub
parent 0412be9e9d
commit 92fe00e184
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 12 additions and 0 deletions

View File

@ -136,6 +136,7 @@ public final class TopicsDelta {
String topicName;
if (topicDelta != null) {
topicName = topicDelta.image().name();
createdTopics.remove(topicName);
if (image.topicsById().containsKey(record.topicId())) {
deletedTopicIds.add(record.topicId());
}

View File

@ -897,4 +897,15 @@ public class TopicsImageTest {
var result = IMAGE1.topicIdToNameView().get("zar");
assertNull(result);
}
@Test
public void testTopicsDeltaCreateThenDelete() {
TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
delta.replay(new TopicRecord().setName("test").setTopicId(FOO_UUID));
assertEquals(delta.createdTopicIds().contains(FOO_UUID), true);
assertEquals(delta.deletedTopicIds().contains(FOO_UUID), false);
delta.replay(new RemoveTopicRecord().setTopicId(FOO_UUID));
assertEquals(delta.deletedTopicIds().contains(FOO_UUID), false);
assertEquals(delta.createdTopicIds().contains(FOO_UUID), false);
}
}