KAFKA-16206: Fix unnecessary topic config deletion during ZK migration (#14206)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ron Dagostino <rndgstn@gmail.com>
This commit is contained in:
Alyssa Huang 2024-03-21 07:38:42 -07:00 committed by GitHub
parent a41c10fd49
commit 03f7b5aa3a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 69 additions and 42 deletions

View File

@ -228,8 +228,8 @@ class ZkConfigMigrationClient(
val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
if (responses.head.resultCode.equals(Code.NONODE)) {
// Not fatal.
error(s"Did not delete $configResource since the node did not exist.")
// Not fatal. This is expected in the case this is a topic config and we delete the topic
debug(s"Did not delete $configResource since the node did not exist.")
state
} else if (responses.head.resultCode.equals(Code.OK)) {
// Write the notification znode if our update was successful

View File

@ -71,11 +71,12 @@ public final class ConfigurationsDelta {
public void replay(RemoveTopicRecord record, String topicName) {
ConfigResource resource =
new ConfigResource(Type.TOPIC, topicName);
ConfigurationImage configImage = image.resourceData().getOrDefault(resource,
new ConfigurationImage(resource, Collections.emptyMap()));
ConfigurationDelta delta = changes.computeIfAbsent(resource,
__ -> new ConfigurationDelta(configImage));
delta.deleteAll();
if (image.resourceData().containsKey(resource)) {
ConfigurationImage configImage = image.resourceData().get(resource);
ConfigurationDelta delta = changes.computeIfAbsent(resource,
__ -> new ConfigurationDelta(configImage));
delta.deleteAll();
}
}
public ConfigurationsImage apply() {

View File

@ -250,11 +250,6 @@ public class KRaftMigrationZkWriter {
migrationState -> migrationClient.topicClient().deleteTopic(topicName, migrationState)
);
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
operationConsumer.accept(
UPDATE_TOPIC_CONFIG,
"Updating Configs for Topic " + topicName + ", ID " + topicId,
migrationState -> migrationClient.configClient().deleteConfigs(resource, migrationState)
);
});
newPartitions.forEach((topicId, partitionMap) -> {

View File

@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -61,26 +62,34 @@ public class ConfigurationsImageTest {
IMAGE1 = new ConfigurationsImage(map1);
DELTA1_RECORDS = new ArrayList<>();
// remove configs
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ConfigRecord().setResourceType(BROKER.id()).
setResourceName("0").setName("foo").setValue(null),
CONFIG_RECORD.highestSupportedVersion()));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ConfigRecord().setResourceType(BROKER.id()).
setResourceName("0").setName("baz").setValue(null),
CONFIG_RECORD.highestSupportedVersion()));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ConfigRecord().setResourceType(BROKER.id()).
setResourceName("1").setName("foobar").setValue(null),
CONFIG_RECORD.highestSupportedVersion()));
// add new config to b1
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ConfigRecord().setResourceType(BROKER.id()).
setResourceName("1").setName("barfoo").setValue("bazfoo"),
CONFIG_RECORD.highestSupportedVersion()));
// add new config to b2
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ConfigRecord().setResourceType(BROKER.id()).
setResourceName("2").setName("foo").setValue("bar"),
CONFIG_RECORD.highestSupportedVersion()));
DELTA1 = new ConfigurationsDelta(IMAGE1);
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
Map<ConfigResource, ConfigurationImage> map2 = new HashMap<>();
Map<String, String> broker0Map2 = new HashMap<>();
broker0Map2.put("baz", "quux");
map2.put(new ConfigResource(BROKER, "0"),
new ConfigurationImage(new ConfigResource(BROKER, "0"), broker0Map2));
Map<String, String> broker1Map2 = new HashMap<>();
broker1Map2.put("foobar", "foobaz");
broker1Map2.put("barfoo", "bazfoo");
Map<String, String> broker1Map2 = Collections.singletonMap("barfoo", "bazfoo");
map2.put(new ConfigResource(BROKER, "1"),
new ConfigurationImage(new ConfigResource(BROKER, "1"), broker1Map2));
Map<String, String> broker2Map = Collections.singletonMap("foo", "bar");
map2.put(new ConfigResource(BROKER, "2"), new ConfigurationImage(new ConfigResource(BROKER, "2"), broker2Map));
IMAGE2 = new ConfigurationsImage(map2);
}

