KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand (#15659)

Currently, when executing kafka-reassign-partitions.sh with the --execute option, if a partition number specified in the JSON file does not exist, this check occurs only when submitting the reassignments to alterPartitionReassignments on the server-side.

We can perform this check in advance before submitting the reassignments to the server side.

Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
Kuan-Po (Cooper) Tseng 2024-04-09 07:56:31 +08:00 committed by GitHub
parent 169ed60fe1
commit f895ab5145
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 16 additions and 1 deletions

View File

@ -641,7 +641,7 @@ public class ReassignPartitionsCommand {
* @param adminClient The AdminClient to use. * @param adminClient The AdminClient to use.
* @param partitions The partitions to get information about. * @param partitions The partitions to get information about.
* @return A map from partitions to broker assignments. * @return A map from partitions to broker assignments.
* If any topic can't be found, an exception will be thrown. * If any topic or partition can't be found, an exception will be thrown.
*/ */
static Map<TopicPartition, List<Integer>> getReplicaAssignmentForPartitions(Admin adminClient, static Map<TopicPartition, List<Integer>> getReplicaAssignmentForPartitions(Admin adminClient,
Set<TopicPartition> partitions Set<TopicPartition> partitions
@ -654,6 +654,13 @@ public class ReassignPartitionsCommand {
res.put(tp, info.replicas().stream().map(Node::id).collect(Collectors.toList())); res.put(tp, info.replicas().stream().map(Node::id).collect(Collectors.toList()));
}) })
); );
if (!res.keySet().equals(partitions)) {
Set<TopicPartition> missingPartitions = new HashSet<>(partitions);
missingPartitions.removeAll(res.keySet());
throw new ExecutionException(new UnknownTopicOrPartitionException("Unable to find partition: " +
missingPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))));
}
return res; return res;
} }

View File

@ -83,6 +83,7 @@ import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.partitio
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.replicaMoveStatesToString; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.replicaMoveStatesToString;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -300,6 +301,13 @@ public class ReassignPartitionsUnitTest {
assertEquals(assignments, assertEquals(assignments,
getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0))))); getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0)))));
UnknownTopicOrPartitionException exception =
assertInstanceOf(UnknownTopicOrPartitionException.class,
assertThrows(ExecutionException.class,
() -> getReplicaAssignmentForPartitions(adminClient,
new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("foo", 10))))).getCause());
assertEquals("Unable to find partition: foo-10", exception.getMessage());
} }
} }