MINOR: fix reassign command bug (#20003)

see

9570c67b8c/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala (L1208)
During the rewrite for
[KAFKA-14595](https://github.com/apache/kafka/pull/13247), the relevant
condition was omitted.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Lan Ding 2025-06-25 02:34:13 +08:00 committed by Chia-Ping Tsai
parent 1dad77615d
commit 75ab2f5d03
2 changed files with 39 additions and 3 deletions

View File

@ -1271,7 +1271,7 @@ public class ReassignPartitionsCommand {
Set<TopicPartition> targetPartsSet = targetParts.stream().map(t -> t.getKey()).collect(Collectors.toSet()); Set<TopicPartition> targetPartsSet = targetParts.stream().map(t -> t.getKey()).collect(Collectors.toSet());
Set<TopicPartition> curReassigningParts = new HashSet<>(); Set<TopicPartition> curReassigningParts = new HashSet<>();
adminClient.listPartitionReassignments(targetPartsSet).reassignments().get().forEach((part, reassignment) -> { adminClient.listPartitionReassignments(targetPartsSet).reassignments().get().forEach((part, reassignment) -> {
if (reassignment.addingReplicas().isEmpty() || !reassignment.removingReplicas().isEmpty()) if (!reassignment.addingReplicas().isEmpty() || !reassignment.removingReplicas().isEmpty())
curReassigningParts.add(part); curReassigningParts.add(part);
}); });
if (!curReassigningParts.isEmpty()) { if (!curReassigningParts.isEmpty()) {

View File

@ -316,7 +316,7 @@ public class ReassignPartitionsCommandTest {
} }
@ClusterTest @ClusterTest
public void testCancellationWithAddingReplicaInIsr() throws Exception { public void testCancellationWithAddingAndRemovingReplicaInIsr() throws Exception {
createTopics(); createTopics();
TopicPartition foo0 = new TopicPartition("foo", 0); TopicPartition foo0 = new TopicPartition("foo", 0);
produceMessages(foo0.topic(), foo0.partition(), 200); produceMessages(foo0.topic(), foo0.partition(), 200);
@ -351,6 +351,42 @@ public class ReassignPartitionsCommandTest {
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4)); verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4));
} }
@ClusterTest
public void testCancellationWithAddingReplicaInIsr() throws Exception {
createTopics();
TopicPartition foo0 = new TopicPartition("foo", 0);
produceMessages(foo0.topic(), foo0.partition(), 200);
// The reassignment will bring replicas 3 and 4 into the replica set.
String assignment = "{\"version\":1,\"partitions\":" +
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\",\"any\",\"any\"]}" +
"]}";
// We will throttle replica 4 so that only replica 3 joins the ISR
setReplicationThrottleForPartitions(foo0);
// Execute the assignment and wait for replica 3 (only) to join the ISR
runExecuteAssignment(false, assignment, -1L, -1L);
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
TestUtils.waitForCondition(
() -> {
Set<Integer> isr = admin.describeTopics(Collections.singleton(foo0.topic()))
.allTopicNames().get().get(foo0.topic()).partitions().stream()
.filter(p -> p.partition() == foo0.partition())
.flatMap(p -> p.isr().stream())
.map(Node::id).collect(Collectors.toSet());
return isr.containsAll(Arrays.asList(0, 1, 2, 3));
},
"Timed out while waiting for replica 3 to join the ISR"
);
}
// Now cancel the assignment and verify that the partition is removed from cancelled replicas
assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0), Collections.emptySet()), runCancelAssignment(assignment, true, true));
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3));
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4));
}
/** /**
* Test moving partitions between directories. * Test moving partitions between directories.
*/ */