View File

@ -94,10 +94,16 @@ public class TopicsImageTest {
public static final Uuid FOO_UUID = Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA");
private static final Uuid FOO_UUID2 = Uuid.fromString("9d3lha5qv8DoIl93jf8pbX");
private static final Uuid BAR_UUID = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
private static final Uuid BAZ_UUID = Uuid.fromString("tgHBnRglT5W_RlENnuG5vg");
private static final Uuid BAM_UUID = Uuid.fromString("b66ybsWIQoygs01vdjH07A");
private static final Uuid BAM_UUID2 = Uuid.fromString("yd6Sq3a9aK1G8snlKv7ag5");
static {
TOPIC_IMAGES1 = Arrays.asList(
newTopicImage("foo", FOO_UUID,
@ -118,16 +124,20 @@ public class TopicsImageTest {
IMAGE1 = new TopicsImage(newTopicsByIdMap(TOPIC_IMAGES1), newTopicsByNameMap(TOPIC_IMAGES1));
DELTA1_RECORDS = new ArrayList<>();
// remove topic
DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveTopicRecord().
setTopicId(FOO_UUID),
REMOVE_TOPIC_RECORD.highestSupportedVersion()));
// change topic
DELTA1_RECORDS.add(new ApiMessageAndVersion(new PartitionChangeRecord().
setTopicId(BAR_UUID).
setPartitionId(0).setLeader(1),
PARTITION_CHANGE_RECORD.highestSupportedVersion()));
// add topic
DELTA1_RECORDS.add(new ApiMessageAndVersion(new TopicRecord().
setName("baz").setTopicId(BAZ_UUID),
TOPIC_RECORD.highestSupportedVersion()));
// add partition record for new topic
DELTA1_RECORDS.add(new ApiMessageAndVersion(new PartitionRecord().
setPartitionId(0).
setTopicId(BAZ_UUID).
@ -138,11 +148,23 @@ public class TopicsImageTest {
setLeader(3).
setLeaderEpoch(2).
setPartitionEpoch(1), PARTITION_RECORD.highestSupportedVersion()));
// re-add topic with different topic id
DELTA1_RECORDS.add(new ApiMessageAndVersion(new TopicRecord().
setName("foo").setTopicId(FOO_UUID2),
TOPIC_RECORD.highestSupportedVersion()));
// add then remove topic
DELTA1_RECORDS.add(new ApiMessageAndVersion(new TopicRecord().
setName("bam").setTopicId(BAM_UUID),
TOPIC_RECORD.highestSupportedVersion()));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveTopicRecord().
setTopicId(BAM_UUID),
REMOVE_TOPIC_RECORD.highestSupportedVersion()));
DELTA1 = new TopicsDelta(IMAGE1);
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
List<TopicImage> topics2 = Arrays.asList(
newTopicImage("foo", FOO_UUID2),
newTopicImage("bar", BAR_UUID,
new PartitionRegistration.Builder().setReplicas(new int[] {0, 1, 2, 3, 4}).
setDirectories(DirectoryId.migratingArray(5)).
@ -188,22 +210,22 @@ public class TopicsImageTest {
public void testBasicLocalChanges() {
int localId = 3;
/* Changes already include in DELTA1_RECORDS and IMAGE1:
* foo - topic id deleted
* foo - topic id deleted then recreated with different topic id
* bar-0 - stay as follower with different partition epoch
* baz-0 - new topic to leader
* bam - topic id created then deleted
*/
List<ApiMessageAndVersion> topicRecords = new ArrayList<>(DELTA1_RECORDS);
// Create a new foo topic with a different id
Uuid newFooId = Uuid.fromString("b66ybsWIQoygs01vdjH07A");
// Create a new bam topic with a different id
topicRecords.add(
new ApiMessageAndVersion(
new TopicRecord().setName("foo") .setTopicId(newFooId),
new TopicRecord().setName("bam").setTopicId(BAM_UUID2),
TOPIC_RECORD.highestSupportedVersion()
)
);
topicRecords.add(newPartitionRecord(newFooId, 0, Arrays.asList(0, 1, 2)));
topicRecords.add(newPartitionRecord(newFooId, 1, Arrays.asList(0, 1, localId)));
topicRecords.add(newPartitionRecord(BAM_UUID2, 0, Arrays.asList(0, 1, 2)));
topicRecords.add(newPartitionRecord(BAM_UUID2, 1, Arrays.asList(0, 1, localId)));
// baz-1 - new partition to follower
topicRecords.add(
@ -224,10 +246,6 @@ public class TopicsImageTest {
RecordTestUtils.replayAll(delta, topicRecords);
LocalReplicaChanges changes = delta.localChanges(localId);
assertEquals(
new HashSet<>(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 1))),
changes.deletes()
);
assertEquals(
new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))),
changes.electedLeaders().keySet()
@ -238,7 +256,8 @@ public class TopicsImageTest {
);
assertEquals(
new HashSet<>(
Arrays.asList(new TopicPartition("baz", 1), new TopicPartition("bar", 0), new TopicPartition("foo", 1))
Arrays.asList(new TopicPartition("baz", 1), new TopicPartition("bar", 0),
new TopicPartition("bam", 1))
),
changes.followers().keySet()
);

View File

@ -567,7 +567,7 @@ public class KRaftMigrationDriverTest {
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
// Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz
// Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz, add new foo, add bam, delete bam
provenance = new MetadataProvenance(200, 1, 1);
delta = new MetadataDelta(image);
RecordTestUtils.replayAll(delta, DELTA1_RECORDS);
@ -577,10 +577,11 @@ public class KRaftMigrationDriverTest {
assertEquals(1, topicClient.deletedTopics.size());
assertEquals("foo", topicClient.deletedTopics.get(0));
assertEquals(1, topicClient.createdTopics.size());
assertEquals("baz", topicClient.createdTopics.get(0));
assertEquals(2, topicClient.createdTopics.size());
assertTrue(topicClient.createdTopics.contains("foo"));
assertTrue(topicClient.createdTopics.contains("baz"));
assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
assertEquals(0, configClient.deletedResources.size());
});
}
@ -621,7 +622,7 @@ public class KRaftMigrationDriverTest {
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
// Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz
// Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz, add new foo, add bam, delete bam
provenance = new MetadataProvenance(200, 1, 1);
delta = new MetadataDelta(image);
RecordTestUtils.replayAll(delta, DELTA1_RECORDS);
@ -631,10 +632,11 @@ public class KRaftMigrationDriverTest {
assertEquals(1, topicClient.deletedTopics.size());
assertEquals("foo", topicClient.deletedTopics.get(0));
assertEquals(1, topicClient.createdTopics.size());
assertEquals("baz", topicClient.createdTopics.get(0));
assertEquals(2, topicClient.createdTopics.size());
assertTrue(topicClient.createdTopics.contains("foo"));
assertTrue(topicClient.createdTopics.contains("baz"));
assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
assertEquals(0, configClient.deletedResources.size());
});
}
@ -730,7 +732,7 @@ public class KRaftMigrationDriverTest {
migrationClient.setMigrationRecoveryState(
ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1));
// Modify topics in a KRaft -- delete foo, modify bar, add baz
// Modify topics in a KRaft -- delete foo, modify bar, add baz, add new foo, add bam, delete bam
provenance = new MetadataProvenance(200, 1, 1);
delta = new MetadataDelta(image);
RecordTestUtils.replayAll(delta, DELTA1_RECORDS);
@ -746,10 +748,11 @@ public class KRaftMigrationDriverTest {
"");
assertEquals(1, topicClient.deletedTopics.size());
assertEquals("foo", topicClient.deletedTopics.get(0));
assertEquals(1, topicClient.createdTopics.size());
assertEquals("baz", topicClient.createdTopics.get(0));
assertEquals(2, topicClient.createdTopics.size());
assertTrue(topicClient.createdTopics.contains("foo"));
assertTrue(topicClient.createdTopics.contains("baz"));
assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
assertEquals(0, configClient.deletedResources.size());
});
}