diff --git a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala index 846a599875b..dffdc138aa1 100644 --- a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala @@ -228,8 +228,8 @@ class ZkConfigMigrationClient( val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state) if (responses.head.resultCode.equals(Code.NONODE)) { - // Not fatal. - error(s"Did not delete $configResource since the node did not exist.") + // Not fatal. This is expected in the case this is a topic config and we delete the topic + debug(s"Did not delete $configResource since the node did not exist.") state } else if (responses.head.resultCode.equals(Code.OK)) { // Write the notification znode if our update was successful diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java index eab0505e332..9d794bfddc2 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java @@ -71,11 +71,12 @@ public final class ConfigurationsDelta { public void replay(RemoveTopicRecord record, String topicName) { ConfigResource resource = new ConfigResource(Type.TOPIC, topicName); - ConfigurationImage configImage = image.resourceData().getOrDefault(resource, - new ConfigurationImage(resource, Collections.emptyMap())); - ConfigurationDelta delta = changes.computeIfAbsent(resource, - __ -> new ConfigurationDelta(configImage)); - delta.deleteAll(); + if (image.resourceData().containsKey(resource)) { + ConfigurationImage configImage = image.resourceData().get(resource); + ConfigurationDelta delta = changes.computeIfAbsent(resource, + __ -> new ConfigurationDelta(configImage)); + delta.deleteAll(); + } } public ConfigurationsImage apply() { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java index 8a7e148d579..6870ad8f0cb 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java @@ -250,11 +250,6 @@ public class KRaftMigrationZkWriter { migrationState -> migrationClient.topicClient().deleteTopic(topicName, migrationState) ); ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName); - operationConsumer.accept( - UPDATE_TOPIC_CONFIG, - "Updating Configs for Topic " + topicName + ", ID " + topicId, - migrationState -> migrationClient.configClient().deleteConfigs(resource, migrationState) - ); }); newPartitions.forEach((topicId, partitionMap) -> { diff --git a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java index 9b7cd39dcd6..995a44b3f8f 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,26 +62,34 @@ public class ConfigurationsImageTest { IMAGE1 = new ConfigurationsImage(map1); DELTA1_RECORDS = new ArrayList<>(); + // remove configs DELTA1_RECORDS.add(new ApiMessageAndVersion(new ConfigRecord().setResourceType(BROKER.id()). setResourceName("0").setName("foo").setValue(null), CONFIG_RECORD.highestSupportedVersion())); + DELTA1_RECORDS.add(new ApiMessageAndVersion(new ConfigRecord().setResourceType(BROKER.id()). + setResourceName("0").setName("baz").setValue(null), + CONFIG_RECORD.highestSupportedVersion())); + DELTA1_RECORDS.add(new ApiMessageAndVersion(new ConfigRecord().setResourceType(BROKER.id()). + setResourceName("1").setName("foobar").setValue(null), + CONFIG_RECORD.highestSupportedVersion())); + // add new config to b1 DELTA1_RECORDS.add(new ApiMessageAndVersion(new ConfigRecord().setResourceType(BROKER.id()). setResourceName("1").setName("barfoo").setValue("bazfoo"), CONFIG_RECORD.highestSupportedVersion())); + // add new config to b2 + DELTA1_RECORDS.add(new ApiMessageAndVersion(new ConfigRecord().setResourceType(BROKER.id()). + setResourceName("2").setName("foo").setValue("bar"), + CONFIG_RECORD.highestSupportedVersion())); DELTA1 = new ConfigurationsDelta(IMAGE1); RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS); Map map2 = new HashMap<>(); - Map broker0Map2 = new HashMap<>(); - broker0Map2.put("baz", "quux"); - map2.put(new ConfigResource(BROKER, "0"), - new ConfigurationImage(new ConfigResource(BROKER, "0"), broker0Map2)); - Map broker1Map2 = new HashMap<>(); - broker1Map2.put("foobar", "foobaz"); - broker1Map2.put("barfoo", "bazfoo"); + Map broker1Map2 = Collections.singletonMap("barfoo", "bazfoo"); map2.put(new ConfigResource(BROKER, "1"), new ConfigurationImage(new ConfigResource(BROKER, "1"), broker1Map2)); + Map broker2Map = Collections.singletonMap("foo", "bar"); + map2.put(new ConfigResource(BROKER, "2"), new ConfigurationImage(new ConfigResource(BROKER, "2"), broker2Map)); IMAGE2 = new ConfigurationsImage(map2); } diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java index 84a092ccd39..eabb63ff858 100644 --- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java @@ -94,10 +94,16 @@ public class TopicsImageTest { public static final Uuid FOO_UUID = Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"); + private static final Uuid FOO_UUID2 = Uuid.fromString("9d3lha5qv8DoIl93jf8pbX"); + private static final Uuid BAR_UUID = Uuid.fromString("f62ptyETTjet8SL5ZeREiw"); private static final Uuid BAZ_UUID = Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"); + private static final Uuid BAM_UUID = Uuid.fromString("b66ybsWIQoygs01vdjH07A"); + + private static final Uuid BAM_UUID2 = Uuid.fromString("yd6Sq3a9aK1G8snlKv7ag5"); + static { TOPIC_IMAGES1 = Arrays.asList( newTopicImage("foo", FOO_UUID, @@ -118,16 +124,20 @@ public class TopicsImageTest { IMAGE1 = new TopicsImage(newTopicsByIdMap(TOPIC_IMAGES1), newTopicsByNameMap(TOPIC_IMAGES1)); DELTA1_RECORDS = new ArrayList<>(); + // remove topic DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveTopicRecord(). setTopicId(FOO_UUID), REMOVE_TOPIC_RECORD.highestSupportedVersion())); + // change topic DELTA1_RECORDS.add(new ApiMessageAndVersion(new PartitionChangeRecord(). setTopicId(BAR_UUID). setPartitionId(0).setLeader(1), PARTITION_CHANGE_RECORD.highestSupportedVersion())); + // add topic DELTA1_RECORDS.add(new ApiMessageAndVersion(new TopicRecord(). setName("baz").setTopicId(BAZ_UUID), TOPIC_RECORD.highestSupportedVersion())); + // add partition record for new topic DELTA1_RECORDS.add(new ApiMessageAndVersion(new PartitionRecord(). setPartitionId(0). setTopicId(BAZ_UUID). @@ -138,11 +148,23 @@ public class TopicsImageTest { setLeader(3). setLeaderEpoch(2). setPartitionEpoch(1), PARTITION_RECORD.highestSupportedVersion())); + // re-add topic with different topic id + DELTA1_RECORDS.add(new ApiMessageAndVersion(new TopicRecord(). + setName("foo").setTopicId(FOO_UUID2), + TOPIC_RECORD.highestSupportedVersion())); + // add then remove topic + DELTA1_RECORDS.add(new ApiMessageAndVersion(new TopicRecord(). + setName("bam").setTopicId(BAM_UUID), + TOPIC_RECORD.highestSupportedVersion())); + DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveTopicRecord(). + setTopicId(BAM_UUID), + REMOVE_TOPIC_RECORD.highestSupportedVersion())); DELTA1 = new TopicsDelta(IMAGE1); RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS); List topics2 = Arrays.asList( + newTopicImage("foo", FOO_UUID2), newTopicImage("bar", BAR_UUID, new PartitionRegistration.Builder().setReplicas(new int[] {0, 1, 2, 3, 4}). setDirectories(DirectoryId.migratingArray(5)). @@ -188,22 +210,22 @@ public class TopicsImageTest { public void testBasicLocalChanges() { int localId = 3; /* Changes already include in DELTA1_RECORDS and IMAGE1: - * foo - topic id deleted + * foo - topic id deleted then recreated with different topic id * bar-0 - stay as follower with different partition epoch * baz-0 - new topic to leader + * bam - topic id created then deleted */ List topicRecords = new ArrayList<>(DELTA1_RECORDS); - // Create a new foo topic with a different id - Uuid newFooId = Uuid.fromString("b66ybsWIQoygs01vdjH07A"); + // Create a new bam topic with a different id topicRecords.add( new ApiMessageAndVersion( - new TopicRecord().setName("foo") .setTopicId(newFooId), + new TopicRecord().setName("bam").setTopicId(BAM_UUID2), TOPIC_RECORD.highestSupportedVersion() ) ); - topicRecords.add(newPartitionRecord(newFooId, 0, Arrays.asList(0, 1, 2))); - topicRecords.add(newPartitionRecord(newFooId, 1, Arrays.asList(0, 1, localId))); + topicRecords.add(newPartitionRecord(BAM_UUID2, 0, Arrays.asList(0, 1, 2))); + topicRecords.add(newPartitionRecord(BAM_UUID2, 1, Arrays.asList(0, 1, localId))); // baz-1 - new partition to follower topicRecords.add( @@ -224,10 +246,6 @@ public class TopicsImageTest { RecordTestUtils.replayAll(delta, topicRecords); LocalReplicaChanges changes = delta.localChanges(localId); - assertEquals( - new HashSet<>(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 1))), - changes.deletes() - ); assertEquals( new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))), changes.electedLeaders().keySet() @@ -238,7 +256,8 @@ public class TopicsImageTest { ); assertEquals( new HashSet<>( - Arrays.asList(new TopicPartition("baz", 1), new TopicPartition("bar", 0), new TopicPartition("foo", 1)) + Arrays.asList(new TopicPartition("baz", 1), new TopicPartition("bar", 0), + new TopicPartition("bam", 1)) ), changes.followers().keySet() ); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index 67df5612096..dea5e62db96 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -567,7 +567,7 @@ public class KRaftMigrationDriverTest { TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state"); - // Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz + // Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz, add new foo, add bam, delete bam provenance = new MetadataProvenance(200, 1, 1); delta = new MetadataDelta(image); RecordTestUtils.replayAll(delta, DELTA1_RECORDS); @@ -577,10 +577,11 @@ public class KRaftMigrationDriverTest { assertEquals(1, topicClient.deletedTopics.size()); assertEquals("foo", topicClient.deletedTopics.get(0)); - assertEquals(1, topicClient.createdTopics.size()); - assertEquals("baz", topicClient.createdTopics.get(0)); + assertEquals(2, topicClient.createdTopics.size()); + assertTrue(topicClient.createdTopics.contains("foo")); + assertTrue(topicClient.createdTopics.contains("baz")); assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0)); - assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0)); + assertEquals(0, configClient.deletedResources.size()); }); } @@ -621,7 +622,7 @@ public class KRaftMigrationDriverTest { TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); - // Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz + // Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz, add new foo, add bam, delete bam provenance = new MetadataProvenance(200, 1, 1); delta = new MetadataDelta(image); RecordTestUtils.replayAll(delta, DELTA1_RECORDS); @@ -631,10 +632,11 @@ public class KRaftMigrationDriverTest { assertEquals(1, topicClient.deletedTopics.size()); assertEquals("foo", topicClient.deletedTopics.get(0)); - assertEquals(1, topicClient.createdTopics.size()); - assertEquals("baz", topicClient.createdTopics.get(0)); + assertEquals(2, topicClient.createdTopics.size()); + assertTrue(topicClient.createdTopics.contains("foo")); + assertTrue(topicClient.createdTopics.contains("baz")); assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0)); - assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0)); + assertEquals(0, configClient.deletedResources.size()); }); } @@ -730,7 +732,7 @@ public class KRaftMigrationDriverTest { migrationClient.setMigrationRecoveryState( ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1)); - // Modify topics in a KRaft -- delete foo, modify bar, add baz + // Modify topics in a KRaft -- delete foo, modify bar, add baz, add new foo, add bam, delete bam provenance = new MetadataProvenance(200, 1, 1); delta = new MetadataDelta(image); RecordTestUtils.replayAll(delta, DELTA1_RECORDS); @@ -746,10 +748,11 @@ public class KRaftMigrationDriverTest { ""); assertEquals(1, topicClient.deletedTopics.size()); assertEquals("foo", topicClient.deletedTopics.get(0)); - assertEquals(1, topicClient.createdTopics.size()); - assertEquals("baz", topicClient.createdTopics.get(0)); + assertEquals(2, topicClient.createdTopics.size()); + assertTrue(topicClient.createdTopics.contains("foo")); + assertTrue(topicClient.createdTopics.contains("baz")); assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0)); - assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0)); + assertEquals(0, configClient.deletedResources.size()); }); }