diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index e59fc34b947..2ceae56a7ed 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -272,7 +272,7 @@
+ files="(QuorumController|ReplicationControlManager|ReplicationControlManagerTest).java"/>
handleElectReplicaLeader(request)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
- case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
+ case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal)
case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)
diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
index 30e04f15f83..e30179d4121 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
@@ -21,7 +21,8 @@ import kafka.network.SocketServer
import kafka.server.IntegrationTestUtils.connectAndReceive
import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{Admin, NewTopic}
+import org.apache.kafka.clients.admin.{Admin, NewPartitionReassignment, NewTopic}
+import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo};
import org.apache.kafka.common.message.DescribeClusterRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
@@ -31,7 +32,8 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Test, Timeout}
import java.util
-import java.util.Collections
+import java.util.{Arrays, Collections, Optional}
+import scala.collection.mutable
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.jdk.CollectionConverters._
@@ -98,35 +100,14 @@ class RaftClusterTest {
val newTopic = Collections.singletonList(new NewTopic("test-topic", 1, 3.toShort))
val createTopicResult = admin.createTopics(newTopic)
createTopicResult.all().get()
-
- // List created topic
- TestUtils.waitUntilTrue(() => {
- val listTopicsResult = admin.listTopics()
- val result = listTopicsResult.names().get().size() == newTopic.size()
- if (result) {
- newTopic forEach(topic => {
- assertTrue(listTopicsResult.names().get().contains(topic.name()))
- })
- }
- result
- }, "Topics created were not listed.")
+ waitForTopicListing(admin, Seq("test-topic"), Seq())
// Delete topic
val deleteResult = admin.deleteTopics(Collections.singletonList("test-topic"))
deleteResult.all().get()
// List again
- TestUtils.waitUntilTrue(() => {
- val listTopicsResult = admin.listTopics()
- val result = listTopicsResult.names().get().size() != newTopic.size()
- if (result) {
- newTopic forEach(topic => {
- assertFalse(listTopicsResult.names().get().contains(topic.name()))
- })
- }
- result
- }, "Topic was not removed from list.")
-
+ waitForTopicListing(admin, Seq(), Seq("test-topic"))
} finally {
admin.close()
}
@@ -153,66 +134,14 @@ class RaftClusterTest {
try {
// Create many topics
val newTopic = new util.ArrayList[NewTopic]()
- newTopic.add(new NewTopic("test-topic-1", 1, 3.toShort))
- newTopic.add(new NewTopic("test-topic-2", 1, 3.toShort))
- newTopic.add(new NewTopic("test-topic-3", 1, 3.toShort))
+ newTopic.add(new NewTopic("test-topic-1", 2, 3.toShort))
+ newTopic.add(new NewTopic("test-topic-2", 2, 3.toShort))
+ newTopic.add(new NewTopic("test-topic-3", 2, 3.toShort))
val createTopicResult = admin.createTopics(newTopic)
createTopicResult.all().get()
// List created topic
- TestUtils.waitUntilTrue(() => {
- val listTopicsResult = admin.listTopics()
- val result = listTopicsResult.names().get().size() == newTopic.size()
- if (result) {
- newTopic forEach(topic => {
- assertTrue(listTopicsResult.names().get().contains(topic.name()))
- })
- }
- result
- }, "Topics created were not listed.")
- } finally {
- admin.close()
- }
- } finally {
- cluster.close()
- }
- }
-
- @Test
- def testCreateClusterAndCreateAndManyTopicsWithManyPartitions(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(3).
- setNumControllerNodes(3).build()).build()
- try {
- cluster.format()
- cluster.startup()
- cluster.waitForReadyBrokers()
- TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == BrokerState.RUNNING,
- "Broker never made it to RUNNING state.")
- TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).kafkaRaftClient.leaderAndEpoch().leaderId.isPresent,
- "RaftManager was not initialized.")
- val admin = Admin.create(cluster.clientProperties())
- try {
- // Create many topics
- val newTopic = new util.ArrayList[NewTopic]()
- newTopic.add(new NewTopic("test-topic-1", 3, 3.toShort))
- newTopic.add(new NewTopic("test-topic-2", 3, 3.toShort))
- newTopic.add(new NewTopic("test-topic-3", 3, 3.toShort))
- val createTopicResult = admin.createTopics(newTopic)
- createTopicResult.all().get()
-
- // List created topic
- TestUtils.waitUntilTrue(() => {
- val listTopicsResult = admin.listTopics()
- val result = listTopicsResult.names().get().size() == newTopic.size()
- if (result) {
- newTopic forEach(topic => {
- assertTrue(listTopicsResult.names().get().contains(topic.name()))
- })
- }
- result
- }, "Topics created were not listed.")
+ waitForTopicListing(admin, Seq("test-topic-1", "test-topic-2", "test-topic-3"), Seq())
} finally {
admin.close()
}
@@ -422,4 +351,78 @@ class RaftClusterTest {
listenerName = listenerName
)
+ @Test
+ def testCreateClusterAndPerformReassignment(): Unit = {
+ val cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(4).
+ setNumControllerNodes(3).build()).build()
+ try {
+ cluster.format()
+ cluster.startup()
+ cluster.waitForReadyBrokers()
+ val admin = Admin.create(cluster.clientProperties())
+ try {
+ // Create the topic.
+ val assignments = new util.HashMap[Integer, util.List[Integer]]
+ assignments.put(0, Arrays.asList(0, 1, 2))
+ assignments.put(1, Arrays.asList(1, 2, 3))
+ assignments.put(2, Arrays.asList(2, 3, 0))
+ assignments.put(3, Arrays.asList(3, 2, 1))
+ val createTopicResult = admin.createTopics(Collections.singletonList(
+ new NewTopic("foo", assignments)))
+ createTopicResult.all().get()
+ waitForTopicListing(admin, Seq("foo"), Seq())
+
+ // Start some reassignments.
+ assertEquals(Collections.emptyMap(), admin.listPartitionReassignments().reassignments().get())
+ val reassignments = new util.HashMap[TopicPartition, Optional[NewPartitionReassignment]]
+ reassignments.put(new TopicPartition("foo", 0),
+ Optional.of(new NewPartitionReassignment(Arrays.asList(2, 1, 0))))
+ reassignments.put(new TopicPartition("foo", 1),
+ Optional.of(new NewPartitionReassignment(Arrays.asList(0, 1, 2))))
+ reassignments.put(new TopicPartition("foo", 2),
+ Optional.of(new NewPartitionReassignment(Arrays.asList(2, 3))))
+ reassignments.put(new TopicPartition("foo", 3),
+ Optional.of(new NewPartitionReassignment(Arrays.asList(3, 2, 0, 1))))
+ admin.alterPartitionReassignments(reassignments).all().get()
+ TestUtils.waitUntilTrue(
+ () => admin.listPartitionReassignments().reassignments().get().isEmpty(),
+ "The reassignment never completed.")
+ var currentMapping: Seq[Seq[Int]] = Seq()
+ val expectedMapping = Seq(Seq(2, 1, 0), Seq(0, 1, 2), Seq(2, 3), Seq(3, 2, 0, 1))
+ TestUtils.waitUntilTrue( () => {
+ val topicInfoMap = admin.describeTopics(Collections.singleton("foo")).all().get()
+ if (topicInfoMap.containsKey("foo")) {
+ currentMapping = translatePartitionInfoToSeq(topicInfoMap.get("foo").partitions())
+ expectedMapping.equals(currentMapping)
+ } else {
+ false
+ }
+ }, "Timed out waiting for replica assignments for topic foo. " +
+ s"Wanted: ${expectedMapping}. Got: ${currentMapping}")
+ } finally {
+ admin.close()
+ }
+ } finally {
+ cluster.close()
+ }
+ }
+
+ private def translatePartitionInfoToSeq(partitions: util.List[TopicPartitionInfo]): Seq[Seq[Int]] = {
+ partitions.asScala.map(partition => partition.replicas().asScala.map(_.id()).toSeq).toSeq
+ }
+
+ private def waitForTopicListing(admin: Admin,
+ expectedPresent: Seq[String],
+ expectedAbsent: Seq[String]): Unit = {
+ val topicsNotFound = new util.HashSet[String]
+ var extraTopics: mutable.Set[String] = null
+ expectedPresent.foreach(topicsNotFound.add(_))
+ TestUtils.waitUntilTrue(() => {
+ admin.listTopics().names().get().forEach(name => topicsNotFound.remove(name))
+ extraTopics = admin.listTopics().names().get().asScala.filter(expectedAbsent.contains(_))
+ topicsNotFound.isEmpty && extraTopics.isEmpty
+ }, s"Failed to find topic(s): ${topicsNotFound.asScala} and NOT find topic(s): ${extraTopics}")
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
new file mode 100644
index 00000000000..5092ba164ba
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * PartitionChangeBuilder handles changing partition registrations.
+ */
+public class PartitionChangeBuilder {
+ public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
+ if (record.isr() != null) return false;
+ if (record.leader() != NO_LEADER_CHANGE) return false;
+ if (record.replicas() != null) return false;
+ if (record.removingReplicas() != null) return false;
+ if (record.addingReplicas() != null) return false;
+ return true;
+ }
+
+ private final PartitionRegistration partition;
+ private final Uuid topicId;
+ private final int partitionId;
+ private final Function isAcceptableLeader;
+ private final Supplier uncleanElectionOk;
+ private List targetIsr;
+ private List targetReplicas;
+ private List targetRemoving;
+ private List targetAdding;
+ private boolean alwaysElectPreferredIfPossible;
+
+ public PartitionChangeBuilder(PartitionRegistration partition,
+ Uuid topicId,
+ int partitionId,
+ Function isAcceptableLeader,
+ Supplier uncleanElectionOk) {
+ this.partition = partition;
+ this.topicId = topicId;
+ this.partitionId = partitionId;
+ this.isAcceptableLeader = isAcceptableLeader;
+ this.uncleanElectionOk = uncleanElectionOk;
+ this.targetIsr = Replicas.toList(partition.isr);
+ this.targetReplicas = Replicas.toList(partition.replicas);
+ this.targetRemoving = Replicas.toList(partition.removingReplicas);
+ this.targetAdding = Replicas.toList(partition.addingReplicas);
+ this.alwaysElectPreferredIfPossible = false;
+ }
+
+ public PartitionChangeBuilder setTargetIsr(List targetIsr) {
+ this.targetIsr = targetIsr;
+ return this;
+ }
+
+ public PartitionChangeBuilder setTargetReplicas(List targetReplicas) {
+ this.targetReplicas = targetReplicas;
+ return this;
+ }
+
+ public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean alwaysElectPreferredIfPossible) {
+ this.alwaysElectPreferredIfPossible = alwaysElectPreferredIfPossible;
+ return this;
+ }
+
+ public PartitionChangeBuilder setTargetRemoving(List targetRemoving) {
+ this.targetRemoving = targetRemoving;
+ return this;
+ }
+
+ public PartitionChangeBuilder setTargetAdding(List targetAdding) {
+ this.targetAdding = targetAdding;
+ return this;
+ }
+
+ boolean shouldTryElection() {
+ // If the new isr doesn't have the current leader, we need to try to elect a new
+ // one. Note: this also handles the case where the current leader is NO_LEADER,
+ // since that value cannot appear in targetIsr.
+ if (!targetIsr.contains(partition.leader)) return true;
+
+ // Check if we want to try to get away from a non-preferred leader.
+ if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true;
+
+ return false;
+ }
+
+ class BestLeader {
+ final int node;
+ final boolean unclean;
+
+ BestLeader() {
+ for (int replica : targetReplicas) {
+ if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) {
+ this.node = replica;
+ this.unclean = false;
+ return;
+ }
+ }
+ if (uncleanElectionOk.get()) {
+ for (int replica : targetReplicas) {
+ if (isAcceptableLeader.apply(replica)) {
+ this.node = replica;
+ this.unclean = true;
+ return;
+ }
+ }
+ }
+ this.node = NO_LEADER;
+ this.unclean = false;
+ }
+ }
+
+ private void tryElection(PartitionChangeRecord record) {
+ BestLeader bestLeader = new BestLeader();
+ if (bestLeader.node != partition.leader) {
+ record.setLeader(bestLeader.node);
+ if (bestLeader.unclean) {
+ // If the election was unclean, we have to forcibly set the ISR to just the
+ // new leader. This can result in data loss!
+ record.setIsr(Collections.singletonList(bestLeader.node));
+ }
+ }
+ }
+
+ /**
+ * Trigger a leader epoch bump if one is needed.
+ *
+ * We need to bump the leader epoch if:
+ * 1. The leader changed, or
+ * 2. The new ISR does not contain all the nodes that the old ISR did, or
+ * 3. The new replia list does not contain all the nodes that the old replia list did.
+ *
+ * Changes that do NOT fall in any of these categories will increase the partition epoch, but
+ * not the leader epoch. Note that if the leader epoch increases, the partition epoch will
+ * always increase as well; there is no case where the partition epoch increases more slowly
+ * than the leader epoch.
+ *
+ * If the PartitionChangeRecord sets the leader field to something other than
+ * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of
+ * case 1. In this function, we check for cases 2 and 3, and handle them by manually
+ * setting record.leader to the current leader.
+ */
+ void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
+ if (record.leader() == NO_LEADER_CHANGE) {
+ if (!Replicas.contains(targetIsr, partition.isr) ||
+ !Replicas.contains(targetReplicas, partition.replicas)) {
+ record.setLeader(partition.leader);
+ }
+ }
+ }
+
+ private void completeReassignmentIfNeeded() {
+ // Check if there is a reassignment to complete.
+ if (targetRemoving.isEmpty() && targetAdding.isEmpty()) return;
+
+ List newTargetIsr = targetIsr;
+ List newTargetReplicas = targetReplicas;
+ if (!targetRemoving.isEmpty()) {
+ newTargetIsr = new ArrayList<>(targetIsr.size());
+ for (int replica : targetIsr) {
+ if (!targetRemoving.contains(replica)) {
+ newTargetIsr.add(replica);
+ }
+ }
+ if (newTargetIsr.isEmpty()) return;
+ newTargetReplicas = new ArrayList<>(targetReplicas.size());
+ for (int replica : targetReplicas) {
+ if (!targetRemoving.contains(replica)) {
+ newTargetReplicas.add(replica);
+ }
+ }
+ if (newTargetReplicas.isEmpty()) return;
+ }
+ for (int replica : targetAdding) {
+ if (!newTargetIsr.contains(replica)) return;
+ }
+ targetIsr = newTargetIsr;
+ targetReplicas = newTargetReplicas;
+ targetRemoving = Collections.emptyList();
+ targetAdding = Collections.emptyList();
+ }
+
+ public Optional build() {
+ PartitionChangeRecord record = new PartitionChangeRecord().
+ setTopicId(topicId).
+ setPartitionId(partitionId);
+
+ completeReassignmentIfNeeded();
+
+ if (shouldTryElection()) {
+ tryElection(record);
+ }
+
+ triggerLeaderEpochBumpIfNeeded(record);
+
+ if (!targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
+ record.setIsr(targetIsr);
+ }
+ if (!targetReplicas.isEmpty() && !targetReplicas.equals(Replicas.toList(partition.replicas))) {
+ record.setReplicas(targetReplicas);
+ }
+ if (!targetRemoving.equals(Replicas.toList(partition.removingReplicas))) {
+ record.setRemovingReplicas(targetRemoving);
+ }
+ if (!targetAdding.equals(Replicas.toList(partition.addingReplicas))) {
+ record.setAddingReplicas(targetAdding);
+ }
+ if (changeRecordIsNoOp(record)) {
+ return Optional.empty();
+ } else {
+ return Optional.of(new ApiMessageAndVersion(record,
+ PARTITION_CHANGE_RECORD.highestSupportedVersion()));
+ }
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
new file mode 100644
index 00000000000..96ae4085632
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeSet;
+
+
+class PartitionReassignmentReplicas {
+ private final List removing;
+ private final List adding;
+ private final List merged;
+
+ private static Set calculateDifference(List a, List b) {
+ Set result = new TreeSet<>(a);
+ result.removeAll(b);
+ return result;
+ }
+
+ PartitionReassignmentReplicas(List currentReplicas,
+ List targetReplicas) {
+ Set removing = calculateDifference(currentReplicas, targetReplicas);
+ this.removing = new ArrayList<>(removing);
+ Set adding = calculateDifference(targetReplicas, currentReplicas);
+ this.adding = new ArrayList<>(adding);
+ this.merged = new ArrayList<>(targetReplicas);
+ this.merged.addAll(removing);
+ }
+
+ List removing() {
+ return removing;
+ }
+
+ List adding() {
+ return adding;
+ }
+
+ List merged() {
+ return merged;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(removing, adding, merged);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PartitionReassignmentReplicas)) return false;
+ PartitionReassignmentReplicas other = (PartitionReassignmentReplicas) o;
+ return removing.equals(other.removing) &&
+ adding.equals(other.adding) &&
+ merged.equals(other.merged);
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionReassignmentReplicas(" +
+ "removing=" + removing + ", " +
+ "adding=" + adding + ", " +
+ "merged=" + merged + ")";
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
new file mode 100644
index 00000000000..3cf6dbb4562
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+ private final List replicas;
+ private final List isr;
+ private final boolean unclean;
+
+ PartitionReassignmentRevert(PartitionRegistration registration) {
+ // Figure out the replica list and ISR that we will have after reverting the
+ // reassignment. In general, we want to take out any replica that the reassignment
+ // was adding, but keep the ones the reassignment was removing. (But see the
+ // special case below.)
+ Set adding = Replicas.toSet(registration.addingReplicas);
+ this.replicas = new ArrayList<>(registration.replicas.length);
+ this.isr = new ArrayList<>(registration.isr.length);
+ for (int i = 0; i < registration.isr.length; i++) {
+ int replica = registration.isr[i];
+ if (!adding.contains(replica)) {
+ this.isr.add(replica);
+ }
+ }
+ for (int replica : registration.replicas) {
+ if (!adding.contains(replica)) {
+ this.replicas.add(replica);
+ }
+ }
+ if (isr.isEmpty()) {
+ // In the special case that all the replicas that are in the ISR are also
+ // contained in addingReplicas, we choose the first remaining replica and add
+ // it to the ISR. This is considered an unclean leader election. Therefore,
+ // calling code must check that unclean leader election is enabled before
+ // accepting the new ISR.
+ if (this.replicas.isEmpty()) {
+ // This should not be reachable, since it would require a partition
+ // starting with an empty replica set prior to the reassignment we are
+ // trying to revert.
+ throw new InvalidReplicaAssignmentException("Invalid replica " +
+ "assignment: addingReplicas contains all replicas.");
+ }
+ isr.add(replicas.get(0));
+ this.unclean = true;
+ } else {
+ this.unclean = false;
+ }
+ }
+
+ List replicas() {
+ return replicas;
+ }
+
+ List isr() {
+ return isr;
+ }
+
+ boolean unclean() {
+ return unclean;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(replicas, isr);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PartitionReassignmentRevert)) return false;
+ PartitionReassignmentRevert other = (PartitionReassignmentRevert) o;
+ return replicas.equals(other.replicas) &&
+ isr.equals(other.isr);
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionReassignmentRevert(" +
+ "replicas=" + replicas + ", " +
+ "isr=" + isr + ")";
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 15243084752..4c6d7125721 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1187,19 +1187,19 @@ public final class QuorumController implements Controller {
}
return appendWriteEvent("alterPartitionReassignments",
time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
- () -> {
- throw new UnsupportedOperationException();
- });
+ () -> replicationControl.alterPartitionReassignments(request));
}
@Override
public CompletableFuture
listPartitionReassignments(ListPartitionReassignmentsRequestData request) {
+ if (request.topics() != null && request.topics().isEmpty()) {
+ return CompletableFuture.completedFuture(
+ new ListPartitionReassignmentsResponseData().setErrorMessage(null));
+ }
return appendReadEvent("listPartitionReassignments",
time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
- () -> {
- throw new UnsupportedOperationException();
- });
+ () -> replicationControl.listPartitionReassignments(request.topics()));
}
@Override
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 1aa133e8b60..d9d3e9d8ca0 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -28,12 +28,19 @@ import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.NoReassignmentInProgressException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
@@ -49,6 +56,10 @@ import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
@@ -72,9 +83,9 @@ import org.slf4j.Logger;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Optional;
import java.util.function.Function;
import java.util.HashMap;
import java.util.Iterator;
@@ -89,13 +100,15 @@ import java.util.stream.Collectors;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.common.metadata.MetadataRecordType.FENCE_BROKER_RECORD;
-import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.UNFENCE_BROKER_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.UNREGISTER_BROKER_RECORD;
+import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
+import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION;
+import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
@@ -176,6 +189,11 @@ public class ReplicationControlManager {
*/
private final BrokersToIsrs brokersToIsrs;
+ /**
+ * A map from topic IDs to the partitions in the topic which are reassigning.
+ */
+ private final TimelineHashMap reassigningTopics;
+
ReplicationControlManager(SnapshotRegistry snapshotRegistry,
LogContext logContext,
short defaultReplicationFactor,
@@ -195,6 +213,7 @@ public class ReplicationControlManager {
this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
+ this.reassigningTopics = new TimelineHashMap<>(snapshotRegistry, 0);
}
public void replay(TopicRecord record) {
@@ -222,11 +241,15 @@ public class ReplicationControlManager {
newPartInfo.isr, NO_LEADER, newPartInfo.leader);
globalPartitionCount.increment();
controllerMetrics.setGlobalPartitionCount(globalPartitionCount.get());
+ updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
+ false, newPartInfo.isReassigning());
} else if (!newPartInfo.equals(prevPartInfo)) {
newPartInfo.maybeLogPartitionChange(log, description, prevPartInfo);
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr,
newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader);
+ updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
+ prevPartInfo.isReassigning(), newPartInfo.isReassigning());
}
if (newPartInfo.leader != newPartInfo.preferredReplica()) {
preferredReplicaImbalanceCount.increment();
@@ -235,6 +258,24 @@ public class ReplicationControlManager {
controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount.get());
}
+ private void updateReassigningTopicsIfNeeded(Uuid topicId, int partitionId,
+ boolean wasReassigning, boolean isReassigning) {
+ if (!wasReassigning) {
+ if (isReassigning) {
+ int[] prevReassigningParts = reassigningTopics.getOrDefault(topicId, Replicas.NONE);
+ reassigningTopics.put(topicId, Replicas.copyWith(prevReassigningParts, partitionId));
+ }
+ } else if (!isReassigning) {
+ int[] prevReassigningParts = reassigningTopics.getOrDefault(topicId, Replicas.NONE);
+ int[] newReassigningParts = Replicas.copyWithout(prevReassigningParts, partitionId);
+ if (newReassigningParts.length == 0) {
+ reassigningTopics.remove(topicId);
+ } else {
+ reassigningTopics.put(topicId, newReassigningParts);
+ }
+ }
+ }
+
public void replay(PartitionChangeRecord record) {
TopicControlInfo topicInfo = topics.get(record.topicId());
if (topicInfo == null) {
@@ -247,6 +288,8 @@ public class ReplicationControlManager {
":" + record.partitionId() + ", but no partition with that id was found.");
}
PartitionRegistration newPartitionInfo = prevPartitionInfo.merge(record);
+ updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
+ prevPartitionInfo.isReassigning(), newPartitionInfo.isReassigning());
topicInfo.parts.put(record.partitionId(), newPartitionInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(),
prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.leader,
@@ -259,6 +302,11 @@ public class ReplicationControlManager {
}
controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount.get());
+ if (record.removingReplicas() != null || record.addingReplicas() != null) {
+ log.info("Replayed partition assignment change {} for topic {}", record, topicInfo.name);
+ } else if (log.isTraceEnabled()) {
+ log.trace("Replayed partition change {} for topic {}", record, topicInfo.name);
+ }
}
public void replay(RemoveTopicRecord record) {
@@ -269,6 +317,7 @@ public class ReplicationControlManager {
" to remove.");
}
topicsByName.remove(topic.name);
+ reassigningTopics.remove(record.topicId());
// Delete the configurations associated with this topic.
configurationControl.deleteTopicConfigs(topic.name);
@@ -557,60 +606,122 @@ public class ReplicationControlManager {
setPartitionIndex(partitionData.partitionIndex()).
setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()));
}
+ log.info("Rejecting alterIsr request for unknown topic ID {}.", topicId);
continue;
}
TopicControlInfo topic = topics.get(topicId);
for (AlterIsrRequestData.PartitionData partitionData : topicData.partitions()) {
- PartitionRegistration partition = topic.parts.get(partitionData.partitionIndex());
+ int partitionId = partitionData.partitionIndex();
+ PartitionRegistration partition = topic.parts.get(partitionId);
if (partition == null) {
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
- setPartitionIndex(partitionData.partitionIndex()).
+ setPartitionIndex(partitionId).
setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()));
- continue;
- }
- if (request.brokerId() != partition.leader) {
- responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
- setPartitionIndex(partitionData.partitionIndex()).
- setErrorCode(INVALID_REQUEST.code()));
+ log.info("Rejecting alterIsr request for unknown partition {}-{}.",
+ topic.name, partitionId);
continue;
}
if (partitionData.leaderEpoch() != partition.leaderEpoch) {
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
- setPartitionIndex(partitionData.partitionIndex()).
- setErrorCode(Errors.FENCED_LEADER_EPOCH.code()));
+ setPartitionIndex(partitionId).
+ setErrorCode(FENCED_LEADER_EPOCH.code()));
+ log.debug("Rejecting alterIsr request from node {} for {}-{} because " +
+ "the current leader epoch is {}, not {}.", request.brokerId(), topic.name,
+ partitionId, partition.leaderEpoch, partitionData.leaderEpoch());
+ continue;
+ }
+ if (request.brokerId() != partition.leader) {
+ responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+ setPartitionIndex(partitionId).
+ setErrorCode(INVALID_REQUEST.code()));
+ log.info("Rejecting alterIsr request from node {} for {}-{} because " +
+ "the current leader is {}.", request.brokerId(), topic.name,
+ partitionId, partition.leader);
continue;
}
if (partitionData.currentIsrVersion() != partition.partitionEpoch) {
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
- setPartitionIndex(partitionData.partitionIndex()).
- setErrorCode(Errors.INVALID_UPDATE_VERSION.code()));
+ setPartitionIndex(partitionId).
+ setErrorCode(INVALID_UPDATE_VERSION.code()));
+ log.info("Rejecting alterIsr request from node {} for {}-{} because " +
+ "the current partition epoch is {}, not {}.", request.brokerId(),
+ topic.name, partitionId, partition.partitionEpoch,
+ partitionData.currentIsrVersion());
continue;
}
int[] newIsr = Replicas.toArray(partitionData.newIsr());
if (!Replicas.validateIsr(partition.replicas, newIsr)) {
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
- setPartitionIndex(partitionData.partitionIndex()).
+ setPartitionIndex(partitionId).
setErrorCode(INVALID_REQUEST.code()));
+ log.error("Rejecting alterIsr request from node {} for {}-{} because " +
+ "it specified an invalid ISR {}.", request.brokerId(),
+ topic.name, partitionId, partitionData.newIsr());
continue;
}
if (!Replicas.contains(newIsr, partition.leader)) {
- // An alterIsr request can't remove the current leader.
+ // An alterIsr request can't ask for the current leader to be removed.
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
- setPartitionIndex(partitionData.partitionIndex()).
+ setPartitionIndex(partitionId).
setErrorCode(INVALID_REQUEST.code()));
+ log.error("Rejecting alterIsr request from node {} for {}-{} because " +
+ "it specified an invalid ISR {} that doesn't include itself.",
+ request.brokerId(), topic.name, partitionId, partitionData.newIsr());
continue;
}
- records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
- setPartitionId(partitionData.partitionIndex()).
- setTopicId(topic.id).
- setIsr(partitionData.newIsr()), PARTITION_CHANGE_RECORD.highestSupportedVersion()));
+ // At this point, we have decided to perform the ISR change. We use
+ // PartitionChangeBuilder to find out what its effect will be.
+ PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
+ topic.id,
+ partitionId,
+ r -> clusterControl.unfenced(r),
+ () -> configurationControl.uncleanLeaderElectionEnabledForTopic(topicData.name()));
+ builder.setTargetIsr(partitionData.newIsr());
+ Optional record = builder.build();
+ Errors result = Errors.NONE;
+ if (record.isPresent()) {
+ records.add(record.get());
+ PartitionChangeRecord change = (PartitionChangeRecord) record.get().message();
+ partition = partition.merge(change);
+ if (log.isDebugEnabled()) {
+ log.debug("Node {} has altered ISR for {}-{} to {}.",
+ request.brokerId(), topic.name, partitionId, change.isr());
+ }
+ if (change.leader() != request.brokerId() &&
+ change.leader() != NO_LEADER_CHANGE) {
+ // Normally, an alterIsr request, which is made by the partition
+ // leader itself, is not allowed to modify the partition leader.
+ // However, if there is an ongoing partition reassignment and the
+ // ISR change completes it, then the leader may change as part of
+ // the changes made during reassignment cleanup.
+ //
+ // In this case, we report back FENCED_LEADER_EPOCH to the leader
+ // which made the alterIsr request. This lets it know that it must
+ // fetch new metadata before trying again. This return code is
+ // unusual because we both return an error and generate a new
+ // metadata record. We usually only do one or the other.
+ log.info("AlterIsr request from node {} for {}-{} completed " +
+ "the ongoing partition reassignment and triggered a " +
+ "leadership change. Reutrning FENCED_LEADER_EPOCH.",
+ request.brokerId(), topic.name, partitionId);
+ responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+ setPartitionIndex(partitionId).
+ setErrorCode(FENCED_LEADER_EPOCH.code()));
+ continue;
+ } else if (change.removingReplicas() != null ||
+ change.addingReplicas() != null) {
+ log.info("AlterIsr request from node {} for {}-{} completed " +
+ "the ongoing partition reassignment.", request.brokerId(),
+ topic.name, partitionId);
+ }
+ }
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
- setPartitionIndex(partitionData.partitionIndex()).
- setErrorCode(Errors.NONE.code()).
+ setPartitionIndex(partitionId).
+ setErrorCode(result.code()).
setLeaderId(partition.leader).
setLeaderEpoch(partition.leaderEpoch).
- setCurrentIsrVersion(partition.partitionEpoch + 1).
- setIsr(partitionData.newIsr()));
+ setCurrentIsrVersion(partition.partitionEpoch).
+ setIsr(Replicas.toList(partition.isr)));
}
}
return ControllerResult.of(records, response);
@@ -740,39 +851,29 @@ public class ReplicationControlManager {
return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such topic id as " + topicId);
}
- PartitionRegistration partitionInfo = topicInfo.parts.get(partitionId);
- if (partitionInfo == null) {
+ PartitionRegistration partition = topicInfo.parts.get(partitionId);
+ if (partition == null) {
return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such partition as " + topic + "-" + partitionId);
}
- int newLeader = bestLeader(partitionInfo.replicas, partitionInfo.isr, uncleanOk,
- r -> clusterControl.unfenced(r));
- if (newLeader == NO_LEADER) {
- // If we can't find any leader for the partition, return an error.
- return new ApiError(Errors.LEADER_NOT_AVAILABLE,
- "Unable to find any leader for the partition.");
+ PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
+ topicId,
+ partitionId,
+ r -> clusterControl.unfenced(r),
+ () -> uncleanOk || configurationControl.uncleanLeaderElectionEnabledForTopic(topic));
+ builder.setAlwaysElectPreferredIfPossible(true);
+ Optional record = builder.build();
+ if (!record.isPresent()) {
+ if (partition.leader == NO_LEADER) {
+ // If we can't find any leader for the partition, return an error.
+ return new ApiError(Errors.LEADER_NOT_AVAILABLE,
+ "Unable to find any leader for the partition.");
+ } else {
+ // There is nothing to do.
+ return ApiError.NONE;
+ }
}
- if (newLeader == partitionInfo.leader) {
- // If the new leader we picked is the same as the current leader, there is
- // nothing to do.
- return ApiError.NONE;
- }
- if (partitionInfo.hasLeader() && newLeader != partitionInfo.preferredReplica()) {
- // It is not worth moving away from a valid leader to a new leader unless the
- // new leader is the preferred replica.
- return ApiError.NONE;
- }
- PartitionChangeRecord record = new PartitionChangeRecord().
- setPartitionId(partitionId).
- setTopicId(topicId).
- setLeader(newLeader);
- if (!PartitionRegistration.electionWasClean(newLeader, partitionInfo.isr)) {
- // If the election was unclean, we have to forcibly set the ISR to just the
- // new leader. This can result in data loss!
- record.setIsr(Collections.singletonList(newLeader));
- }
- records.add(new ApiMessageAndVersion(record,
- PARTITION_CHANGE_RECORD.highestSupportedVersion()));
+ records.add(record.get());
return ApiError.NONE;
}
@@ -813,25 +914,6 @@ public class ReplicationControlManager {
return ControllerResult.of(records, reply);
}
- static boolean isGoodLeader(int[] isr, int leader) {
- return Replicas.contains(isr, leader);
- }
-
- static int bestLeader(int[] replicas, int[] isr, boolean uncleanOk,
- Function isAcceptableLeader) {
- int bestUnclean = NO_LEADER;
- for (int i = 0; i < replicas.length; i++) {
- int replica = replicas[i];
- if (isAcceptableLeader.apply(replica)) {
- if (bestUnclean == NO_LEADER) bestUnclean = replica;
- if (Replicas.contains(isr, replica)) {
- return replica;
- }
- }
- }
- return uncleanOk ? bestUnclean : NO_LEADER;
- }
-
public ControllerResult unregisterBroker(int brokerId) {
BrokerRegistration registration = clusterControl.brokerRegistrations().get(brokerId);
if (registration == null) {
@@ -1008,8 +1090,22 @@ public class ReplicationControlManager {
List records,
Iterator iterator) {
int oldSize = records.size();
+
+ // If the caller passed a valid broker ID for brokerToAdd, rather than passing
+ // NO_LEADER, that node will be considered an acceptable leader even if it is
+ // currently fenced. This is useful when handling unfencing. The reason is that
+ // while we're generating the records to handle unfencing, the ClusterControlManager
+ // still shows the node as fenced.
+ //
+ // Similarly, if the caller passed a valid broker ID for brokerToRemove, rather
+ // than passing NO_LEADER, that node will never be considered an acceptable leader.
+ // This is useful when handling a newly fenced node. We also exclude brokerToRemove
+ // from the target ISR, but we need to exclude it here too, to handle the case
+ // where there is an unclean leader election which chooses a leader from outside
+ // the ISR.
Function isAcceptableLeader =
r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.unfenced(r));
+
while (iterator.hasNext()) {
TopicIdPartition topicIdPart = iterator.next();
TopicControlInfo topic = topics.get(topicIdPart.topicId());
@@ -1022,32 +1118,18 @@ public class ReplicationControlManager {
throw new RuntimeException("Partition " + topicIdPart +
" existed in isrMembers, but not in the partitions map.");
}
- int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemove);
- int newLeader;
- if (isGoodLeader(newIsr, partition.leader)) {
- // If the current leader is good, don't change.
- newLeader = partition.leader;
- } else {
- // Choose a new leader.
- boolean uncleanOk = configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
- newLeader = bestLeader(partition.replicas, newIsr, uncleanOk, isAcceptableLeader);
- }
- if (!PartitionRegistration.electionWasClean(newLeader, newIsr)) {
- // After an unclean leader election, the ISR is reset to just the new leader.
- newIsr = new int[] {newLeader};
- } else if (newIsr.length == 0) {
- // We never want to shrink the ISR to size 0.
- newIsr = partition.isr;
- }
- PartitionChangeRecord record = new PartitionChangeRecord().
- setPartitionId(topicIdPart.partitionId()).
- setTopicId(topic.id);
- if (newLeader != partition.leader) record.setLeader(newLeader);
- if (!Arrays.equals(newIsr, partition.isr)) record.setIsr(Replicas.toList(newIsr));
- if (record.leader() != NO_LEADER_CHANGE || record.isr() != null) {
- records.add(new ApiMessageAndVersion(record,
- PARTITION_CHANGE_RECORD.highestSupportedVersion()));
- }
+ PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
+ topicIdPart.topicId(),
+ topicIdPart.partitionId(),
+ isAcceptableLeader,
+ () -> configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name));
+
+ // Note: if brokerToRemove was passed as NO_LEADER, this is a no-op (the new
+ // target ISR will be the same as the old one).
+ builder.setTargetIsr(Replicas.toList(
+ Replicas.copyWithout(partition.isr, brokerToRemove)));
+
+ builder.build().ifPresent(records::add);
}
if (records.size() != oldSize) {
if (log.isDebugEnabled()) {
@@ -1068,6 +1150,200 @@ public class ReplicationControlManager {
}
}
+ ControllerResult
+ alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
+ List records = new ArrayList<>();
+ AlterPartitionReassignmentsResponseData result =
+ new AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+ int successfulAlterations = 0, totalAlterations = 0;
+ for (ReassignableTopic topic : request.topics()) {
+ ReassignableTopicResponse topicResponse = new ReassignableTopicResponse().
+ setName(topic.name());
+ for (ReassignablePartition partition : topic.partitions()) {
+ ApiError error = ApiError.NONE;
+ try {
+ alterPartitionReassignment(topic.name(), partition, records);
+ successfulAlterations++;
+ } catch (Throwable e) {
+ log.info("Unable to alter partition reassignment for " +
+ topic.name() + ":" + partition.partitionIndex() + " because " +
+ "of an " + e.getClass().getSimpleName() + " error: " + e.getMessage());
+ error = ApiError.fromThrowable(e);
+ }
+ totalAlterations++;
+ topicResponse.partitions().add(new ReassignablePartitionResponse().
+ setPartitionIndex(partition.partitionIndex()).
+ setErrorCode(error.error().code()).
+ setErrorMessage(error.message()));
+ }
+ result.responses().add(topicResponse);
+ }
+ log.info("Successfully altered {} out of {} partition reassignment(s).",
+ successfulAlterations, totalAlterations);
+ return ControllerResult.atomicOf(records, result);
+ }
+
+ void alterPartitionReassignment(String topicName,
+ ReassignablePartition target,
+ List records) {
+ Uuid topicId = topicsByName.get(topicName);
+ if (topicId == null) {
+ throw new UnknownTopicOrPartitionException("Unable to find a topic " +
+ "named " + topicName + ".");
+ }
+ TopicControlInfo topicInfo = topics.get(topicId);
+ if (topicInfo == null) {
+ throw new UnknownTopicOrPartitionException("Unable to find a topic " +
+ "with ID " + topicId + ".");
+ }
+ TopicIdPartition tp = new TopicIdPartition(topicId, target.partitionIndex());
+ PartitionRegistration part = topicInfo.parts.get(target.partitionIndex());
+ if (part == null) {
+ throw new UnknownTopicOrPartitionException("Unable to find partition " +
+ topicName + ":" + target.partitionIndex() + ".");
+ }
+ Optional record;
+ if (target.replicas() == null) {
+ record = cancelPartitionReassignment(topicName, tp, part);
+ } else {
+ record = changePartitionReassignment(tp, part, target);
+ }
+ record.ifPresent(records::add);
+ }
+
+ Optional cancelPartitionReassignment(String topicName,
+ TopicIdPartition tp,
+ PartitionRegistration part) {
+ if (!part.isReassigning()) {
+ throw new NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
+ }
+ PartitionReassignmentRevert revert = new PartitionReassignmentRevert(part);
+ if (revert.unclean()) {
+ if (!configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
+ throw new InvalidReplicaAssignmentException("Unable to revert partition " +
+ "assignment for " + topicName + ":" + tp.partitionId() + " because " +
+ "it would require an unclean leader election.");
+ }
+ }
+ PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
+ tp.topicId(),
+ tp.partitionId(),
+ r -> clusterControl.unfenced(r),
+ () -> configurationControl.uncleanLeaderElectionEnabledForTopic(topicName));
+ builder.setTargetIsr(revert.isr()).
+ setTargetReplicas(revert.replicas()).
+ setTargetRemoving(Collections.emptyList()).
+ setTargetAdding(Collections.emptyList());
+ return builder.build();
+ }
+
+ /**
+ * Apply a given partition reassignment. In general a partition reassignment goes
+ * through several stages:
+ *
+ * 1. Issue a PartitionChangeRecord adding all the new replicas to the partition's
+ * main replica list, and setting removingReplicas and addingReplicas.
+ *
+ * 2. Wait for the partition to have an ISR that contains all the new replicas. Or
+ * if there are no new replicas, wait until we have an ISR that contains at least one
+ * replica that we are not removing.
+ *
+ * 3. Issue a second PartitionChangeRecord removing all removingReplicas from the
+ * partitions' main replica list, and clearing removingReplicas and addingReplicas.
+ *
+ * After stage 3, the reassignment is done.
+ *
+ * Under some conditions, steps #1 and #2 can be skipped entirely since the ISR is
+ * already suitable to progress to stage #3. For example, a partition reassignment
+ * that merely rearranges existing replicas in the list can bypass step #1 and #2 and
+ * complete immediately.
+ *
+ * @param tp The topic id and partition id.
+ * @param part The existing partition info.
+ * @param target The target partition info.
+ *
+ * @return The ChangePartitionRecord for the new partition assignment,
+ * or empty if no change is needed.
+ */
+ Optional changePartitionReassignment(TopicIdPartition tp,
+ PartitionRegistration part,
+ ReassignablePartition target) {
+ // Check that the requested partition assignment is valid.
+ validateManualPartitionAssignment(target.replicas(), OptionalInt.empty());
+
+ List currentReplicas = Replicas.toList(part.replicas);
+ PartitionReassignmentReplicas reassignment =
+ new PartitionReassignmentReplicas(currentReplicas, target.replicas());
+ PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
+ tp.topicId(),
+ tp.partitionId(),
+ r -> clusterControl.unfenced(r),
+ () -> false);
+ if (!reassignment.merged().equals(currentReplicas)) {
+ builder.setTargetReplicas(reassignment.merged());
+ }
+ if (!reassignment.removing().isEmpty()) {
+ builder.setTargetRemoving(reassignment.removing());
+ }
+ if (!reassignment.adding().isEmpty()) {
+ builder.setTargetAdding(reassignment.adding());
+ }
+ return builder.build();
+ }
+
+ ListPartitionReassignmentsResponseData listPartitionReassignments(
+ List topicList) {
+ ListPartitionReassignmentsResponseData response =
+ new ListPartitionReassignmentsResponseData().setErrorMessage(null);
+ if (topicList == null) {
+ // List all reassigning topics.
+ for (Entry entry : reassigningTopics.entrySet()) {
+ listReassigningTopic(response, entry.getKey(), Replicas.toList(entry.getValue()));
+ }
+ } else {
+ // List the given topics.
+ for (ListPartitionReassignmentsTopics topic : topicList) {
+ Uuid topicId = topicsByName.get(topic.name());
+ if (topicId != null) {
+ listReassigningTopic(response, topicId, topic.partitionIndexes());
+ }
+ }
+ }
+ return response;
+ }
+
+ private void listReassigningTopic(ListPartitionReassignmentsResponseData response,
+ Uuid topicId,
+ List partitionIds) {
+ TopicControlInfo topicInfo = topics.get(topicId);
+ if (topicInfo == null) return;
+ OngoingTopicReassignment ongoingTopic = new OngoingTopicReassignment().
+ setName(topicInfo.name);
+ for (int partitionId : partitionIds) {
+ Optional ongoing =
+ getOngoingPartitionReassignment(topicInfo, partitionId);
+ if (ongoing.isPresent()) {
+ ongoingTopic.partitions().add(ongoing.get());
+ }
+ }
+ if (!ongoingTopic.partitions().isEmpty()) {
+ response.topics().add(ongoingTopic);
+ }
+ }
+
+ private Optional
+ getOngoingPartitionReassignment(TopicControlInfo topicInfo, int partitionId) {
+ PartitionRegistration partition = topicInfo.parts.get(partitionId);
+ if (partition == null || !partition.isReassigning()) {
+ return Optional.empty();
+ }
+ return Optional.of(new OngoingPartitionReassignment().
+ setAddingReplicas(Replicas.toList(partition.addingReplicas)).
+ setRemovingReplicas(Replicas.toList(partition.removingReplicas)).
+ setPartitionIndex(partitionId).
+ setReplicas(Replicas.toList(partition.replicas)));
+ }
+
class ReplicationControlIterator implements Iterator> {
private final long epoch;
private final Iterator iterator;
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
index e7530de3841..87e4201a212 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
@@ -93,7 +93,7 @@ public final class TopicDelta {
}
/**
- * Find the partitions that we are now leading, that we were not leading before.
+ * Find the partitions that we are now leading, whose partition epoch has changed.
*
* @param brokerId The broker id.
* @return A list of (partition ID, partition registration) entries.
@@ -103,7 +103,8 @@ public final class TopicDelta {
for (Entry entry : partitionChanges.entrySet()) {
if (entry.getValue().leader == brokerId) {
PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
- if (prevPartition == null || prevPartition.leader != brokerId) {
+ if (prevPartition == null ||
+ prevPartition.partitionEpoch != entry.getValue().partitionEpoch) {
results.add(entry);
}
}
@@ -112,7 +113,7 @@ public final class TopicDelta {
}
/**
- * Find the partitions that we are now following, that we were not following before.
+ * Find the partitions that we are now following, whose partition epoch has changed.
*
* @param brokerId The broker id.
* @return A list of (partition ID, partition registration) entries.
@@ -123,7 +124,8 @@ public final class TopicDelta {
if (entry.getValue().leader != brokerId &&
Replicas.contains(entry.getValue().replicas, brokerId)) {
PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
- if (prevPartition == null || prevPartition.leader == brokerId) {
+ if (prevPartition == null ||
+ prevPartition.partitionEpoch != entry.getValue().partitionEpoch) {
results.add(entry);
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
index 4ca5ea8cb33..933bda95cad 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
@@ -30,6 +30,7 @@ import java.util.Objects;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
public class PartitionRegistration {
@@ -68,20 +69,26 @@ public class PartitionRegistration {
}
public PartitionRegistration merge(PartitionChangeRecord record) {
+ int[] newReplicas = (record.replicas() == null) ?
+ replicas : Replicas.toArray(record.replicas());
int[] newIsr = (record.isr() == null) ? isr : Replicas.toArray(record.isr());
+ int[] newRemovingReplicas = (record.removingReplicas() == null) ?
+ removingReplicas : Replicas.toArray(record.removingReplicas());
+ int[] newAddingReplicas = (record.addingReplicas() == null) ?
+ addingReplicas : Replicas.toArray(record.addingReplicas());
int newLeader;
int newLeaderEpoch;
- if (record.leader() == LeaderConstants.NO_LEADER_CHANGE) {
+ if (record.leader() == NO_LEADER_CHANGE) {
newLeader = leader;
newLeaderEpoch = leaderEpoch;
} else {
newLeader = record.leader();
newLeaderEpoch = leaderEpoch + 1;
}
- return new PartitionRegistration(replicas,
+ return new PartitionRegistration(newReplicas,
newIsr,
- removingReplicas,
- addingReplicas,
+ newRemovingReplicas,
+ newAddingReplicas,
newLeader,
newLeaderEpoch,
partitionEpoch + 1);
@@ -180,6 +187,13 @@ public class PartitionRegistration {
setIsNew(isNew);
}
+ /**
+ * Returns true if this partition is reassigning.
+ */
+ public boolean isReassigning() {
+ return removingReplicas.length > 0 || addingReplicas.length > 0;
+ }
+
@Override
public int hashCode() {
return Objects.hash(replicas, isr, removingReplicas, addingReplicas, leader,
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/Replicas.java b/metadata/src/main/java/org/apache/kafka/metadata/Replicas.java
index 1a8e1bb0a0e..fa5ef4be4f1 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/Replicas.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/Replicas.java
@@ -19,7 +19,9 @@ package org.apache.kafka.metadata;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
public class Replicas {
@@ -137,6 +139,31 @@ public class Replicas {
return false;
}
+ /**
+ * Check if the first list of integers contains the second.
+ *
+ * @param a The first list
+ * @param b The second list
+ *
+ * @return True only if the first contains the second.
+ */
+ public static boolean contains(List a, int[] b) {
+ List aSorted = new ArrayList<>(a);
+ aSorted.sort(Integer::compareTo);
+ List bSorted = Replicas.toList(b);
+ bSorted.sort(Integer::compareTo);
+ int i = 0;
+ for (int replica : bSorted) {
+ while (true) {
+ if (i >= aSorted.size()) return false;
+ int replica2 = aSorted.get(i++);
+ if (replica2 == replica) break;
+ if (replica2 > replica) return false;
+ }
+ }
+ return true;
+ }
+
/**
* Copy a replica array without any occurrences of the given value.
*
@@ -163,6 +190,32 @@ public class Replicas {
return result;
}
+ /**
+ * Copy a replica array without any occurrences of the given values.
+ *
+ * @param replicas The replica array.
+ * @param values The values to filter out.
+ *
+ * @return A new array without the given value.
+ */
+ public static int[] copyWithout(int[] replicas, int[] values) {
+ int size = 0;
+ for (int i = 0; i < replicas.length; i++) {
+ if (!Replicas.contains(values, replicas[i])) {
+ size++;
+ }
+ }
+ int[] result = new int[size];
+ int j = 0;
+ for (int i = 0; i < replicas.length; i++) {
+ int replica = replicas[i];
+ if (!Replicas.contains(values, replica)) {
+ result[j++] = replica;
+ }
+ }
+ return result;
+ }
+
/**
* Copy a replica array with the given value.
*
@@ -177,4 +230,19 @@ public class Replicas {
newReplicas[newReplicas.length - 1] = value;
return newReplicas;
}
+
+ /**
+ * Convert a replica array to a set.
+ *
+ * @param replicas The replica array.
+ *
+ * @return A new array with the given value.
+ */
+ public static Set toSet(int[] replicas) {
+ Set result = new HashSet<>();
+ for (int replica : replicas) {
+ result.add(replica);
+ }
+ return result;
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
new file mode 100644
index 00000000000..1b013f4ee5a
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.controller.PartitionChangeBuilder.BestLeader;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class PartitionChangeBuilderTest {
+ @Test
+ public void testChangeRecordIsNoOp() {
+ assertTrue(changeRecordIsNoOp(new PartitionChangeRecord()));
+ assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().setLeader(1)));
+ assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+ setIsr(Arrays.asList(1, 2, 3))));
+ assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+ setRemovingReplicas(Arrays.asList(1))));
+ assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+ setAddingReplicas(Arrays.asList(4))));
+ }
+
+ private final static PartitionRegistration FOO = new PartitionRegistration(
+ new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE,
+ 1, 100, 200);
+
+ private final static Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
+
+ private static PartitionChangeBuilder createFooBuilder(boolean allowUnclean) {
+ return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, () -> allowUnclean);
+ }
+
+ private final static PartitionRegistration BAR = new PartitionRegistration(
+ new int[] {1, 2, 3, 4}, new int[] {1, 2, 3}, new int[] {1}, new int[] {4},
+ 1, 100, 200);
+
+ private final static Uuid BAR_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");
+
+ private static PartitionChangeBuilder createBarBuilder(boolean allowUnclean) {
+ return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, () -> allowUnclean);
+ }
+
+ private static void assertBestLeaderEquals(PartitionChangeBuilder builder,
+ int expectedNode,
+ boolean expectedUnclean) {
+ BestLeader bestLeader = builder.new BestLeader();
+ assertEquals(expectedNode, bestLeader.node);
+ assertEquals(expectedUnclean, bestLeader.unclean);
+ }
+
+ @Test
+ public void testBestLeader() {
+ assertBestLeaderEquals(createFooBuilder(false), 2, false);
+ assertBestLeaderEquals(createFooBuilder(true), 2, false);
+ assertBestLeaderEquals(createFooBuilder(false).
+ setTargetIsr(Arrays.asList(1, 3)), 1, false);
+ assertBestLeaderEquals(createFooBuilder(true).
+ setTargetIsr(Arrays.asList(1, 3)), 1, false);
+ assertBestLeaderEquals(createFooBuilder(false).
+ setTargetIsr(Arrays.asList(3)), NO_LEADER, false);
+ assertBestLeaderEquals(createFooBuilder(true).
+ setTargetIsr(Arrays.asList(3)), 2, true);
+ assertBestLeaderEquals(createFooBuilder(true).
+ setTargetIsr(Arrays.asList(4)).setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
+ 4, false);
+ }
+
+ @Test
+ public void testShouldTryElection() {
+ assertFalse(createFooBuilder(false).shouldTryElection());
+ assertTrue(createFooBuilder(false).setAlwaysElectPreferredIfPossible(true).
+ shouldTryElection());
+ assertTrue(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3)).
+ shouldTryElection());
+ assertFalse(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)).
+ shouldTryElection());
+ }
+
+ private static void testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder,
+ PartitionChangeRecord record,
+ int expectedLeader) {
+ builder.triggerLeaderEpochBumpIfNeeded(record);
+ assertEquals(expectedLeader, record.leader());
+ }
+
+ @Test
+ public void testTriggerLeaderEpochBumpIfNeeded() {
+ testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false),
+ new PartitionChangeRecord(), NO_LEADER_CHANGE);
+ testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+ setTargetIsr(Arrays.asList(2, 1)), new PartitionChangeRecord(), 1);
+ testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+ setTargetIsr(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(),
+ NO_LEADER_CHANGE);
+ testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+ setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(),
+ NO_LEADER_CHANGE);
+ testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+ setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
+ new PartitionChangeRecord().setLeader(2), 2);
+ }
+
+ @Test
+ public void testNoChange() {
+ assertEquals(Optional.empty(), createFooBuilder(false).build());
+ assertEquals(Optional.empty(), createFooBuilder(true).build());
+ assertEquals(Optional.empty(), createBarBuilder(false).build());
+ assertEquals(Optional.empty(), createBarBuilder(true).build());
+ }
+
+ @Test
+ public void testIsrChangeAndLeaderBump() {
+ assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
+ setTopicId(FOO_ID).
+ setPartitionId(0).
+ setIsr(Arrays.asList(2, 1)).
+ setLeader(1), PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+ createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)).build());
+ }
+
+ @Test
+ public void testIsrChangeAndLeaderChange() {
+ assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
+ setTopicId(FOO_ID).
+ setPartitionId(0).
+ setIsr(Arrays.asList(2, 3)).
+ setLeader(2), PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+ createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3)).build());
+ }
+
+ @Test
+ public void testReassignmentRearrangesReplicas() {
+ assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
+ setTopicId(FOO_ID).
+ setPartitionId(0).
+ setReplicas(Arrays.asList(3, 2, 1)),
+ PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+ createFooBuilder(false).setTargetReplicas(Arrays.asList(3, 2, 1)).build());
+ }
+
+ @Test
+ public void testIsrEnlargementCompletesReassignment() {
+ assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
+ setTopicId(BAR_ID).
+ setPartitionId(0).
+ setReplicas(Arrays.asList(2, 3, 4)).
+ setIsr(Arrays.asList(2, 3, 4)).
+ setLeader(2).
+ setRemovingReplicas(Collections.emptyList()).
+ setAddingReplicas(Collections.emptyList()),
+ PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+ createBarBuilder(false).setTargetIsr(Arrays.asList(1, 2, 3, 4)).build());
+ }
+
+ @Test
+ public void testRevertReassignment() {
+ PartitionReassignmentRevert revert = new PartitionReassignmentRevert(BAR);
+ assertEquals(Arrays.asList(1, 2, 3), revert.replicas());
+ assertEquals(Arrays.asList(1, 2, 3), revert.isr());
+ assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
+ setTopicId(BAR_ID).
+ setPartitionId(0).
+ setReplicas(Arrays.asList(1, 2, 3)).
+ setLeader(1).
+ setRemovingReplicas(Collections.emptyList()).
+ setAddingReplicas(Collections.emptyList()),
+ PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+ createBarBuilder(false).
+ setTargetReplicas(revert.replicas()).
+ setTargetIsr(revert.isr()).
+ setTargetRemoving(Collections.emptyList()).
+ setTargetAdding(Collections.emptyList()).
+ build());
+ }
+
+ @Test
+ public void testRemovingReplicaReassignment() {
+ PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
+ Replicas.toList(FOO.replicas), Arrays.asList(1, 2));
+ assertEquals(Collections.singletonList(3), replicas.removing());
+ assertEquals(Collections.emptyList(), replicas.adding());
+ assertEquals(Arrays.asList(1, 2, 3), replicas.merged());
+ assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
+ setTopicId(FOO_ID).
+ setPartitionId(0).
+ setReplicas(Arrays.asList(1, 2)).
+ setIsr(Arrays.asList(2, 1)).
+ setLeader(1),
+ PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+ createFooBuilder(false).
+ setTargetReplicas(replicas.merged()).
+ setTargetRemoving(replicas.removing()).
+ build());
+ }
+
+ @Test
+ public void testAddingReplicaReassignment() {
+ PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
+ Replicas.toList(FOO.replicas), Arrays.asList(1, 2, 3, 4));
+ assertEquals(Collections.emptyList(), replicas.removing());
+ assertEquals(Collections.singletonList(4), replicas.adding());
+ assertEquals(Arrays.asList(1, 2, 3, 4), replicas.merged());
+ assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
+ setTopicId(FOO_ID).
+ setPartitionId(0).
+ setReplicas(Arrays.asList(1, 2, 3, 4)).
+ setAddingReplicas(Collections.singletonList(4)),
+ PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+ createFooBuilder(false).
+ setTargetReplicas(replicas.merged()).
+ setTargetAdding(replicas.adding()).
+ build());
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
new file mode 100644
index 00000000000..c74090e5c5d
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+@Timeout(40)
+public class PartitionReassignmentReplicasTest {
+ @Test
+ public void testNoneAddedOrRemoved() {
+ PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
+ Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1));
+ assertEquals(Collections.emptyList(), replicas.removing());
+ assertEquals(Collections.emptyList(), replicas.adding());
+ assertEquals(Arrays.asList(3, 2, 1), replicas.merged());
+ }
+
+ @Test
+ public void testAdditions() {
+ PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
+ Arrays.asList(3, 2, 1), Arrays.asList(3, 6, 2, 1, 5));
+ assertEquals(Collections.emptyList(), replicas.removing());
+ assertEquals(Arrays.asList(5, 6), replicas.adding());
+ assertEquals(Arrays.asList(3, 6, 2, 1, 5), replicas.merged());
+ }
+
+ @Test
+ public void testRemovals() {
+ PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
+ Arrays.asList(3, 2, 1, 0), Arrays.asList(3, 1));
+ assertEquals(Arrays.asList(0, 2), replicas.removing());
+ assertEquals(Collections.emptyList(), replicas.adding());
+ assertEquals(Arrays.asList(3, 1, 0, 2), replicas.merged());
+ }
+
+ @Test
+ public void testAdditionsAndRemovals() {
+ PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
+ Arrays.asList(3, 2, 1, 0), Arrays.asList(7, 3, 1, 9));
+ assertEquals(Arrays.asList(0, 2), replicas.removing());
+ assertEquals(Arrays.asList(7, 9), replicas.adding());
+ assertEquals(Arrays.asList(7, 3, 1, 9, 0, 2), replicas.merged());
+ }
+
+ @Test
+ public void testRearrangement() {
+ PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
+ Arrays.asList(3, 2, 1, 0), Arrays.asList(0, 1, 3, 2));
+ assertEquals(Collections.emptyList(), replicas.removing());
+ assertEquals(Collections.emptyList(), replicas.adding());
+ assertEquals(Arrays.asList(0, 1, 3, 2), replicas.merged());
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentRevertTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentRevertTest.java
new file mode 100644
index 00000000000..26120be8fb8
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentRevertTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Arrays;
+
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(40)
+public class PartitionReassignmentRevertTest {
+ @Test
+ public void testNoneAddedOrRemoved() {
+ PartitionRegistration registration = new PartitionRegistration(
+ new int[] {3, 2, 1}, new int[] {3, 2},
+ Replicas.NONE, Replicas.NONE, 3, 100, 200);
+ PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
+ assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
+ assertEquals(Arrays.asList(3, 2), revert.isr());
+ assertFalse(revert.unclean());
+ }
+
+ @Test
+ public void testSomeRemoving() {
+ PartitionRegistration registration = new PartitionRegistration(
+ new int[] {3, 2, 1}, new int[] {3, 2},
+ new int[] {2, 1}, Replicas.NONE, 3, 100, 200);
+ PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
+ assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
+ assertEquals(Arrays.asList(3, 2), revert.isr());
+ assertFalse(revert.unclean());
+ }
+
+ @Test
+ public void testSomeAdding() {
+ PartitionRegistration registration = new PartitionRegistration(
+ new int[] {4, 5, 3, 2, 1}, new int[] {4, 5, 2},
+ Replicas.NONE, new int[] {4, 5}, 3, 100, 200);
+ PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
+ assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
+ assertEquals(Arrays.asList(2), revert.isr());
+ assertFalse(revert.unclean());
+ }
+
+ @Test
+ public void testSomeRemovingAndAdding() {
+ PartitionRegistration registration = new PartitionRegistration(
+ new int[] {4, 5, 3, 2, 1}, new int[] {4, 5, 2},
+ new int[] {2}, new int[] {4, 5}, 3, 100, 200);
+ PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
+ assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
+ assertEquals(Arrays.asList(2), revert.isr());
+ assertFalse(revert.unclean());
+ }
+
+ @Test
+ public void testIsrSpecialCase() {
+ PartitionRegistration registration = new PartitionRegistration(
+ new int[] {4, 5, 3, 2, 1}, new int[] {4, 5},
+ new int[] {2}, new int[] {4, 5}, 3, 100, 200);
+ PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
+ assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
+ assertEquals(Arrays.asList(3), revert.isr());
+ assertTrue(revert.unclean());
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 911f955cde7..ba2d52a1259 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -547,7 +547,7 @@ public class QuorumControllerTest {
setTopics(Collections.singletonList(new ReassignableTopic())));
CompletableFuture listReassignmentsFuture =
controller.listPartitionReassignments(
- new ListPartitionReassignmentsRequestData().setTimeoutMs(0));
+ new ListPartitionReassignmentsRequestData().setTopics(null).setTimeoutMs(0));
while (controller.time().nanoseconds() == now) {
Thread.sleep(0, 10);
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 628f9ed68c3..f67756d5f0b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -23,7 +23,15 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrRequestData.PartitionData;
+import org.apache.kafka.common.message.AlterIsrRequestData.TopicData;
import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
@@ -40,6 +48,10 @@ import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitionsCo
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@@ -49,15 +61,17 @@ import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.metadata.BrokerHeartbeatReply;
-import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -71,10 +85,12 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.Errors.INVALID_PARTITIONS;
import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT;
import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
@@ -88,6 +104,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(40)
public class ReplicationControlManagerTest {
+ private final static Logger log = LoggerFactory.getLogger(ReplicationControlManagerTest.class);
+
private static class ReplicationControlTestContext {
final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
final LogContext logContext = new LogContext();
@@ -189,6 +207,19 @@ public class ReplicationControlManagerTest {
replay(result.records());
}
}
+
+ long currentBrokerEpoch(int brokerId) {
+ Map registrations = clusterControl.brokerRegistrations();
+ BrokerRegistration registration = registrations.get(brokerId);
+ assertNotNull(registration, "No current registration for broker " + brokerId);
+ return registration.epoch();
+ }
+
+ OptionalInt currentLeader(TopicIdPartition topicIdPartition) {
+ PartitionRegistration partition = replicationControl.
+ getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
+ return (partition.leader < 0) ? OptionalInt.empty() : OptionalInt.of(partition.leader);
+ }
}
@Test
@@ -393,9 +424,9 @@ public class ReplicationControlManagerTest {
TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
TopicPartition topicPartition = new TopicPartition("foo", 0);
- assertEquals(OptionalInt.of(0), currentLeader(replicationControl, topicIdPartition));
- long brokerEpoch = currentBrokerEpoch(ctx, 0);
- AlterIsrRequestData.PartitionData shrinkIsrRequest = newAlterIsrPartition(
+ assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
+ long brokerEpoch = ctx.currentBrokerEpoch(0);
+ PartitionData shrinkIsrRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
ControllerResult shrinkIsrResult = sendAlterIsr(
replicationControl, 0, brokerEpoch, "foo", shrinkIsrRequest);
@@ -403,7 +434,7 @@ public class ReplicationControlManagerTest {
shrinkIsrResult, topicPartition, NONE);
assertConsistentAlterIsrResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
- AlterIsrRequestData.PartitionData expandIsrRequest = newAlterIsrPartition(
+ PartitionData expandIsrRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1, 2));
ControllerResult expandIsrResult = sendAlterIsr(
replicationControl, 0, brokerEpoch, "foo", expandIsrRequest);
@@ -423,75 +454,52 @@ public class ReplicationControlManagerTest {
TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
TopicPartition topicPartition = new TopicPartition("foo", 0);
- assertEquals(OptionalInt.of(0), currentLeader(replicationControl, topicIdPartition));
- long brokerEpoch = currentBrokerEpoch(ctx, 0);
+ assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
+ long brokerEpoch = ctx.currentBrokerEpoch(0);
// Invalid leader
- AlterIsrRequestData.PartitionData invalidLeaderRequest = newAlterIsrPartition(
+ PartitionData invalidLeaderRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
ControllerResult invalidLeaderResult = sendAlterIsr(
- replicationControl, 1, currentBrokerEpoch(ctx, 1),
+ replicationControl, 1, ctx.currentBrokerEpoch(1),
"foo", invalidLeaderRequest);
assertAlterIsrResponse(invalidLeaderResult, topicPartition, Errors.INVALID_REQUEST);
// Stale broker epoch
- AlterIsrRequestData.PartitionData invalidBrokerEpochRequest = newAlterIsrPartition(
+ PartitionData invalidBrokerEpochRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
assertThrows(StaleBrokerEpochException.class, () -> sendAlterIsr(
replicationControl, 0, brokerEpoch - 1, "foo", invalidBrokerEpochRequest));
// Invalid leader epoch
- AlterIsrRequestData.PartitionData invalidLeaderEpochRequest = newAlterIsrPartition(
+ PartitionData invalidLeaderEpochRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
invalidLeaderEpochRequest.setLeaderEpoch(500);
ControllerResult invalidLeaderEpochResult = sendAlterIsr(
- replicationControl, 1, currentBrokerEpoch(ctx, 1),
+ replicationControl, 1, ctx.currentBrokerEpoch(1),
"foo", invalidLeaderEpochRequest);
- assertAlterIsrResponse(invalidLeaderEpochResult, topicPartition, Errors.INVALID_REQUEST);
+ assertAlterIsrResponse(invalidLeaderEpochResult, topicPartition, FENCED_LEADER_EPOCH);
// Invalid ISR (3 is not a valid replica)
- AlterIsrRequestData.PartitionData invalidIsrRequest1 = newAlterIsrPartition(
+ PartitionData invalidIsrRequest1 = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
invalidIsrRequest1.setNewIsr(Arrays.asList(0, 1, 3));
ControllerResult invalidIsrResult1 = sendAlterIsr(
- replicationControl, 1, currentBrokerEpoch(ctx, 1),
+ replicationControl, 1, ctx.currentBrokerEpoch(1),
"foo", invalidIsrRequest1);
assertAlterIsrResponse(invalidIsrResult1, topicPartition, Errors.INVALID_REQUEST);
// Invalid ISR (does not include leader 0)
- AlterIsrRequestData.PartitionData invalidIsrRequest2 = newAlterIsrPartition(
+ PartitionData invalidIsrRequest2 = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
invalidIsrRequest2.setNewIsr(Arrays.asList(1, 2));
ControllerResult invalidIsrResult2 = sendAlterIsr(
- replicationControl, 1, currentBrokerEpoch(ctx, 1),
+ replicationControl, 1, ctx.currentBrokerEpoch(1),
"foo", invalidIsrRequest2);
assertAlterIsrResponse(invalidIsrResult2, topicPartition, Errors.INVALID_REQUEST);
}
- private long currentBrokerEpoch(
- ReplicationControlTestContext ctx,
- int brokerId
- ) {
- Map registrations = ctx.clusterControl.brokerRegistrations();
- BrokerRegistration registration = registrations.get(brokerId);
- assertNotNull(registration, "No current registration for broker " + brokerId);
- return registration.epoch();
- }
-
- private OptionalInt currentLeader(
- ReplicationControlManager replicationControl,
- TopicIdPartition topicIdPartition
- ) {
- PartitionRegistration partitionControl =
- replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
- if (partitionControl.leader < 0) {
- return OptionalInt.empty();
- } else {
- return OptionalInt.of(partitionControl.leader);
- }
- }
-
- private AlterIsrRequestData.PartitionData newAlterIsrPartition(
+ private PartitionData newAlterIsrPartition(
ReplicationControlManager replicationControl,
TopicIdPartition topicIdPartition,
List newIsr
@@ -777,18 +785,226 @@ public class ReplicationControlManagerTest {
OptionalInt.of(3))).getMessage());
}
+ private final static ListPartitionReassignmentsResponseData NONE_REASSIGNING =
+ new ListPartitionReassignmentsResponseData().setErrorMessage(null);
+
@Test
- public void testBestLeader() {
- assertEquals(2, ReplicationControlManager.bestLeader(
- new int[]{1, 2, 3, 4}, new int[]{4, 2, 3}, false, __ -> true));
- assertEquals(3, ReplicationControlManager.bestLeader(
- new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, __ -> true));
- assertEquals(4, ReplicationControlManager.bestLeader(
- new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, r -> r == 4));
- assertEquals(-1, ReplicationControlManager.bestLeader(
- new int[]{3, 4, 5}, new int[]{1, 2}, false, r -> r == 4));
- assertEquals(4, ReplicationControlManager.bestLeader(
- new int[]{3, 4, 5}, new int[]{1, 2}, true, r -> r == 4));
+ public void testReassignPartitions() throws Exception {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2, 3);
+ ctx.unfenceBrokers(0, 1, 2, 3);
+ Uuid fooId = ctx.createTestTopic("foo", new int[][] {
+ new int[] {1, 2, 3}, new int[] {3, 2, 1}}).topicId();
+ ctx.createTestTopic("bar", new int[][] {
+ new int[] {1, 2, 3}}).topicId();
+ assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
+ ControllerResult alterResult =
+ replication.alterPartitionReassignments(
+ new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
+ new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+ new ReassignablePartition().setPartitionIndex(0).
+ setReplicas(Arrays.asList(3, 2, 1)),
+ new ReassignablePartition().setPartitionIndex(1).
+ setReplicas(Arrays.asList(0, 2, 1)),
+ new ReassignablePartition().setPartitionIndex(2).
+ setReplicas(Arrays.asList(0, 2, 1)))),
+ new ReassignableTopic().setName("bar"))));
+ assertEquals(new AlterPartitionReassignmentsResponseData().
+ setErrorMessage(null).setResponses(Arrays.asList(
+ new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+ new ReassignablePartitionResponse().setPartitionIndex(0).
+ setErrorMessage(null),
+ new ReassignablePartitionResponse().setPartitionIndex(1).
+ setErrorMessage(null),
+ new ReassignablePartitionResponse().setPartitionIndex(2).
+ setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+ setErrorMessage("Unable to find partition foo:2."))),
+ new ReassignableTopicResponse().
+ setName("bar"))),
+ alterResult.response());
+ ctx.replay(alterResult.records());
+ ListPartitionReassignmentsResponseData currentReassigning =
+ new ListPartitionReassignmentsResponseData().setErrorMessage(null).
+ setTopics(Arrays.asList(new OngoingTopicReassignment().
+ setName("foo").setPartitions(Arrays.asList(
+ new OngoingPartitionReassignment().setPartitionIndex(1).
+ setRemovingReplicas(Arrays.asList(3)).
+ setAddingReplicas(Arrays.asList(0)).
+ setReplicas(Arrays.asList(0, 2, 1, 3))))));
+ assertEquals(currentReassigning, replication.listPartitionReassignments(null));
+ assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList(
+ new ListPartitionReassignmentsTopics().setName("bar").
+ setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+ assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList(
+ new ListPartitionReassignmentsTopics().setName("foo").
+ setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+ ControllerResult cancelResult =
+ replication.alterPartitionReassignments(
+ new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
+ new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+ new ReassignablePartition().setPartitionIndex(0).
+ setReplicas(null),
+ new ReassignablePartition().setPartitionIndex(1).
+ setReplicas(null),
+ new ReassignablePartition().setPartitionIndex(2).
+ setReplicas(null))),
+ new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+ new ReassignablePartition().setPartitionIndex(0).
+ setReplicas(null))))));
+ assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
+ new PartitionChangeRecord().setTopicId(fooId).
+ setPartitionId(1).
+ setReplicas(Arrays.asList(2, 1, 3)).
+ setLeader(3).
+ setRemovingReplicas(Collections.emptyList()).
+ setAddingReplicas(Collections.emptyList()), (short) 0)),
+ new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
+ new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+ new ReassignablePartitionResponse().setPartitionIndex(0).
+ setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null),
+ new ReassignablePartitionResponse().setPartitionIndex(1).
+ setErrorCode(NONE.code()).setErrorMessage(null),
+ new ReassignablePartitionResponse().setPartitionIndex(2).
+ setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+ setErrorMessage("Unable to find partition foo:2."))),
+ new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+ new ReassignablePartitionResponse().setPartitionIndex(0).
+ setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).
+ setErrorMessage(null)))))),
+ cancelResult);
+ log.info("running final alterIsr...");
+ ControllerResult alterIsrResult = replication.alterIsr(
+ new AlterIsrRequestData().setBrokerId(3).setBrokerEpoch(103).
+ setTopics(Arrays.asList(new TopicData().setName("foo").setPartitions(Arrays.asList(
+ new PartitionData().setPartitionIndex(1).setCurrentIsrVersion(1).
+ setLeaderEpoch(0).setNewIsr(Arrays.asList(3, 0, 2, 1)))))));
+ assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
+ new AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(
+ new AlterIsrResponseData.PartitionData().
+ setPartitionIndex(1).
+ setErrorCode(FENCED_LEADER_EPOCH.code()))))),
+ alterIsrResult.response());
+ ctx.replay(alterIsrResult.records());
+ assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
+ }
+
+ @Test
+ public void testCancelReassignPartitions() throws Exception {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2, 3, 4);
+ ctx.unfenceBrokers(0, 1, 2, 3, 4);
+ Uuid fooId = ctx.createTestTopic("foo", new int[][] {
+ new int[] {1, 2, 3, 4}, new int[] {0, 1, 2, 3}, new int[] {4, 3, 1, 0},
+ new int[] {2, 3, 4, 1}}).topicId();
+ Uuid barId = ctx.createTestTopic("bar", new int[][] {
+ new int[] {4, 3, 2}}).topicId();
+ assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
+ List fenceRecords = new ArrayList<>();
+ replication.handleBrokerFenced(3, fenceRecords);
+ ctx.replay(fenceRecords);
+ assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4}, new int[] {1, 2, 4},
+ new int[] {}, new int[] {}, 1, 1, 1), replication.getPartition(fooId, 0));
+ ControllerResult alterResult =
+ replication.alterPartitionReassignments(
+ new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
+ new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+ new ReassignablePartition().setPartitionIndex(0).
+ setReplicas(Arrays.asList(1, 2, 3)),
+ new ReassignablePartition().setPartitionIndex(1).
+ setReplicas(Arrays.asList(1, 2, 3, 0)),
+ new ReassignablePartition().setPartitionIndex(2).
+ setReplicas(Arrays.asList(5, 6, 7)),
+ new ReassignablePartition().setPartitionIndex(3).
+ setReplicas(Arrays.asList()))),
+ new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+ new ReassignablePartition().setPartitionIndex(0).
+ setReplicas(Arrays.asList(1, 2, 3, 4, 0)))))));
+ assertEquals(new AlterPartitionReassignmentsResponseData().
+ setErrorMessage(null).setResponses(Arrays.asList(
+ new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+ new ReassignablePartitionResponse().setPartitionIndex(0).
+ setErrorMessage(null),
+ new ReassignablePartitionResponse().setPartitionIndex(1).
+ setErrorMessage(null),
+ new ReassignablePartitionResponse().setPartitionIndex(2).
+ setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
+ setErrorMessage("The manual partition assignment includes broker 5, " +
+ "but no such broker is registered."),
+ new ReassignablePartitionResponse().setPartitionIndex(3).
+ setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
+ setErrorMessage("The manual partition assignment includes an empty " +
+ "replica list."))),
+ new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+ new ReassignablePartitionResponse().setPartitionIndex(0).
+ setErrorMessage(null))))),
+ alterResult.response());
+ ctx.replay(alterResult.records());
+ assertEquals(new PartitionRegistration(new int[] {1, 2, 3}, new int[] {1, 2},
+ new int[] {}, new int[] {}, 1, 2, 2), replication.getPartition(fooId, 0));
+ assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 0}, new int[] {0, 1, 2},
+ new int[] {}, new int[] {}, 0, 1, 2), replication.getPartition(fooId, 1));
+ assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4, 0}, new int[] {4, 2},
+ new int[] {}, new int[] {0, 1}, 4, 1, 2), replication.getPartition(barId, 0));
+ ListPartitionReassignmentsResponseData currentReassigning =
+ new ListPartitionReassignmentsResponseData().setErrorMessage(null).
+ setTopics(Arrays.asList(new OngoingTopicReassignment().
+ setName("bar").setPartitions(Arrays.asList(
+ new OngoingPartitionReassignment().setPartitionIndex(0).
+ setRemovingReplicas(Collections.emptyList()).
+ setAddingReplicas(Arrays.asList(0, 1)).
+ setReplicas(Arrays.asList(1, 2, 3, 4, 0))))));
+ assertEquals(currentReassigning, replication.listPartitionReassignments(null));
+ assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList(
+ new ListPartitionReassignmentsTopics().setName("foo").
+ setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+ assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList(
+ new ListPartitionReassignmentsTopics().setName("bar").
+ setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+ ControllerResult alterIsrResult = replication.alterIsr(
+ new AlterIsrRequestData().setBrokerId(4).setBrokerEpoch(104).
+ setTopics(Arrays.asList(new TopicData().setName("bar").setPartitions(Arrays.asList(
+ new PartitionData().setPartitionIndex(0).setCurrentIsrVersion(2).
+ setLeaderEpoch(1).setNewIsr(Arrays.asList(4, 1, 2, 3, 0)))))));
+ assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
+ new AlterIsrResponseData.TopicData().setName("bar").setPartitions(Arrays.asList(
+ new AlterIsrResponseData.PartitionData().
+ setPartitionIndex(0).
+ setLeaderId(4).
+ setLeaderEpoch(1).
+ setIsr(Arrays.asList(4, 1, 2, 3, 0)).
+ setCurrentIsrVersion(3).
+ setErrorCode(NONE.code()))))),
+ alterIsrResult.response());
+ ControllerResult cancelResult =
+ replication.alterPartitionReassignments(
+ new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
+ new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+ new ReassignablePartition().setPartitionIndex(0).
+ setReplicas(null))),
+ new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+ new ReassignablePartition().setPartitionIndex(0).
+ setReplicas(null))))));
+ assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
+ new PartitionChangeRecord().setTopicId(barId).
+ setPartitionId(0).
+ setLeader(4).
+ setReplicas(Arrays.asList(2, 3, 4)).
+ setRemovingReplicas(null).
+ setAddingReplicas(Collections.emptyList()), (short) 0)),
+ new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
+ new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+ new ReassignablePartitionResponse().setPartitionIndex(0).
+ setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null))),
+ new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+ new ReassignablePartitionResponse().setPartitionIndex(0).
+ setErrorMessage(null)))))),
+ cancelResult);
+ ctx.replay(cancelResult.records());
+ assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
+ assertEquals(new PartitionRegistration(new int[] {2, 3, 4}, new int[] {4, 2},
+ new int[] {}, new int[] {}, 4, 2, 3), replication.getPartition(barId, 0));
}
@Test
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
index 787f0327953..9b1be5d4b3d 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
@@ -105,4 +105,24 @@ public class PartitionRegistrationTest {
setIsNew(false).toString(),
b.toLeaderAndIsrPartitionState(new TopicPartition("bar", 0), false).toString());
}
+
+ @Test
+ public void testMergePartitionChangeRecordWithReassignmentData() {
+ PartitionRegistration partition0 = new PartitionRegistration(new int[] {1, 2, 3},
+ new int[] {1, 2, 3}, Replicas.NONE, Replicas.NONE, 1, 100, 200);
+ PartitionRegistration partition1 = partition0.merge(new PartitionChangeRecord().
+ setRemovingReplicas(Collections.singletonList(3)).
+ setAddingReplicas(Collections.singletonList(4)).
+ setReplicas(Arrays.asList(1, 2, 3, 4)));
+ assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4},
+ new int[] {1, 2, 3}, new int[] {3}, new int[] {4}, 1, 100, 201), partition1);
+ PartitionRegistration partition2 = partition1.merge(new PartitionChangeRecord().
+ setIsr(Arrays.asList(1, 2, 4)).
+ setRemovingReplicas(Collections.emptyList()).
+ setAddingReplicas(Collections.emptyList()).
+ setReplicas(Arrays.asList(1, 2, 4)));
+ assertEquals(new PartitionRegistration(new int[] {1, 2, 4},
+ new int[] {1, 2, 4}, Replicas.NONE, Replicas.NONE, 1, 100, 202), partition2);
+ assertFalse(partition2.isReassigning());
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/ReplicasTest.java b/metadata/src/test/java/org/apache/kafka/metadata/ReplicasTest.java
index bf6f16efd3a..7a26d48f63b 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/ReplicasTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/ReplicasTest.java
@@ -21,6 +21,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -88,9 +90,39 @@ public class ReplicasTest {
assertArrayEquals(new int[] {4, 1}, Replicas.copyWithout(new int[] {4, 2, 2, 1}, 2));
}
+ @Test
+ public void testCopyWithout2() {
+ assertArrayEquals(new int[] {}, Replicas.copyWithout(new int[] {}, new int[] {}));
+ assertArrayEquals(new int[] {}, Replicas.copyWithout(new int[] {1}, new int[] {1}));
+ assertArrayEquals(new int[] {1, 3},
+ Replicas.copyWithout(new int[] {1, 2, 3}, new int[]{2, 4}));
+ assertArrayEquals(new int[] {4},
+ Replicas.copyWithout(new int[] {4, 2, 2, 1}, new int[]{2, 1}));
+ }
+
@Test
public void testCopyWith() {
assertArrayEquals(new int[] {-1}, Replicas.copyWith(new int[] {}, -1));
assertArrayEquals(new int[] {1, 2, 3, 4}, Replicas.copyWith(new int[] {1, 2, 3}, 4));
}
+
+ @Test
+ public void testToSet() {
+ assertEquals(Collections.emptySet(), Replicas.toSet(new int[] {}));
+ assertEquals(new HashSet<>(Arrays.asList(3, 1, 5)),
+ Replicas.toSet(new int[] {1, 3, 5}));
+ assertEquals(new HashSet<>(Arrays.asList(1, 2, 10)),
+ Replicas.toSet(new int[] {1, 1, 2, 10, 10}));
+ }
+
+ @Test
+ public void testContains2() {
+ assertTrue(Replicas.contains(Collections.emptyList(), Replicas.NONE));
+ assertFalse(Replicas.contains(Collections.emptyList(), new int[] {1}));
+ assertTrue(Replicas.contains(Arrays.asList(1, 2, 3), new int[] {3, 2, 1}));
+ assertTrue(Replicas.contains(Arrays.asList(1, 2, 3, 4), new int[] {3}));
+ assertTrue(Replicas.contains(Arrays.asList(1, 2, 3, 4), new int[] {3, 1}));
+ assertFalse(Replicas.contains(Arrays.asList(1, 2, 3, 4), new int[] {3, 1, 7}));
+ assertTrue(Replicas.contains(Arrays.asList(1, 2, 3, 4), new int[] {}));
+ }
}