From 16359e70d3ae1daa82c2e1fffe79ebfd6a63e8e4 Mon Sep 17 00:00:00 2001 From: Igor Soarez Date: Tue, 4 Jun 2024 15:37:20 +0100 Subject: [PATCH] KAFKA-16583: Handle PartitionChangeRecord without directory IDs (#16118) When PartitionRegistration#merge() reads a PartitionChangeRecord from an older MetadataVersion, with a replica assignment change and without #directories() set, it produces a direcotry assignment of DirectoryId.UNASSIGNED. This is problematic because the MetadataVersion may not yet support directory assignments, leading to a UnwritableMetadataException in PartitionRegistration#toRecord. Since the Controller always sets directories on PartitionChangeRecord if the MetadataVersion supports it, via PartitionChangeBuilder, there's no need for PartitionRegistration#merge() to populate directories upon a replica assignment change. Reviewers: Luke Chen --- .../kafka/server/ReplicaManagerTest.scala | 1 + .../kafka/metadata/PartitionRegistration.java | 29 ++++++++++++------- .../metadata/PartitionRegistrationTest.java | 6 ++-- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 151ffb9e184..bd78d967801 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -5970,6 +5970,7 @@ class ReplicaManagerTest { .setPartitionId(0) .setTopicId(FOO_UUID) .setReplicas(util.Arrays.asList(localId, localId + 1, localId + 2)) + .setDirectories(util.Arrays.asList(Uuid.fromString("fKgQ2axkQiuzt4ANqKbPkQ"), DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED)) .setIsr(util.Arrays.asList(localId, localId + 1)) ) followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java index 72476cf206c..4671b2f0549 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java @@ -182,8 +182,20 @@ public class PartitionRegistration { return record.directories(); } + private static boolean migratingDirectories(Uuid[] directories) { + if (directories == null) { + return true; + } + for (Uuid directory : directories) { + if (!DirectoryId.MIGRATING.equals(directory)) { + return false; + } + } + return true; + } + private static Uuid[] defaultToMigrating(Uuid[] directories, int numReplicas) { - if (directories == null || directories.length == 0) { + if (migratingDirectories(directories)) { return DirectoryId.migratingArray(numReplicas); } return directories; @@ -228,14 +240,11 @@ public class PartitionRegistration { public PartitionRegistration merge(PartitionChangeRecord record) { int[] newReplicas = (record.replicas() == null) ? replicas : Replicas.toArray(record.replicas()); - Uuid[] newDirectories; - if (record.directories() != null && !record.directories().isEmpty()) { - newDirectories = Uuid.toArray(checkDirectories(record)); - } else if (record.replicas() != null) { - newDirectories = Uuid.toArray(DirectoryId.createDirectoriesFrom(replicas, directories, record.replicas())); - } else { - newDirectories = directories; - } + Uuid[] newDirectories = defaultToMigrating( + (record.directories() == null) ? + directories : Uuid.toArray(checkDirectories(record)), + newReplicas.length + ); int[] newIsr = (record.isr() == null) ? isr : Replicas.toArray(record.isr()); int[] newRemovingReplicas = (record.removingReplicas() == null) ? removingReplicas : Replicas.toArray(record.removingReplicas()); @@ -257,7 +266,7 @@ public class PartitionRegistration { int[] newElr = (record.eligibleLeaderReplicas() == null) ? elr : Replicas.toArray(record.eligibleLeaderReplicas()); int[] newLastKnownElr = (record.lastKnownElr() == null) ? lastKnownElr : Replicas.toArray(record.lastKnownElr()); return new PartitionRegistration(newReplicas, - defaultToMigrating(newDirectories, replicas.length), + newDirectories, newIsr, newRemovingReplicas, newAddingReplicas, diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java index 8816f2f141d..02b7a346234 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java @@ -153,7 +153,8 @@ public class PartitionRegistrationTest { PartitionRegistration partition1 = partition0.merge(new PartitionChangeRecord(). setRemovingReplicas(Collections.singletonList(3)). setAddingReplicas(Collections.singletonList(4)). - setReplicas(Arrays.asList(1, 2, 3, 4))); + setReplicas(Arrays.asList(1, 2, 3, 4)). + setDirectories(Arrays.asList(dir1, dir2, dir3, DirectoryId.UNASSIGNED))); assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4}). setDirectories(new Uuid[]{dir1, dir2, dir3, DirectoryId.UNASSIGNED}). setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {3}).setAddingReplicas(new int[] {4}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(201).build(), partition1); @@ -161,7 +162,8 @@ public class PartitionRegistrationTest { setIsr(Arrays.asList(1, 2, 4)). setRemovingReplicas(Collections.emptyList()). setAddingReplicas(Collections.emptyList()). - setReplicas(Arrays.asList(1, 2, 4))); + setReplicas(Arrays.asList(1, 2, 4)). + setDirectories(Arrays.asList(dir1, dir2, DirectoryId.UNASSIGNED))); assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 4}). setDirectories(new Uuid[]{dir1, dir2, DirectoryId.UNASSIGNED}). setIsr(new int[] {1, 2, 4}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(202).build(), partition2);