KAFKA-12803: Support reassigning partitions when in KRaft mode (#10753)

Support the KIP-455 reassignment API when in KRaft mode. Reassignments
which merely rearrange a partition's replicas complete immediately. Those
that only remove replicas complete immediately if the ISR would be
non-empty after the specified removals. Reassignments that add one or more
replicas follow the KIP-455 pattern of adding all the adding replicas
to the replica set, and then waiting for the ISR to include all the new
replicas before completing. Changes to the replica sets are
accomplished via PartitionChangeRecord.

Reviewers: Jun Rao <junrao@gmail.com>
This commit is contained in:
Colin Patrick McCabe 2021-07-15 11:41:51 -07:00 committed by GitHub
parent 3e3264760b
commit e07de97a4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1723 additions and 253 deletions

View File

@ -272,7 +272,7 @@
<suppress checks="ClassDataAbstractionCoupling"
files="(QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
<suppress checks="ClassFanOutComplexity"
files="(QuorumController|ReplicationControlManager).java"/>
files="(QuorumController|ReplicationControlManager|ReplicationControlManagerTest).java"/>
<suppress checks="ParameterNumber"
files="(QuorumController).java"/>
<suppress checks="CyclomaticComplexity"

View File

@ -87,7 +87,7 @@ public enum ApiKeys {
ELECT_LEADERS(ApiMessageType.ELECT_LEADERS),
INCREMENTAL_ALTER_CONFIGS(ApiMessageType.INCREMENTAL_ALTER_CONFIGS, false, true),
ALTER_PARTITION_REASSIGNMENTS(ApiMessageType.ALTER_PARTITION_REASSIGNMENTS, false, true),
LIST_PARTITION_REASSIGNMENTS(ApiMessageType.LIST_PARTITION_REASSIGNMENTS),
LIST_PARTITION_REASSIGNMENTS(ApiMessageType.LIST_PARTITION_REASSIGNMENTS, false, true),
OFFSET_DELETE(ApiMessageType.OFFSET_DELETE),
DESCRIBE_CLIENT_QUOTAS(ApiMessageType.DESCRIBE_CLIENT_QUOTAS),
ALTER_CLIENT_QUOTAS(ApiMessageType.ALTER_CLIENT_QUOTAS, false, true),

View File

@ -16,7 +16,7 @@
{
"apiKey": 45,
"type": "request",
"listeners": ["zkBroker"],
"listeners": ["broker", "controller", "zkBroker"],
"name": "AlterPartitionReassignmentsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -16,7 +16,7 @@
{
"apiKey": 46,
"type": "request",
"listeners": ["zkBroker"],
"listeners": ["broker", "controller", "zkBroker"],
"name": "ListPartitionReassignmentsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -212,7 +212,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal)
case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)

View File

@ -21,7 +21,8 @@ import kafka.network.SocketServer
import kafka.server.IntegrationTestUtils.connectAndReceive
import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, NewTopic}
import org.apache.kafka.clients.admin.{Admin, NewPartitionReassignment, NewTopic}
import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo};
import org.apache.kafka.common.message.DescribeClusterRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
@ -31,7 +32,8 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Test, Timeout}
import java.util
import java.util.Collections
import java.util.{Arrays, Collections, Optional}
import scala.collection.mutable
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.jdk.CollectionConverters._
@ -98,35 +100,14 @@ class RaftClusterTest {
val newTopic = Collections.singletonList(new NewTopic("test-topic", 1, 3.toShort))
val createTopicResult = admin.createTopics(newTopic)
createTopicResult.all().get()
// List created topic
TestUtils.waitUntilTrue(() => {
val listTopicsResult = admin.listTopics()
val result = listTopicsResult.names().get().size() == newTopic.size()
if (result) {
newTopic forEach(topic => {
assertTrue(listTopicsResult.names().get().contains(topic.name()))
})
}
result
}, "Topics created were not listed.")
waitForTopicListing(admin, Seq("test-topic"), Seq())
// Delete topic
val deleteResult = admin.deleteTopics(Collections.singletonList("test-topic"))
deleteResult.all().get()
// List again
TestUtils.waitUntilTrue(() => {
val listTopicsResult = admin.listTopics()
val result = listTopicsResult.names().get().size() != newTopic.size()
if (result) {
newTopic forEach(topic => {
assertFalse(listTopicsResult.names().get().contains(topic.name()))
})
}
result
}, "Topic was not removed from list.")
waitForTopicListing(admin, Seq(), Seq("test-topic"))
} finally {
admin.close()
}
@ -153,66 +134,14 @@ class RaftClusterTest {
try {
// Create many topics
val newTopic = new util.ArrayList[NewTopic]()
newTopic.add(new NewTopic("test-topic-1", 1, 3.toShort))
newTopic.add(new NewTopic("test-topic-2", 1, 3.toShort))
newTopic.add(new NewTopic("test-topic-3", 1, 3.toShort))
newTopic.add(new NewTopic("test-topic-1", 2, 3.toShort))
newTopic.add(new NewTopic("test-topic-2", 2, 3.toShort))
newTopic.add(new NewTopic("test-topic-3", 2, 3.toShort))
val createTopicResult = admin.createTopics(newTopic)
createTopicResult.all().get()
// List created topic
TestUtils.waitUntilTrue(() => {
val listTopicsResult = admin.listTopics()
val result = listTopicsResult.names().get().size() == newTopic.size()
if (result) {
newTopic forEach(topic => {
assertTrue(listTopicsResult.names().get().contains(topic.name()))
})
}
result
}, "Topics created were not listed.")
} finally {
admin.close()
}
} finally {
cluster.close()
}
}
@Test
def testCreateClusterAndCreateAndManyTopicsWithManyPartitions(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(3).
setNumControllerNodes(3).build()).build()
try {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == BrokerState.RUNNING,
"Broker never made it to RUNNING state.")
TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).kafkaRaftClient.leaderAndEpoch().leaderId.isPresent,
"RaftManager was not initialized.")
val admin = Admin.create(cluster.clientProperties())
try {
// Create many topics
val newTopic = new util.ArrayList[NewTopic]()
newTopic.add(new NewTopic("test-topic-1", 3, 3.toShort))
newTopic.add(new NewTopic("test-topic-2", 3, 3.toShort))
newTopic.add(new NewTopic("test-topic-3", 3, 3.toShort))
val createTopicResult = admin.createTopics(newTopic)
createTopicResult.all().get()
// List created topic
TestUtils.waitUntilTrue(() => {
val listTopicsResult = admin.listTopics()
val result = listTopicsResult.names().get().size() == newTopic.size()
if (result) {
newTopic forEach(topic => {
assertTrue(listTopicsResult.names().get().contains(topic.name()))
})
}
result
}, "Topics created were not listed.")
waitForTopicListing(admin, Seq("test-topic-1", "test-topic-2", "test-topic-3"), Seq())
} finally {
admin.close()
}
@ -422,4 +351,78 @@ class RaftClusterTest {
listenerName = listenerName
)
/**
 * Starts a cluster with 4 brokers and 3 controllers, creates topic "foo" with an explicit
 * replica assignment for each of its 4 partitions, submits a reassignment for every
 * partition, and then waits until no reassignments are listed and the described replica
 * assignment of "foo" matches the requested one.
 */
@Test
def testCreateClusterAndPerformReassignment(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(4).
setNumControllerNodes(3).build()).build()
try {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
val admin = Admin.create(cluster.clientProperties())
try {
// Create the topic.
// The map goes from partition index to the ordered list of broker IDs hosting it.
val assignments = new util.HashMap[Integer, util.List[Integer]]
assignments.put(0, Arrays.asList(0, 1, 2))
assignments.put(1, Arrays.asList(1, 2, 3))
assignments.put(2, Arrays.asList(2, 3, 0))
assignments.put(3, Arrays.asList(3, 2, 1))
val createTopicResult = admin.createTopics(Collections.singletonList(
new NewTopic("foo", assignments)))
createTopicResult.all().get()
waitForTopicListing(admin, Seq("foo"), Seq())
// Start some reassignments.
// Nothing should be reassigning before we submit the request.
assertEquals(Collections.emptyMap(), admin.listPartitionReassignments().reassignments().get())
val reassignments = new util.HashMap[TopicPartition, Optional[NewPartitionReassignment]]
// Partition 0: same brokers as before, in a different order.
reassignments.put(new TopicPartition("foo", 0),
Optional.of(new NewPartitionReassignment(Arrays.asList(2, 1, 0))))
// Partition 1: broker 3 is dropped and broker 0 is added.
reassignments.put(new TopicPartition("foo", 1),
Optional.of(new NewPartitionReassignment(Arrays.asList(0, 1, 2))))
// Partition 2: broker 0 is dropped; nothing is added.
reassignments.put(new TopicPartition("foo", 2),
Optional.of(new NewPartitionReassignment(Arrays.asList(2, 3))))
// Partition 3: broker 0 is added and the order changes; nothing is dropped.
reassignments.put(new TopicPartition("foo", 3),
Optional.of(new NewPartitionReassignment(Arrays.asList(3, 2, 0, 1))))
admin.alterPartitionReassignments(reassignments).all().get()
// The reassignments are finished once none are listed any more.
TestUtils.waitUntilTrue(
() => admin.listPartitionReassignments().reassignments().get().isEmpty(),
"The reassignment never completed.")
// Last replica assignment observed via describeTopics; used in the failure message.
var currentMapping: Seq[Seq[Int]] = Seq()
// Expected replicas per partition, indexed by partition; mirrors the request above.
val expectedMapping = Seq(Seq(2, 1, 0), Seq(0, 1, 2), Seq(2, 3), Seq(3, 2, 0, 1))
TestUtils.waitUntilTrue( () => {
val topicInfoMap = admin.describeTopics(Collections.singleton("foo")).all().get()
if (topicInfoMap.containsKey("foo")) {
currentMapping = translatePartitionInfoToSeq(topicInfoMap.get("foo").partitions())
expectedMapping.equals(currentMapping)
} else {
false
}
}, "Timed out waiting for replica assignments for topic foo. " +
s"Wanted: ${expectedMapping}. Got: ${currentMapping}")
} finally {
// Always release the admin client, even when an assertion above fails.
admin.close()
}
} finally {
// Always shut the test cluster down.
cluster.close()
}
}
/**
 * Converts a list of partition descriptions into one sequence of replica broker IDs per
 * partition, keeping both the order of the partitions and the order of the replicas.
 */
private def translatePartitionInfoToSeq(partitions: util.List[TopicPartitionInfo]): Seq[Seq[Int]] = {
  val replicaIdsPerPartition = for (info <- partitions.asScala) yield {
    val replicaIds = info.replicas().asScala.map(node => node.id())
    replicaIds.toSeq
  }
  replicaIdsPerPartition.toSeq
}
/**
 * Polls the admin client's topic listing until every topic in `expectedPresent` has been
 * seen and no topic in `expectedAbsent` is listed.
 *
 * A topic counts as "seen" once it shows up in any poll; it is not re-checked afterwards.
 * The absent check, in contrast, is evaluated against the most recent listing only.
 *
 * @param admin           The admin client used to list topics.
 * @param expectedPresent The topic names that must be seen.
 * @param expectedAbsent  The topic names that must not be listed.
 */
private def waitForTopicListing(admin: Admin,
                                expectedPresent: Seq[String],
                                expectedAbsent: Seq[String]): Unit = {
  val topicsNotFound = new util.HashSet[String]
  var extraTopics: mutable.Set[String] = null
  expectedPresent.foreach(topicsNotFound.add(_))
  TestUtils.waitUntilTrue(() => {
    // List the topics exactly once per poll so that the "present" and "absent" checks are
    // evaluated against the same snapshot, and so each poll costs a single request.
    val listedTopics = admin.listTopics().names().get()
    listedTopics.forEach(name => topicsNotFound.remove(name))
    extraTopics = listedTopics.asScala.filter(expectedAbsent.contains(_))
    topicsNotFound.isEmpty && extraTopics.isEmpty
  }, s"Failed to find topic(s): ${topicsNotFound.asScala} and NOT find topic(s): ${extraTopics}")
}
}

View File

@ -0,0 +1,243 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
/**
 * PartitionChangeBuilder handles changing partition registrations.
 *
 * A builder starts out with the target ISR, replicas, removing replicas and adding replicas
 * all equal to those of the existing partition registration. Callers override the targets
 * they want to change and then call {@link #build()}, which returns the
 * PartitionChangeRecord describing the difference, or an empty Optional if nothing changed.
 */
public class PartitionChangeBuilder {
    /**
     * Check whether a change record would leave the partition unchanged.
     *
     * @param record    The record to examine.
     * @return          True if the record sets no ISR, replicas, removing replicas or adding
     *                  replicas, and its leader field is NO_LEADER_CHANGE.
     */
    public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
        if (record.isr() != null) return false;
        if (record.leader() != NO_LEADER_CHANGE) return false;
        if (record.replicas() != null) return false;
        if (record.removingReplicas() != null) return false;
        if (record.addingReplicas() != null) return false;
        return true;
    }

    private final PartitionRegistration partition;
    private final Uuid topicId;
    private final int partitionId;
    private final Function<Integer, Boolean> isAcceptableLeader;
    private final Supplier<Boolean> uncleanElectionOk;
    private List<Integer> targetIsr;
    private List<Integer> targetReplicas;
    private List<Integer> targetRemoving;
    private List<Integer> targetAdding;
    private boolean alwaysElectPreferredIfPossible;

    /**
     * Create a builder whose targets initially match the given registration.
     *
     * @param partition             The current partition registration.
     * @param topicId               The ID of the topic the partition belongs to.
     * @param partitionId           The partition index.
     * @param isAcceptableLeader    Returns true for replicas that may be chosen as leader.
     * @param uncleanElectionOk     Returns true if a leader may be chosen from outside the
     *                              target ISR.
     */
    public PartitionChangeBuilder(PartitionRegistration partition,
                                  Uuid topicId,
                                  int partitionId,
                                  Function<Integer, Boolean> isAcceptableLeader,
                                  Supplier<Boolean> uncleanElectionOk) {
        this.partition = partition;
        this.topicId = topicId;
        this.partitionId = partitionId;
        this.isAcceptableLeader = isAcceptableLeader;
        this.uncleanElectionOk = uncleanElectionOk;
        this.targetIsr = Replicas.toList(partition.isr);
        this.targetReplicas = Replicas.toList(partition.replicas);
        this.targetRemoving = Replicas.toList(partition.removingReplicas);
        this.targetAdding = Replicas.toList(partition.addingReplicas);
        this.alwaysElectPreferredIfPossible = false;
    }

    public PartitionChangeBuilder setTargetIsr(List<Integer> targetIsr) {
        this.targetIsr = targetIsr;
        return this;
    }

    public PartitionChangeBuilder setTargetReplicas(List<Integer> targetReplicas) {
        this.targetReplicas = targetReplicas;
        return this;
    }

    public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean alwaysElectPreferredIfPossible) {
        this.alwaysElectPreferredIfPossible = alwaysElectPreferredIfPossible;
        return this;
    }

    public PartitionChangeBuilder setTargetRemoving(List<Integer> targetRemoving) {
        this.targetRemoving = targetRemoving;
        return this;
    }

    public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) {
        this.targetAdding = targetAdding;
        return this;
    }

    /**
     * Decide whether build() should attempt a leader election.
     *
     * @return  True if the current leader is not in the target ISR, or if electing the
     *          preferred leader was requested and the partition does not have it.
     */
    boolean shouldTryElection() {
        // If the new isr doesn't have the current leader, we need to try to elect a new
        // one. Note: this also handles the case where the current leader is NO_LEADER,
        // since that value cannot appear in targetIsr.
        if (!targetIsr.contains(partition.leader)) return true;

        // Check if we want to try to get away from a non-preferred leader.
        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true;

        return false;
    }

    /**
     * The outcome of choosing a leader from the target replicas.
     *
     * The first target replica that is both in the target ISR and an acceptable leader wins.
     * If there is none and unclean election is allowed, the first acceptable target replica
     * wins and the result is marked unclean. Otherwise the node is NO_LEADER.
     */
    class BestLeader {
        final int node;
        final boolean unclean;

        BestLeader() {
            for (int replica : targetReplicas) {
                if (targetIsr.contains(replica) && isAcceptableLeader.apply(replica)) {
                    this.node = replica;
                    this.unclean = false;
                    return;
                }
            }
            if (uncleanElectionOk.get()) {
                for (int replica : targetReplicas) {
                    if (isAcceptableLeader.apply(replica)) {
                        this.node = replica;
                        this.unclean = true;
                        return;
                    }
                }
            }
            this.node = NO_LEADER;
            this.unclean = false;
        }
    }

    /**
     * Pick the best leader and, if it differs from the current one, store it in the record.
     * An unclean election additionally sets the record's ISR to just the new leader.
     */
    private void tryElection(PartitionChangeRecord record) {
        BestLeader bestLeader = new BestLeader();
        if (bestLeader.node != partition.leader) {
            record.setLeader(bestLeader.node);
            if (bestLeader.unclean) {
                // If the election was unclean, we have to forcibly set the ISR to just the
                // new leader. This can result in data loss!
                record.setIsr(Collections.singletonList(bestLeader.node));
            }
        }
    }

    /**
     * Trigger a leader epoch bump if one is needed.
     *
     * We need to bump the leader epoch if:
     * 1. The leader changed, or
     * 2. The new ISR does not contain all the nodes that the old ISR did, or
     * 3. The new replica list does not contain all the nodes that the old replica list did.
     *
     * Changes that do NOT fall in any of these categories will increase the partition epoch, but
     * not the leader epoch.  Note that if the leader epoch increases, the partition epoch will
     * always increase as well; there is no case where the partition epoch increases more slowly
     * than the leader epoch.
     *
     * If the PartitionChangeRecord sets the leader field to something other than
     * NO_LEADER_CHANGE, a leader epoch bump will automatically occur.  That takes care of
     * case 1.  In this function, we check for cases 2 and 3, and handle them by manually
     * setting record.leader to the current leader.
     */
    void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
        if (record.leader() == NO_LEADER_CHANGE) {
            if (!Replicas.contains(targetIsr, partition.isr) ||
                    !Replicas.contains(targetReplicas, partition.replicas)) {
                record.setLeader(partition.leader);
            }
        }
    }

    /**
     * Complete an in-progress reassignment if the targets allow it.
     *
     * The reassignment completes only when, after dropping the removing replicas, both the
     * ISR and the replica list are still non-empty and every adding replica is in the
     * resulting ISR. On completion the target ISR and replicas lose the removing replicas,
     * and the adding and removing lists are cleared. Otherwise the targets are left alone.
     */
    private void completeReassignmentIfNeeded() {
        // Check if there is a reassignment to complete.
        if (targetRemoving.isEmpty() && targetAdding.isEmpty()) return;

        List<Integer> newTargetIsr = targetIsr;
        List<Integer> newTargetReplicas = targetReplicas;
        if (!targetRemoving.isEmpty()) {
            newTargetIsr = new ArrayList<>(targetIsr.size());
            for (int replica : targetIsr) {
                if (!targetRemoving.contains(replica)) {
                    newTargetIsr.add(replica);
                }
            }
            if (newTargetIsr.isEmpty()) return;
            newTargetReplicas = new ArrayList<>(targetReplicas.size());
            for (int replica : targetReplicas) {
                if (!targetRemoving.contains(replica)) {
                    newTargetReplicas.add(replica);
                }
            }
            if (newTargetReplicas.isEmpty()) return;
        }
        for (int replica : targetAdding) {
            if (!newTargetIsr.contains(replica)) return;
        }
        targetIsr = newTargetIsr;
        targetReplicas = newTargetReplicas;
        targetRemoving = Collections.emptyList();
        targetAdding = Collections.emptyList();
    }

    /**
     * Build the change record that moves the partition to the configured targets.
     *
     * @return  The record wrapped with its highest supported version, or an empty Optional
     *          if the record would be a no-op.
     */
    public Optional<ApiMessageAndVersion> build() {
        PartitionChangeRecord record = new PartitionChangeRecord().
            setTopicId(topicId).
            setPartitionId(partitionId);

        completeReassignmentIfNeeded();

        if (shouldTryElection()) {
            tryElection(record);
        }

        triggerLeaderEpochBumpIfNeeded(record);

        // Only set the ISR from targetIsr if an unclean election has not already set it.
        // After an unclean election the ISR must be exactly the new leader; overwriting it
        // with targetIsr would put replicas into the ISR that the new leader is not known
        // to be in sync with.
        if (record.isr() == null &&
                !targetIsr.isEmpty() &&
                !targetIsr.equals(Replicas.toList(partition.isr))) {
            record.setIsr(targetIsr);
        }
        if (!targetReplicas.isEmpty() && !targetReplicas.equals(Replicas.toList(partition.replicas))) {
            record.setReplicas(targetReplicas);
        }
        if (!targetRemoving.equals(Replicas.toList(partition.removingReplicas))) {
            record.setRemovingReplicas(targetRemoving);
        }
        if (!targetAdding.equals(Replicas.toList(partition.addingReplicas))) {
            record.setAddingReplicas(targetAdding);
        }
        if (changeRecordIsNoOp(record)) {
            return Optional.empty();
        } else {
            return Optional.of(new ApiMessageAndVersion(record,
                PARTITION_CHANGE_RECORD.highestSupportedVersion()));
        }
    }
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import java.util.ArrayList;
import java.util.Set;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;
/**
 * Describes the replica sets involved in moving a partition from its current replica list
 * to a target replica list: the replicas being removed, the replicas being added, and the
 * merged list consisting of the target replicas followed by the removed ones.
 */
class PartitionReassignmentReplicas {
    private final List<Integer> removing;
    private final List<Integer> adding;
    private final List<Integer> merged;

    /**
     * Returns the elements of {@code from} that do not occur in {@code excluded}, without
     * duplicates and in ascending order.
     */
    private static Set<Integer> sortedDifference(List<Integer> from, List<Integer> excluded) {
        Set<Integer> remaining = new TreeSet<>(from);
        remaining.removeIf(excluded::contains);
        return remaining;
    }

    /**
     * @param currentReplicas   The replicas the partition has now.
     * @param targetReplicas    The replicas the partition should end up with.
     */
    PartitionReassignmentReplicas(List<Integer> currentReplicas,
                                  List<Integer> targetReplicas) {
        Set<Integer> toRemove = sortedDifference(currentReplicas, targetReplicas);
        Set<Integer> toAdd = sortedDifference(targetReplicas, currentReplicas);

        // The merged list keeps the target order first, then appends whatever is on its way out.
        List<Integer> union = new ArrayList<>(targetReplicas);
        union.addAll(toRemove);

        this.removing = new ArrayList<>(toRemove);
        this.adding = new ArrayList<>(toAdd);
        this.merged = union;
    }

    /** The replicas present currently but not in the target, in ascending order. */
    List<Integer> removing() {
        return removing;
    }

    /** The replicas present in the target but not currently, in ascending order. */
    List<Integer> adding() {
        return adding;
    }

    /** The target replicas followed by the removing replicas. */
    List<Integer> merged() {
        return merged;
    }

    @Override
    public int hashCode() {
        return Objects.hash(removing, adding, merged);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PartitionReassignmentReplicas)) {
            return false;
        }
        PartitionReassignmentReplicas that = (PartitionReassignmentReplicas) o;
        return Objects.equals(removing, that.removing)
            && Objects.equals(adding, that.adding)
            && Objects.equals(merged, that.merged);
    }

    @Override
    public String toString() {
        StringBuilder description = new StringBuilder("PartitionReassignmentReplicas(");
        description.append("removing=").append(removing).append(", ");
        description.append("adding=").append(adding).append(", ");
        description.append("merged=").append(merged).append(")");
        return description.toString();
    }
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import java.util.ArrayList;
import java.util.Set;
import java.util.List;
import java.util.Objects;
/**
 * Computes the replica list and ISR a partition will have once an in-progress reassignment
 * is reverted, and whether getting there requires an unclean leader election.
 */
class PartitionReassignmentRevert {
    private final List<Integer> replicas;
    private final List<Integer> isr;
    private final boolean unclean;

    /**
     * @param registration  The current registration of the partition being reverted.
     *
     * @throws InvalidReplicaAssignmentException    If every replica of the partition is
     *                                              listed in addingReplicas.
     */
    PartitionReassignmentRevert(PartitionRegistration registration) {
        // Figure out the replica list and ISR that we will have after reverting the
        // reassignment. In general, we want to take out any replica that the reassignment
        // was adding, but keep the ones the reassignment was removing. (But see the
        // special case below.)
        Set<Integer> adding = Replicas.toSet(registration.addingReplicas);
        this.replicas = new ArrayList<>(registration.replicas.length);
        this.isr = new ArrayList<>(registration.isr.length);
        for (int replica : registration.isr) {
            if (!adding.contains(replica)) {
                this.isr.add(replica);
            }
        }
        for (int replica : registration.replicas) {
            if (!adding.contains(replica)) {
                this.replicas.add(replica);
            }
        }
        if (this.isr.isEmpty()) {
            // In the special case that all the replicas that are in the ISR are also
            // contained in addingReplicas, we choose the first remaining replica and add
            // it to the ISR. This is considered an unclean leader election. Therefore,
            // calling code must check that unclean leader election is enabled before
            // accepting the new ISR.
            if (this.replicas.isEmpty()) {
                // This should not be reachable, since it would require a partition
                // starting with an empty replica set prior to the reassignment we are
                // trying to revert.
                throw new InvalidReplicaAssignmentException("Invalid replica " +
                    "assignment: addingReplicas contains all replicas.");
            }
            this.isr.add(this.replicas.get(0));
            this.unclean = true;
        } else {
            this.unclean = false;
        }
    }

    /** The replicas that remain after dropping the ones the reassignment was adding. */
    List<Integer> replicas() {
        return replicas;
    }

    /** The ISR after the revert; never empty. */
    List<Integer> isr() {
        return isr;
    }

    /** True if the ISR had to be seeded with a replica that was not in the ISR before. */
    boolean unclean() {
        return unclean;
    }

    // The unclean flag is part of the object's state: the same replicas and ISR can be
    // reached both cleanly and uncleanly, and callers treat the two cases differently.
    // It is therefore included in hashCode, equals and toString.
    @Override
    public int hashCode() {
        return Objects.hash(replicas, isr, unclean);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof PartitionReassignmentRevert)) return false;
        PartitionReassignmentRevert other = (PartitionReassignmentRevert) o;
        return replicas.equals(other.replicas) &&
            isr.equals(other.isr) &&
            unclean == other.unclean;
    }

    @Override
    public String toString() {
        return "PartitionReassignmentRevert(" +
            "replicas=" + replicas + ", " +
            "isr=" + isr + ", " +
            "unclean=" + unclean + ")";
    }
}

View File

@ -1187,19 +1187,19 @@ public final class QuorumController implements Controller {
}
return appendWriteEvent("alterPartitionReassignments",
time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
() -> {
throw new UnsupportedOperationException();
});
() -> replicationControl.alterPartitionReassignments(request));
}
@Override
public CompletableFuture<ListPartitionReassignmentsResponseData>
listPartitionReassignments(ListPartitionReassignmentsRequestData request) {
if (request.topics() != null && request.topics().isEmpty()) {
return CompletableFuture.completedFuture(
new ListPartitionReassignmentsResponseData().setErrorMessage(null));
}
return appendReadEvent("listPartitionReassignments",
time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
() -> {
throw new UnsupportedOperationException();
});
() -> replicationControl.listPartitionReassignments(request.topics()));
}
@Override

View File

@ -28,12 +28,19 @@ import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.NoReassignmentInProgressException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
@ -49,6 +56,10 @@ import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
@ -72,9 +83,9 @@ import org.slf4j.Logger;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Function;
import java.util.HashMap;
import java.util.Iterator;
@ -89,13 +100,15 @@ import java.util.stream.Collectors;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.common.metadata.MetadataRecordType.FENCE_BROKER_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.UNFENCE_BROKER_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.UNREGISTER_BROKER_RECORD;
import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION;
import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
@ -176,6 +189,11 @@ public class ReplicationControlManager {
*/
private final BrokersToIsrs brokersToIsrs;
/**
* A map from topic IDs to the partitions in the topic which are reassigning.
*/
private final TimelineHashMap<Uuid, int[]> reassigningTopics;
ReplicationControlManager(SnapshotRegistry snapshotRegistry,
LogContext logContext,
short defaultReplicationFactor,
@ -195,6 +213,7 @@ public class ReplicationControlManager {
this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
this.reassigningTopics = new TimelineHashMap<>(snapshotRegistry, 0);
}
public void replay(TopicRecord record) {
@ -222,11 +241,15 @@ public class ReplicationControlManager {
newPartInfo.isr, NO_LEADER, newPartInfo.leader);
globalPartitionCount.increment();
controllerMetrics.setGlobalPartitionCount(globalPartitionCount.get());
updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
false, newPartInfo.isReassigning());
} else if (!newPartInfo.equals(prevPartInfo)) {
newPartInfo.maybeLogPartitionChange(log, description, prevPartInfo);
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr,
newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader);
updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
prevPartInfo.isReassigning(), newPartInfo.isReassigning());
}
if (newPartInfo.leader != newPartInfo.preferredReplica()) {
preferredReplicaImbalanceCount.increment();
@ -235,6 +258,24 @@ public class ReplicationControlManager {
controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount.get());
}
/**
 * Keep reassigningTopics in step with a partition entering or leaving the reassigning state.
 *
 * Nothing happens unless the state actually flipped. When a partition starts reassigning,
 * its ID is added to the topic's entry. When it stops, its ID is taken out, and the topic's
 * entry is dropped altogether once no partition IDs are left.
 *
 * @param topicId           The ID of the topic that owns the partition.
 * @param partitionId       The partition index.
 * @param wasReassigning    Whether the partition was reassigning before the change.
 * @param isReassigning     Whether the partition is reassigning after the change.
 */
private void updateReassigningTopicsIfNeeded(Uuid topicId, int partitionId,
                                             boolean wasReassigning, boolean isReassigning) {
    if (wasReassigning == isReassigning) {
        // No transition, so the bookkeeping is already correct.
        return;
    }
    int[] previousParts = reassigningTopics.getOrDefault(topicId, Replicas.NONE);
    if (isReassigning) {
        reassigningTopics.put(topicId, Replicas.copyWith(previousParts, partitionId));
        return;
    }
    int[] remainingParts = Replicas.copyWithout(previousParts, partitionId);
    if (remainingParts.length > 0) {
        reassigningTopics.put(topicId, remainingParts);
    } else {
        reassigningTopics.remove(topicId);
    }
}
public void replay(PartitionChangeRecord record) {
TopicControlInfo topicInfo = topics.get(record.topicId());
if (topicInfo == null) {
@ -247,6 +288,8 @@ public class ReplicationControlManager {
":" + record.partitionId() + ", but no partition with that id was found.");
}
PartitionRegistration newPartitionInfo = prevPartitionInfo.merge(record);
updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
prevPartitionInfo.isReassigning(), newPartitionInfo.isReassigning());
topicInfo.parts.put(record.partitionId(), newPartitionInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(),
prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.leader,
@ -259,6 +302,11 @@ public class ReplicationControlManager {
}
controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount.get());
if (record.removingReplicas() != null || record.addingReplicas() != null) {
log.info("Replayed partition assignment change {} for topic {}", record, topicInfo.name);
} else if (log.isTraceEnabled()) {
log.trace("Replayed partition change {} for topic {}", record, topicInfo.name);
}
}
public void replay(RemoveTopicRecord record) {
@ -269,6 +317,7 @@ public class ReplicationControlManager {
" to remove.");
}
topicsByName.remove(topic.name);
reassigningTopics.remove(record.topicId());
// Delete the configurations associated with this topic.
configurationControl.deleteTopicConfigs(topic.name);
@ -557,60 +606,122 @@ public class ReplicationControlManager {
setPartitionIndex(partitionData.partitionIndex()).
setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()));
}
log.info("Rejecting alterIsr request for unknown topic ID {}.", topicId);
continue;
}
TopicControlInfo topic = topics.get(topicId);
for (AlterIsrRequestData.PartitionData partitionData : topicData.partitions()) {
PartitionRegistration partition = topic.parts.get(partitionData.partitionIndex());
int partitionId = partitionData.partitionIndex();
PartitionRegistration partition = topic.parts.get(partitionId);
if (partition == null) {
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
setPartitionIndex(partitionData.partitionIndex()).
setPartitionIndex(partitionId).
setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()));
continue;
}
if (request.brokerId() != partition.leader) {
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
setPartitionIndex(partitionData.partitionIndex()).
setErrorCode(INVALID_REQUEST.code()));
log.info("Rejecting alterIsr request for unknown partition {}-{}.",
topic.name, partitionId);
continue;
}
if (partitionData.leaderEpoch() != partition.leaderEpoch) {
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
setPartitionIndex(partitionData.partitionIndex()).
setErrorCode(Errors.FENCED_LEADER_EPOCH.code()));
setPartitionIndex(partitionId).
setErrorCode(FENCED_LEADER_EPOCH.code()));
log.debug("Rejecting alterIsr request from node {} for {}-{} because " +
"the current leader epoch is {}, not {}.", request.brokerId(), topic.name,
partitionId, partition.leaderEpoch, partitionData.leaderEpoch());
continue;
}
if (request.brokerId() != partition.leader) {
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
setPartitionIndex(partitionId).
setErrorCode(INVALID_REQUEST.code()));
log.info("Rejecting alterIsr request from node {} for {}-{} because " +
"the current leader is {}.", request.brokerId(), topic.name,
partitionId, partition.leader);
continue;
}
if (partitionData.currentIsrVersion() != partition.partitionEpoch) {
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
setPartitionIndex(partitionData.partitionIndex()).
setErrorCode(Errors.INVALID_UPDATE_VERSION.code()));
setPartitionIndex(partitionId).
setErrorCode(INVALID_UPDATE_VERSION.code()));
log.info("Rejecting alterIsr request from node {} for {}-{} because " +
"the current partition epoch is {}, not {}.", request.brokerId(),
topic.name, partitionId, partition.partitionEpoch,
partitionData.currentIsrVersion());
continue;
}
int[] newIsr = Replicas.toArray(partitionData.newIsr());
if (!Replicas.validateIsr(partition.replicas, newIsr)) {
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
setPartitionIndex(partitionData.partitionIndex()).
setPartitionIndex(partitionId).
setErrorCode(INVALID_REQUEST.code()));
log.error("Rejecting alterIsr request from node {} for {}-{} because " +
"it specified an invalid ISR {}.", request.brokerId(),
topic.name, partitionId, partitionData.newIsr());
continue;
}
if (!Replicas.contains(newIsr, partition.leader)) {
// An alterIsr request can't remove the current leader.
// An alterIsr request can't ask for the current leader to be removed.
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
setPartitionIndex(partitionData.partitionIndex()).
setPartitionIndex(partitionId).
setErrorCode(INVALID_REQUEST.code()));
log.error("Rejecting alterIsr request from node {} for {}-{} because " +
"it specified an invalid ISR {} that doesn't include itself.",
request.brokerId(), topic.name, partitionId, partitionData.newIsr());
continue;
}
records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
setPartitionId(partitionData.partitionIndex()).
setTopicId(topic.id).
setIsr(partitionData.newIsr()), PARTITION_CHANGE_RECORD.highestSupportedVersion()));
// At this point, we have decided to perform the ISR change. We use
// PartitionChangeBuilder to find out what its effect will be.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topic.id,
partitionId,
r -> clusterControl.unfenced(r),
() -> configurationControl.uncleanLeaderElectionEnabledForTopic(topicData.name()));
builder.setTargetIsr(partitionData.newIsr());
Optional<ApiMessageAndVersion> record = builder.build();
Errors result = Errors.NONE;
if (record.isPresent()) {
records.add(record.get());
PartitionChangeRecord change = (PartitionChangeRecord) record.get().message();
partition = partition.merge(change);
if (log.isDebugEnabled()) {
log.debug("Node {} has altered ISR for {}-{} to {}.",
request.brokerId(), topic.name, partitionId, change.isr());
}
if (change.leader() != request.brokerId() &&
change.leader() != NO_LEADER_CHANGE) {
// Normally, an alterIsr request, which is made by the partition
// leader itself, is not allowed to modify the partition leader.
// However, if there is an ongoing partition reassignment and the
// ISR change completes it, then the leader may change as part of
// the changes made during reassignment cleanup.
//
// In this case, we report back FENCED_LEADER_EPOCH to the leader
// which made the alterIsr request. This lets it know that it must
// fetch new metadata before trying again. This return code is
// unusual because we both return an error and generate a new
// metadata record. We usually only do one or the other.
log.info("AlterIsr request from node {} for {}-{} completed " +
"the ongoing partition reassignment and triggered a " +
"leadership change. Reutrning FENCED_LEADER_EPOCH.",
request.brokerId(), topic.name, partitionId);
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
setPartitionIndex(partitionId).
setErrorCode(FENCED_LEADER_EPOCH.code()));
continue;
} else if (change.removingReplicas() != null ||
change.addingReplicas() != null) {
log.info("AlterIsr request from node {} for {}-{} completed " +
"the ongoing partition reassignment.", request.brokerId(),
topic.name, partitionId);
}
}
responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
setPartitionIndex(partitionData.partitionIndex()).
setErrorCode(Errors.NONE.code()).
setPartitionIndex(partitionId).
setErrorCode(result.code()).
setLeaderId(partition.leader).
setLeaderEpoch(partition.leaderEpoch).
setCurrentIsrVersion(partition.partitionEpoch + 1).
setIsr(partitionData.newIsr()));
setCurrentIsrVersion(partition.partitionEpoch).
setIsr(Replicas.toList(partition.isr)));
}
}
return ControllerResult.of(records, response);
@ -740,39 +851,29 @@ public class ReplicationControlManager {
return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such topic id as " + topicId);
}
PartitionRegistration partitionInfo = topicInfo.parts.get(partitionId);
if (partitionInfo == null) {
PartitionRegistration partition = topicInfo.parts.get(partitionId);
if (partition == null) {
return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such partition as " + topic + "-" + partitionId);
}
int newLeader = bestLeader(partitionInfo.replicas, partitionInfo.isr, uncleanOk,
r -> clusterControl.unfenced(r));
if (newLeader == NO_LEADER) {
// If we can't find any leader for the partition, return an error.
return new ApiError(Errors.LEADER_NOT_AVAILABLE,
"Unable to find any leader for the partition.");
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId,
partitionId,
r -> clusterControl.unfenced(r),
() -> uncleanOk || configurationControl.uncleanLeaderElectionEnabledForTopic(topic));
builder.setAlwaysElectPreferredIfPossible(true);
Optional<ApiMessageAndVersion> record = builder.build();
if (!record.isPresent()) {
if (partition.leader == NO_LEADER) {
// If we can't find any leader for the partition, return an error.
return new ApiError(Errors.LEADER_NOT_AVAILABLE,
"Unable to find any leader for the partition.");
} else {
// There is nothing to do.
return ApiError.NONE;
}
}
if (newLeader == partitionInfo.leader) {
// If the new leader we picked is the same as the current leader, there is
// nothing to do.
return ApiError.NONE;
}
if (partitionInfo.hasLeader() && newLeader != partitionInfo.preferredReplica()) {
// It is not worth moving away from a valid leader to a new leader unless the
// new leader is the preferred replica.
return ApiError.NONE;
}
PartitionChangeRecord record = new PartitionChangeRecord().
setPartitionId(partitionId).
setTopicId(topicId).
setLeader(newLeader);
if (!PartitionRegistration.electionWasClean(newLeader, partitionInfo.isr)) {
// If the election was unclean, we have to forcibly set the ISR to just the
// new leader. This can result in data loss!
record.setIsr(Collections.singletonList(newLeader));
}
records.add(new ApiMessageAndVersion(record,
PARTITION_CHANGE_RECORD.highestSupportedVersion()));
records.add(record.get());
return ApiError.NONE;
}
@ -813,25 +914,6 @@ public class ReplicationControlManager {
return ControllerResult.of(records, reply);
}
/**
 * Check whether the given leader is a member of the given ISR.
 *
 * @param isr       The in-sync replica array.
 * @param leader    The leader's broker ID.
 *
 * @return          True if the ISR array contains the leader.
 */
static boolean isGoodLeader(int[] isr, int leader) {
    boolean leaderInIsr = Replicas.contains(isr, leader);
    return leaderInIsr;
}
/**
 * Pick the best leader from the replica list.
 *
 * The replicas are scanned in order. The first acceptable replica that is also
 * in the ISR wins. If there is none, the first acceptable replica is returned,
 * but only when uncleanOk is true.
 *
 * @param replicas              The replicas to consider, in preference order.
 * @param isr                   The in-sync replica array.
 * @param uncleanOk             True if a leader from outside the ISR may be chosen.
 * @param isAcceptableLeader    Returns true for replicas that are eligible to lead.
 *
 * @return                      The chosen broker ID, or NO_LEADER if none qualifies.
 */
static int bestLeader(int[] replicas, int[] isr, boolean uncleanOk,
                      Function<Integer, Boolean> isAcceptableLeader) {
    int firstAcceptable = NO_LEADER;
    for (int candidate : replicas) {
        if (!isAcceptableLeader.apply(candidate)) {
            continue;
        }
        if (Replicas.contains(isr, candidate)) {
            return candidate;
        }
        if (firstAcceptable == NO_LEADER) {
            firstAcceptable = candidate;
        }
    }
    return uncleanOk ? firstAcceptable : NO_LEADER;
}
public ControllerResult<Void> unregisterBroker(int brokerId) {
BrokerRegistration registration = clusterControl.brokerRegistrations().get(brokerId);
if (registration == null) {
@ -1008,8 +1090,22 @@ public class ReplicationControlManager {
List<ApiMessageAndVersion> records,
Iterator<TopicIdPartition> iterator) {
int oldSize = records.size();
// If the caller passed a valid broker ID for brokerToAdd, rather than passing
// NO_LEADER, that node will be considered an acceptable leader even if it is
// currently fenced. This is useful when handling unfencing. The reason is that
// while we're generating the records to handle unfencing, the ClusterControlManager
// still shows the node as fenced.
//
// Similarly, if the caller passed a valid broker ID for brokerToRemove, rather
// than passing NO_LEADER, that node will never be considered an acceptable leader.
// This is useful when handling a newly fenced node. We also exclude brokerToRemove
// from the target ISR, but we need to exclude it here too, to handle the case
// where there is an unclean leader election which chooses a leader from outside
// the ISR.
Function<Integer, Boolean> isAcceptableLeader =
r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.unfenced(r));
while (iterator.hasNext()) {
TopicIdPartition topicIdPart = iterator.next();
TopicControlInfo topic = topics.get(topicIdPart.topicId());
@ -1022,32 +1118,18 @@ public class ReplicationControlManager {
throw new RuntimeException("Partition " + topicIdPart +
" existed in isrMembers, but not in the partitions map.");
}
int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemove);
int newLeader;
if (isGoodLeader(newIsr, partition.leader)) {
// If the current leader is good, don't change.
newLeader = partition.leader;
} else {
// Choose a new leader.
boolean uncleanOk = configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
newLeader = bestLeader(partition.replicas, newIsr, uncleanOk, isAcceptableLeader);
}
if (!PartitionRegistration.electionWasClean(newLeader, newIsr)) {
// After an unclean leader election, the ISR is reset to just the new leader.
newIsr = new int[] {newLeader};
} else if (newIsr.length == 0) {
// We never want to shrink the ISR to size 0.
newIsr = partition.isr;
}
PartitionChangeRecord record = new PartitionChangeRecord().
setPartitionId(topicIdPart.partitionId()).
setTopicId(topic.id);
if (newLeader != partition.leader) record.setLeader(newLeader);
if (!Arrays.equals(newIsr, partition.isr)) record.setIsr(Replicas.toList(newIsr));
if (record.leader() != NO_LEADER_CHANGE || record.isr() != null) {
records.add(new ApiMessageAndVersion(record,
PARTITION_CHANGE_RECORD.highestSupportedVersion()));
}
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicIdPart.topicId(),
topicIdPart.partitionId(),
isAcceptableLeader,
() -> configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name));
// Note: if brokerToRemove was passed as NO_LEADER, this is a no-op (the new
// target ISR will be the same as the old one).
builder.setTargetIsr(Replicas.toList(
Replicas.copyWithout(partition.isr, brokerToRemove)));
builder.build().ifPresent(records::add);
}
if (records.size() != oldSize) {
if (log.isDebugEnabled()) {
@ -1068,6 +1150,200 @@ public class ReplicationControlManager {
}
}
/**
 * Handle an AlterPartitionReassignments request.
 *
 * Each requested partition is processed independently. A failure on one
 * partition is reported through that partition's error code and message in the
 * response; it does not stop the remaining partitions from being processed, and
 * records already generated for other partitions are kept.
 *
 * @param request   The request, listing the topics and partitions to alter.
 *
 * @return          The generated records together with the response, wrapped
 *                  via ControllerResult.atomicOf.
 */
ControllerResult<AlterPartitionReassignmentsResponseData>
        alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
    List<ApiMessageAndVersion> records = new ArrayList<>();
    AlterPartitionReassignmentsResponseData result =
        new AlterPartitionReassignmentsResponseData().setErrorMessage(null);
    int successfulAlterations = 0, totalAlterations = 0;
    for (ReassignableTopic topic : request.topics()) {
        ReassignableTopicResponse topicResponse = new ReassignableTopicResponse().
            setName(topic.name());
        for (ReassignablePartition partition : topic.partitions()) {
            ApiError error = ApiError.NONE;
            try {
                alterPartitionReassignment(topic.name(), partition, records);
                successfulAlterations++;
            } catch (Throwable e) {
                // Use placeholders, like the other log statements in this class,
                // rather than building the message by concatenation.
                log.info("Unable to alter partition reassignment for {}:{} because " +
                    "of an {} error: {}", topic.name(), partition.partitionIndex(),
                    e.getClass().getSimpleName(), e.getMessage());
                error = ApiError.fromThrowable(e);
            }
            totalAlterations++;
            topicResponse.partitions().add(new ReassignablePartitionResponse().
                setPartitionIndex(partition.partitionIndex()).
                setErrorCode(error.error().code()).
                setErrorMessage(error.message()));
        }
        result.responses().add(topicResponse);
    }
    log.info("Successfully altered {} out of {} partition reassignment(s).",
        successfulAlterations, totalAlterations);
    return ControllerResult.atomicOf(records, result);
}
/**
 * Alter the reassignment of a single partition, appending any resulting record
 * to the given list.
 *
 * A null replica list in the target cancels the ongoing reassignment;
 * otherwise the target replica list is applied as a new reassignment.
 *
 * @param topicName     The name of the topic.
 * @param target        The requested partition reassignment.
 * @param records       The list to which a generated record is appended.
 *
 * @throws UnknownTopicOrPartitionException if the topic or partition is not found.
 */
void alterPartitionReassignment(String topicName,
                                ReassignablePartition target,
                                List<ApiMessageAndVersion> records) {
    Uuid id = topicsByName.get(topicName);
    if (id == null) {
        throw new UnknownTopicOrPartitionException("Unable to find a topic " +
            "named " + topicName + ".");
    }
    TopicControlInfo info = topics.get(id);
    if (info == null) {
        throw new UnknownTopicOrPartitionException("Unable to find a topic " +
            "with ID " + id + ".");
    }
    TopicIdPartition topicIdPartition = new TopicIdPartition(id, target.partitionIndex());
    PartitionRegistration registration = info.parts.get(target.partitionIndex());
    if (registration == null) {
        throw new UnknownTopicOrPartitionException("Unable to find partition " +
            topicName + ":" + target.partitionIndex() + ".");
    }
    Optional<ApiMessageAndVersion> change = (target.replicas() == null) ?
        cancelPartitionReassignment(topicName, topicIdPartition, registration) :
        changePartitionReassignment(topicIdPartition, registration, target);
    change.ifPresent(records::add);
}
/**
 * Cancel the ongoing reassignment of a partition by reverting it.
 *
 * @param topicName     The name of the topic.
 * @param tp            The topic id and partition id.
 * @param part          The existing partition info.
 *
 * @return              The record produced by the change builder, or empty if
 *                      the builder produced none.
 *
 * @throws NoReassignmentInProgressException if the partition is not reassigning.
 * @throws InvalidReplicaAssignmentException if the revert would be unclean and
 *                      unclean leader election is not enabled for the topic.
 */
Optional<ApiMessageAndVersion> cancelPartitionReassignment(String topicName,
                                                           TopicIdPartition tp,
                                                           PartitionRegistration part) {
    if (!part.isReassigning()) {
        throw new NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
    }
    PartitionReassignmentRevert reverted = new PartitionReassignmentRevert(part);
    if (reverted.unclean() &&
            !configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
        throw new InvalidReplicaAssignmentException("Unable to revert partition " +
            "assignment for " + topicName + ":" + tp.partitionId() + " because " +
            "it would require an unclean leader election.");
    }
    PartitionChangeBuilder changeBuilder = new PartitionChangeBuilder(part,
        tp.topicId(),
        tp.partitionId(),
        r -> clusterControl.unfenced(r),
        () -> configurationControl.uncleanLeaderElectionEnabledForTopic(topicName));
    // Restore the reverted ISR and replica set, and clear the reassignment state.
    changeBuilder.setTargetIsr(reverted.isr());
    changeBuilder.setTargetReplicas(reverted.replicas());
    changeBuilder.setTargetRemoving(Collections.emptyList());
    changeBuilder.setTargetAdding(Collections.emptyList());
    return changeBuilder.build();
}
/**
 * Apply the requested reassignment to a partition. A reassignment normally
 * proceeds through these stages:
 *
 * 1. A PartitionChangeRecord is issued which adds all the new replicas to the
 * partition's main replica list and sets removingReplicas and addingReplicas.
 *
 * 2. We wait until the partition's ISR contains all the new replicas. If there
 * are no new replicas, we wait until the ISR contains at least one replica that
 * is not being removed.
 *
 * 3. A second PartitionChangeRecord is issued which drops all removingReplicas
 * from the partition's main replica list and clears removingReplicas and
 * addingReplicas.
 *
 * Once stage 3 is finished, the reassignment is complete.
 *
 * Stages 1 and 2 can sometimes be skipped because the ISR already allows moving
 * straight to stage 3. One example is a reassignment that only reorders the
 * existing replicas; it completes immediately.
 *
 * @param tp        The topic id and partition id.
 * @param part      The existing partition info.
 * @param target    The target partition info.
 *
 * @return          The PartitionChangeRecord for the new partition assignment,
 *                  or empty if no change is needed.
 */
Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp,
                                                           PartitionRegistration part,
                                                           ReassignablePartition target) {
    // Reject invalid target replica lists before doing anything else.
    validateManualPartitionAssignment(target.replicas(), OptionalInt.empty());
    List<Integer> existingReplicas = Replicas.toList(part.replicas);
    PartitionReassignmentReplicas plan =
        new PartitionReassignmentReplicas(existingReplicas, target.replicas());
    // The last argument reports whether unclean leader election is allowed;
    // it is always false for this builder.
    PartitionChangeBuilder changeBuilder = new PartitionChangeBuilder(part,
        tp.topicId(),
        tp.partitionId(),
        r -> clusterControl.unfenced(r),
        () -> false);
    List<Integer> mergedReplicas = plan.merged();
    if (!mergedReplicas.equals(existingReplicas)) {
        changeBuilder.setTargetReplicas(mergedReplicas);
    }
    List<Integer> removingReplicas = plan.removing();
    if (!removingReplicas.isEmpty()) {
        changeBuilder.setTargetRemoving(removingReplicas);
    }
    List<Integer> addingReplicas = plan.adding();
    if (!addingReplicas.isEmpty()) {
        changeBuilder.setTargetAdding(addingReplicas);
    }
    return changeBuilder.build();
}
/**
 * List the ongoing partition reassignments.
 *
 * @param topicList     The topics and partitions to look at, or null to list
 *                      every topic that has a reassigning partition. Topic
 *                      names that are not known are skipped.
 *
 * @return              The response describing the ongoing reassignments.
 */
ListPartitionReassignmentsResponseData listPartitionReassignments(
        List<ListPartitionReassignmentsTopics> topicList) {
    ListPartitionReassignmentsResponseData result =
        new ListPartitionReassignmentsResponseData().setErrorMessage(null);
    if (topicList == null) {
        // No filter was given: walk the index of reassigning topics.
        for (Entry<Uuid, int[]> reassigning : reassigningTopics.entrySet()) {
            List<Integer> partitionIds = Replicas.toList(reassigning.getValue());
            listReassigningTopic(result, reassigning.getKey(), partitionIds);
        }
        return result;
    }
    // Only look at the topics that the caller asked about.
    for (ListPartitionReassignmentsTopics requested : topicList) {
        Uuid id = topicsByName.get(requested.name());
        if (id == null) {
            continue;
        }
        listReassigningTopic(result, id, requested.partitionIndexes());
    }
    return result;
}
/**
 * Add the ongoing reassignments of one topic to the response. Nothing is
 * added if the topic is unknown or if none of the given partitions is
 * reassigning.
 *
 * @param response      The response to add to.
 * @param topicId       The ID of the topic.
 * @param partitionIds  The partition IDs to examine.
 */
private void listReassigningTopic(ListPartitionReassignmentsResponseData response,
                                  Uuid topicId,
                                  List<Integer> partitionIds) {
    TopicControlInfo info = topics.get(topicId);
    if (info == null) {
        return;
    }
    OngoingTopicReassignment topicReassignment = new OngoingTopicReassignment().
        setName(info.name);
    for (int id : partitionIds) {
        getOngoingPartitionReassignment(info, id).
            ifPresent(found -> topicReassignment.partitions().add(found));
    }
    if (topicReassignment.partitions().isEmpty()) {
        return;
    }
    response.topics().add(topicReassignment);
}
/**
 * Describe the ongoing reassignment of a partition.
 *
 * @param topicInfo     The topic that owns the partition.
 * @param partitionId   The partition ID.
 *
 * @return              The ongoing reassignment, or empty if the partition does
 *                      not exist or is not reassigning.
 */
private Optional<OngoingPartitionReassignment>
        getOngoingPartitionReassignment(TopicControlInfo topicInfo, int partitionId) {
    PartitionRegistration registration = topicInfo.parts.get(partitionId);
    if (registration == null) {
        return Optional.empty();
    }
    if (!registration.isReassigning()) {
        return Optional.empty();
    }
    OngoingPartitionReassignment ongoing = new OngoingPartitionReassignment();
    ongoing.setAddingReplicas(Replicas.toList(registration.addingReplicas));
    ongoing.setRemovingReplicas(Replicas.toList(registration.removingReplicas));
    ongoing.setPartitionIndex(partitionId);
    ongoing.setReplicas(Replicas.toList(registration.replicas));
    return Optional.of(ongoing);
}
class ReplicationControlIterator implements Iterator<List<ApiMessageAndVersion>> {
private final long epoch;
private final Iterator<TopicControlInfo> iterator;

View File

@ -93,7 +93,7 @@ public final class TopicDelta {
}
/**
* Find the partitions that we are now leading, that we were not leading before.
* Find the partitions that we are now leading, whose partition epoch has changed.
*
* @param brokerId The broker id.
* @return A list of (partition ID, partition registration) entries.
@ -103,7 +103,8 @@ public final class TopicDelta {
for (Entry<Integer, PartitionRegistration> entry : partitionChanges.entrySet()) {
if (entry.getValue().leader == brokerId) {
PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
if (prevPartition == null || prevPartition.leader != brokerId) {
if (prevPartition == null ||
prevPartition.partitionEpoch != entry.getValue().partitionEpoch) {
results.add(entry);
}
}
@ -112,7 +113,7 @@ public final class TopicDelta {
}
/**
* Find the partitions that we are now following, that we were not following before.
* Find the partitions that we are now following, whose partition epoch has changed.
*
* @param brokerId The broker id.
* @return A list of (partition ID, partition registration) entries.
@ -123,7 +124,8 @@ public final class TopicDelta {
if (entry.getValue().leader != brokerId &&
Replicas.contains(entry.getValue().replicas, brokerId)) {
PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
if (prevPartition == null || prevPartition.leader == brokerId) {
if (prevPartition == null ||
prevPartition.partitionEpoch != entry.getValue().partitionEpoch) {
results.add(entry);
}
}

View File

@ -30,6 +30,7 @@ import java.util.Objects;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
public class PartitionRegistration {
@ -68,20 +69,26 @@ public class PartitionRegistration {
}
public PartitionRegistration merge(PartitionChangeRecord record) {
int[] newReplicas = (record.replicas() == null) ?
replicas : Replicas.toArray(record.replicas());
int[] newIsr = (record.isr() == null) ? isr : Replicas.toArray(record.isr());
int[] newRemovingReplicas = (record.removingReplicas() == null) ?
removingReplicas : Replicas.toArray(record.removingReplicas());
int[] newAddingReplicas = (record.addingReplicas() == null) ?
addingReplicas : Replicas.toArray(record.addingReplicas());
int newLeader;
int newLeaderEpoch;
if (record.leader() == LeaderConstants.NO_LEADER_CHANGE) {
if (record.leader() == NO_LEADER_CHANGE) {
newLeader = leader;
newLeaderEpoch = leaderEpoch;
} else {
newLeader = record.leader();
newLeaderEpoch = leaderEpoch + 1;
}
return new PartitionRegistration(replicas,
return new PartitionRegistration(newReplicas,
newIsr,
removingReplicas,
addingReplicas,
newRemovingReplicas,
newAddingReplicas,
newLeader,
newLeaderEpoch,
partitionEpoch + 1);
@ -180,6 +187,13 @@ public class PartitionRegistration {
setIsNew(isNew);
}
/**
 * Returns true if this partition is reassigning, meaning that it has at least
 * one removing replica or at least one adding replica.
 */
public boolean isReassigning() {
    return !(removingReplicas.length == 0 && addingReplicas.length == 0);
}
@Override
public int hashCode() {
return Objects.hash(replicas, isr, removingReplicas, addingReplicas, leader,

View File

@ -19,7 +19,9 @@ package org.apache.kafka.metadata;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class Replicas {
@ -137,6 +139,31 @@ public class Replicas {
return false;
}
/**
 * Check if the first list of integers contains the second.
 *
 * The comparison ignores ordering but counts occurrences: a value that
 * appears twice in b must appear at least twice in a. Neither argument is
 * modified.
 *
 * @param a             The first list
 * @param b             The second list
 *
 * @return              True only if the first contains the second.
 */
public static boolean contains(List<Integer> a, int[] b) {
    // Sort private copies of both inputs so one forward pass can match them.
    int[] sortedA = new int[a.size()];
    int filled = 0;
    for (int value : a) {
        sortedA[filled++] = value;
    }
    Arrays.sort(sortedA);
    int[] sortedB = Arrays.copyOf(b, b.length);
    Arrays.sort(sortedB);
    int cursor = 0;
    for (int wanted : sortedB) {
        // Skip the entries of a that are smaller than the value we need.
        while (cursor < sortedA.length && sortedA[cursor] < wanted) {
            cursor++;
        }
        if (cursor == sortedA.length || sortedA[cursor] != wanted) {
            return false;
        }
        // Consume the matched entry so it cannot satisfy a later duplicate.
        cursor++;
    }
    return true;
}
/**
* Copy a replica array without any occurrences of the given value.
*
@ -163,6 +190,32 @@ public class Replicas {
return result;
}
/**
 * Copy a replica array without any occurrences of the given values.
 *
 * @param replicas      The replica array.
 * @param values        The values to filter out.
 *
 * @return              A new array without the given values. The relative order
 *                      of the remaining replicas is preserved.
 */
public static int[] copyWithout(int[] replicas, int[] values) {
    // Collect the survivors into a scratch array, then trim it to size.
    int[] kept = new int[replicas.length];
    int count = 0;
    for (int replica : replicas) {
        if (!Replicas.contains(values, replica)) {
            kept[count++] = replica;
        }
    }
    return Arrays.copyOf(kept, count);
}
/**
* Copy a replica array with the given value.
*
@ -177,4 +230,19 @@ public class Replicas {
newReplicas[newReplicas.length - 1] = value;
return newReplicas;
}
/**
 * Convert a replica array to a set.
 *
 * @param replicas      The replica array.
 *
 * @return              A new set containing every value in the array.
 */
public static Set<Integer> toSet(int[] replicas) {
    Set<Integer> replicaSet = new HashSet<>();
    for (int i = 0; i < replicas.length; i++) {
        replicaSet.add(replicas[i]);
    }
    return replicaSet;
}
}

View File

@ -0,0 +1,247 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.controller.PartitionChangeBuilder.BestLeader;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
import static org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class PartitionChangeBuilderTest {
/**
 * A record is a no-op only when it sets nothing: setting the leader, the ISR,
 * the removing replicas or the adding replicas each makes it a real change.
 */
@Test
public void testChangeRecordIsNoOp() {
    PartitionChangeRecord untouched = new PartitionChangeRecord();
    assertTrue(changeRecordIsNoOp(untouched));

    PartitionChangeRecord withLeader = new PartitionChangeRecord().setLeader(1);
    assertFalse(changeRecordIsNoOp(withLeader));

    PartitionChangeRecord withIsr = new PartitionChangeRecord().
        setIsr(Arrays.asList(1, 2, 3));
    assertFalse(changeRecordIsNoOp(withIsr));

    PartitionChangeRecord withRemoving = new PartitionChangeRecord().
        setRemovingReplicas(Arrays.asList(1));
    assertFalse(changeRecordIsNoOp(withRemoving));

    PartitionChangeRecord withAdding = new PartitionChangeRecord().
        setAddingReplicas(Arrays.asList(4));
    assertFalse(changeRecordIsNoOp(withAdding));
}
// Fixture: replicas and ISR are both [2, 1, 3], no reassignment is in progress,
// the leader is 1, the leader epoch is 100 and the partition epoch is 200.
private static final PartitionRegistration FOO = new PartitionRegistration(
    new int[] {2, 1, 3},
    new int[] {2, 1, 3},
    Replicas.NONE,
    Replicas.NONE,
    1,
    100,
    200);

private static final Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");

/**
 * Create a builder for partition 0 of FOO in which every broker except 3 is
 * an acceptable leader.
 */
private static PartitionChangeBuilder createFooBuilder(boolean allowUnclean) {
    return new PartitionChangeBuilder(FOO, FOO_ID, 0,
        brokerId -> brokerId != 3,
        () -> allowUnclean);
}
// Fixture: replicas are [1, 2, 3, 4] and the ISR is [1, 2, 3]; replica 1 is being
// removed and replica 4 is being added. The leader is 1, the leader epoch is 100
// and the partition epoch is 200.
private static final PartitionRegistration BAR = new PartitionRegistration(
    new int[] {1, 2, 3, 4},
    new int[] {1, 2, 3},
    new int[] {1},
    new int[] {4},
    1,
    100,
    200);

private static final Uuid BAR_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");

/**
 * Create a builder for partition 0 of BAR in which every broker except 3 is
 * an acceptable leader.
 */
private static PartitionChangeBuilder createBarBuilder(boolean allowUnclean) {
    return new PartitionChangeBuilder(BAR, BAR_ID, 0,
        brokerId -> brokerId != 3,
        () -> allowUnclean);
}
/**
 * Compute the best leader for the given builder and check both the chosen
 * node and whether the choice was unclean.
 */
private static void assertBestLeaderEquals(PartitionChangeBuilder builder,
                                           int expectedNode,
                                           boolean expectedUnclean) {
    BestLeader chosen = builder.new BestLeader();
    assertEquals(expectedNode, chosen.node);
    assertEquals(expectedUnclean, chosen.unclean);
}
/**
 * Exercise best-leader selection on FOO with and without unclean election
 * allowed, for several target ISRs.
 */
@Test
public void testBestLeader() {
    // No target ISR set.
    assertBestLeaderEquals(createFooBuilder(false), 2, false);
    assertBestLeaderEquals(createFooBuilder(true), 2, false);

    // Target ISR of [1, 3].
    PartitionChangeBuilder cleanOneThree = createFooBuilder(false).
        setTargetIsr(Arrays.asList(1, 3));
    assertBestLeaderEquals(cleanOneThree, 1, false);
    PartitionChangeBuilder uncleanOneThree = createFooBuilder(true).
        setTargetIsr(Arrays.asList(1, 3));
    assertBestLeaderEquals(uncleanOneThree, 1, false);

    // Target ISR of just [3], which the fixture treats as unacceptable.
    PartitionChangeBuilder cleanThree = createFooBuilder(false).
        setTargetIsr(Arrays.asList(3));
    assertBestLeaderEquals(cleanThree, NO_LEADER, false);
    PartitionChangeBuilder uncleanThree = createFooBuilder(true).
        setTargetIsr(Arrays.asList(3));
    assertBestLeaderEquals(uncleanThree, 2, true);

    // Target ISR of [4] with 4 added to the target replicas.
    PartitionChangeBuilder withFour = createFooBuilder(true).
        setTargetIsr(Arrays.asList(4)).
        setTargetReplicas(Arrays.asList(2, 1, 3, 4));
    assertBestLeaderEquals(withFour, 4, false);
}
/**
 * Check shouldTryElection() on FOO with no changes, with preferred election
 * forced, and with two different target ISRs.
 */
@Test
public void testShouldTryElection() {
    PartitionChangeBuilder unchanged = createFooBuilder(false);
    assertFalse(unchanged.shouldTryElection());

    PartitionChangeBuilder preferredForced = createFooBuilder(false).
        setAlwaysElectPreferredIfPossible(true);
    assertTrue(preferredForced.shouldTryElection());

    // Target ISR [2, 3] leaves out the current leader (1).
    PartitionChangeBuilder leaderDropped = createFooBuilder(false).
        setTargetIsr(Arrays.asList(2, 3));
    assertTrue(leaderDropped.shouldTryElection());

    // Target ISR [2, 1] still includes the current leader (1).
    PartitionChangeBuilder leaderKept = createFooBuilder(false).
        setTargetIsr(Arrays.asList(2, 1));
    assertFalse(leaderKept.shouldTryElection());
}
/**
 * Run triggerLeaderEpochBumpIfNeeded() on the given record and check the
 * leader field it is left with.
 */
private static void testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder,
                                                             PartitionChangeRecord record,
                                                             int expectedLeader) {
    builder.triggerLeaderEpochBumpIfNeeded(record);
    int actualLeader = record.leader();
    assertEquals(expectedLeader, actualLeader);
}
/**
 * Check which leader value triggerLeaderEpochBumpIfNeeded() leaves in the
 * record for a range of target ISRs and target replica sets on FOO.
 */
@Test
public void testTriggerLeaderEpochBumpIfNeeded() {
    // Nothing changed.
    testTriggerLeaderEpochBumpIfNeededLeader(
        createFooBuilder(false),
        new PartitionChangeRecord(),
        NO_LEADER_CHANGE);

    // Target ISR of [2, 1]: the record's leader is set to 1.
    testTriggerLeaderEpochBumpIfNeededLeader(
        createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)),
        new PartitionChangeRecord(),
        1);

    // Target ISR of [2, 1, 3, 4].
    testTriggerLeaderEpochBumpIfNeededLeader(
        createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1, 3, 4)),
        new PartitionChangeRecord(),
        NO_LEADER_CHANGE);

    // Target replicas of [2, 1, 3, 4].
    testTriggerLeaderEpochBumpIfNeededLeader(
        createFooBuilder(false).setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
        new PartitionChangeRecord(),
        NO_LEADER_CHANGE);

    // A leader that is already set in the record is kept.
    testTriggerLeaderEpochBumpIfNeededLeader(
        createFooBuilder(false).setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
        new PartitionChangeRecord().setLeader(2),
        2);
}
/**
 * A builder with no target state set must produce no record, for both
 * fixtures and for both values of the unclean-election flag.
 */
@Test
public void testNoChange() {
    boolean[] uncleanSettings = {false, true};
    for (boolean allowUnclean : uncleanSettings) {
        assertEquals(Optional.empty(), createFooBuilder(allowUnclean).build());
    }
    for (boolean allowUnclean : uncleanSettings) {
        assertEquals(Optional.empty(), createBarBuilder(allowUnclean).build());
    }
}
/**
 * Setting the target ISR to {2, 1} on the FOO fixture must yield a record
 * carrying that ISR together with leader 1.
 */
@Test
public void testIsrChangeAndLeaderBump() {
    PartitionChangeRecord expectedRecord = new PartitionChangeRecord()
        .setTopicId(FOO_ID)
        .setPartitionId(0)
        .setIsr(Arrays.asList(2, 1))
        .setLeader(1);
    ApiMessageAndVersion expected = new ApiMessageAndVersion(
        expectedRecord, PARTITION_CHANGE_RECORD.highestSupportedVersion());

    PartitionChangeBuilder builder =
        createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1));
    assertEquals(Optional.of(expected), builder.build());
}
/**
 * Setting the target ISR to {2, 3} on the FOO fixture must yield a record
 * carrying that ISR together with leader 2.
 */
@Test
public void testIsrChangeAndLeaderChange() {
    PartitionChangeRecord expectedRecord = new PartitionChangeRecord()
        .setTopicId(FOO_ID)
        .setPartitionId(0)
        .setIsr(Arrays.asList(2, 3))
        .setLeader(2);
    ApiMessageAndVersion expected = new ApiMessageAndVersion(
        expectedRecord, PARTITION_CHANGE_RECORD.highestSupportedVersion());

    PartitionChangeBuilder builder =
        createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3));
    assertEquals(Optional.of(expected), builder.build());
}
/**
 * Setting the target replicas to {3, 2, 1} on the FOO fixture must yield a
 * record that only carries the new replica list.
 */
@Test
public void testReassignmentRearrangesReplicas() {
    PartitionChangeRecord expectedRecord = new PartitionChangeRecord()
        .setTopicId(FOO_ID)
        .setPartitionId(0)
        .setReplicas(Arrays.asList(3, 2, 1));
    ApiMessageAndVersion expected = new ApiMessageAndVersion(
        expectedRecord, PARTITION_CHANGE_RECORD.highestSupportedVersion());

    PartitionChangeBuilder builder =
        createFooBuilder(false).setTargetReplicas(Arrays.asList(3, 2, 1));
    assertEquals(Optional.of(expected), builder.build());
}
/**
 * Growing the BAR fixture's ISR to {1, 2, 3, 4} must produce a record with
 * replicas and ISR {2, 3, 4}, leader 2, and empty removing/adding lists.
 */
@Test
public void testIsrEnlargementCompletesReassignment() {
    PartitionChangeRecord expectedRecord = new PartitionChangeRecord()
        .setTopicId(BAR_ID)
        .setPartitionId(0)
        .setReplicas(Arrays.asList(2, 3, 4))
        .setIsr(Arrays.asList(2, 3, 4))
        .setLeader(2)
        .setRemovingReplicas(Collections.emptyList())
        .setAddingReplicas(Collections.emptyList());
    ApiMessageAndVersion expected = new ApiMessageAndVersion(
        expectedRecord, PARTITION_CHANGE_RECORD.highestSupportedVersion());

    PartitionChangeBuilder builder =
        createBarBuilder(false).setTargetIsr(Arrays.asList(1, 2, 3, 4));
    assertEquals(Optional.of(expected), builder.build());
}
/**
 * Computes the revert of the BAR fixture's reassignment and feeds it into a
 * change builder; the resulting record must restore replicas {1, 2, 3},
 * name leader 1 and clear the removing/adding lists.
 */
@Test
public void testRevertReassignment() {
    PartitionReassignmentRevert revert = new PartitionReassignmentRevert(BAR);
    assertEquals(Arrays.asList(1, 2, 3), revert.replicas());
    assertEquals(Arrays.asList(1, 2, 3), revert.isr());

    PartitionChangeRecord expectedRecord = new PartitionChangeRecord()
        .setTopicId(BAR_ID)
        .setPartitionId(0)
        .setReplicas(Arrays.asList(1, 2, 3))
        .setLeader(1)
        .setRemovingReplicas(Collections.emptyList())
        .setAddingReplicas(Collections.emptyList());
    ApiMessageAndVersion expected = new ApiMessageAndVersion(
        expectedRecord, PARTITION_CHANGE_RECORD.highestSupportedVersion());

    PartitionChangeBuilder builder = createBarBuilder(false)
        .setTargetReplicas(revert.replicas())
        .setTargetIsr(revert.isr())
        .setTargetRemoving(Collections.emptyList())
        .setTargetAdding(Collections.emptyList());
    assertEquals(Optional.of(expected), builder.build());
}
/**
 * Reassigns the FOO fixture to {1, 2}: replica 3 is the only one removed,
 * nothing is added, and the builder is expected to emit a record with
 * replicas {1, 2}, ISR {2, 1} and leader 1.
 */
@Test
public void testRemovingReplicaReassignment() {
    PartitionReassignmentReplicas reassignment = new PartitionReassignmentReplicas(
        Replicas.toList(FOO.replicas), Arrays.asList(1, 2));
    assertEquals(Collections.singletonList(3), reassignment.removing());
    assertEquals(Collections.emptyList(), reassignment.adding());
    assertEquals(Arrays.asList(1, 2, 3), reassignment.merged());

    PartitionChangeRecord expectedRecord = new PartitionChangeRecord()
        .setTopicId(FOO_ID)
        .setPartitionId(0)
        .setReplicas(Arrays.asList(1, 2))
        .setIsr(Arrays.asList(2, 1))
        .setLeader(1);
    ApiMessageAndVersion expected = new ApiMessageAndVersion(
        expectedRecord, PARTITION_CHANGE_RECORD.highestSupportedVersion());

    PartitionChangeBuilder builder = createFooBuilder(false)
        .setTargetReplicas(reassignment.merged())
        .setTargetRemoving(reassignment.removing());
    assertEquals(Optional.of(expected), builder.build());
}
/**
 * Reassigns the FOO fixture to {1, 2, 3, 4}: replica 4 is the only one
 * added, nothing is removed, and the builder is expected to emit a record
 * with the enlarged replica list and adding list {4}.
 */
@Test
public void testAddingReplicaReassignment() {
    PartitionReassignmentReplicas reassignment = new PartitionReassignmentReplicas(
        Replicas.toList(FOO.replicas), Arrays.asList(1, 2, 3, 4));
    assertEquals(Collections.emptyList(), reassignment.removing());
    assertEquals(Collections.singletonList(4), reassignment.adding());
    assertEquals(Arrays.asList(1, 2, 3, 4), reassignment.merged());

    PartitionChangeRecord expectedRecord = new PartitionChangeRecord()
        .setTopicId(FOO_ID)
        .setPartitionId(0)
        .setReplicas(Arrays.asList(1, 2, 3, 4))
        .setAddingReplicas(Collections.singletonList(4));
    ApiMessageAndVersion expected = new ApiMessageAndVersion(
        expectedRecord, PARTITION_CHANGE_RECORD.highestSupportedVersion());

    PartitionChangeBuilder builder = createFooBuilder(false)
        .setTargetReplicas(reassignment.merged())
        .setTargetAdding(reassignment.adding());
    assertEquals(Optional.of(expected), builder.build());
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import java.util.Arrays;
import java.util.Collections;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Tests for {@code PartitionReassignmentReplicas}, which is constructed from a
 * current and a target replica list and exposes the removing, adding and
 * merged replica lists.
 */
@Timeout(40)
public class PartitionReassignmentReplicasTest {

    /** Identical current and target lists: nothing is removed or added. */
    @Test
    public void testNoneAddedOrRemoved() {
        PartitionReassignmentReplicas reassignment = new PartitionReassignmentReplicas(
            Arrays.asList(3, 2, 1),
            Arrays.asList(3, 2, 1));
        assertEquals(Collections.emptyList(), reassignment.removing());
        assertEquals(Collections.emptyList(), reassignment.adding());
        assertEquals(Arrays.asList(3, 2, 1), reassignment.merged());
    }

    /** Target is a superset: the new brokers are reported as adding, in ascending order. */
    @Test
    public void testAdditions() {
        PartitionReassignmentReplicas reassignment = new PartitionReassignmentReplicas(
            Arrays.asList(3, 2, 1),
            Arrays.asList(3, 6, 2, 1, 5));
        assertEquals(Collections.emptyList(), reassignment.removing());
        assertEquals(Arrays.asList(5, 6), reassignment.adding());
        assertEquals(Arrays.asList(3, 6, 2, 1, 5), reassignment.merged());
    }

    /** Target is a subset: dropped brokers are reported as removing and trail the merged list. */
    @Test
    public void testRemovals() {
        PartitionReassignmentReplicas reassignment = new PartitionReassignmentReplicas(
            Arrays.asList(3, 2, 1, 0),
            Arrays.asList(3, 1));
        assertEquals(Arrays.asList(0, 2), reassignment.removing());
        assertEquals(Collections.emptyList(), reassignment.adding());
        assertEquals(Arrays.asList(3, 1, 0, 2), reassignment.merged());
    }

    /** Brokers are both added and dropped in the same reassignment. */
    @Test
    public void testAdditionsAndRemovals() {
        PartitionReassignmentReplicas reassignment = new PartitionReassignmentReplicas(
            Arrays.asList(3, 2, 1, 0),
            Arrays.asList(7, 3, 1, 9));
        assertEquals(Arrays.asList(0, 2), reassignment.removing());
        assertEquals(Arrays.asList(7, 9), reassignment.adding());
        assertEquals(Arrays.asList(7, 3, 1, 9, 0, 2), reassignment.merged());
    }

    /** Same brokers in a new order: merged follows the target ordering. */
    @Test
    public void testRearrangement() {
        PartitionReassignmentReplicas reassignment = new PartitionReassignmentReplicas(
            Arrays.asList(3, 2, 1, 0),
            Arrays.asList(0, 1, 3, 2));
        assertEquals(Collections.emptyList(), reassignment.removing());
        assertEquals(Collections.emptyList(), reassignment.adding());
        assertEquals(Arrays.asList(0, 1, 3, 2), reassignment.merged());
    }
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import java.util.Arrays;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Tests for {@code PartitionReassignmentRevert}, which is built from a
 * {@code PartitionRegistration} and exposes the replicas, the ISR and an
 * unclean flag for the reverted partition.
 */
@Timeout(40)
public class PartitionReassignmentRevertTest {

    /**
     * Builds a revert for a registration with the given replica sets; the three
     * trailing constructor values (3, 100, 200) are the same for every test here.
     */
    private static PartitionReassignmentRevert revertFor(int[] replicas,
                                                         int[] isr,
                                                         int[] removing,
                                                         int[] adding) {
        PartitionRegistration registration = new PartitionRegistration(
            replicas, isr, removing, adding, 3, 100, 200);
        return new PartitionReassignmentRevert(registration);
    }

    /** No reassignment in progress: replicas and ISR come back unchanged. */
    @Test
    public void testNoneAddedOrRemoved() {
        PartitionReassignmentRevert revert = revertFor(
            new int[] {3, 2, 1}, new int[] {3, 2}, Replicas.NONE, Replicas.NONE);
        assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
        assertEquals(Arrays.asList(3, 2), revert.isr());
        assertFalse(revert.unclean());
    }

    /** Only removing replicas are set: replicas and ISR come back unchanged. */
    @Test
    public void testSomeRemoving() {
        PartitionReassignmentRevert revert = revertFor(
            new int[] {3, 2, 1}, new int[] {3, 2}, new int[] {2, 1}, Replicas.NONE);
        assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
        assertEquals(Arrays.asList(3, 2), revert.isr());
        assertFalse(revert.unclean());
    }

    /** Adding replicas 4 and 5 are stripped from both the replicas and the ISR. */
    @Test
    public void testSomeAdding() {
        PartitionReassignmentRevert revert = revertFor(
            new int[] {4, 5, 3, 2, 1}, new int[] {4, 5, 2}, Replicas.NONE, new int[] {4, 5});
        assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
        assertEquals(Arrays.asList(2), revert.isr());
        assertFalse(revert.unclean());
    }

    /** Both removing and adding are set: only the adding replicas are stripped. */
    @Test
    public void testSomeRemovingAndAdding() {
        PartitionReassignmentRevert revert = revertFor(
            new int[] {4, 5, 3, 2, 1}, new int[] {4, 5, 2}, new int[] {2}, new int[] {4, 5});
        assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
        assertEquals(Arrays.asList(2), revert.isr());
        assertFalse(revert.unclean());
    }

    /**
     * The ISR consists solely of adding replicas: the revert is expected to
     * report ISR {3} and flag the result as unclean.
     */
    @Test
    public void testIsrSpecialCase() {
        PartitionReassignmentRevert revert = revertFor(
            new int[] {4, 5, 3, 2, 1}, new int[] {4, 5}, new int[] {2}, new int[] {4, 5});
        assertEquals(Arrays.asList(3, 2, 1), revert.replicas());
        assertEquals(Arrays.asList(3), revert.isr());
        assertTrue(revert.unclean());
    }
}

View File

@ -547,7 +547,7 @@ public class QuorumControllerTest {
setTopics(Collections.singletonList(new ReassignableTopic())));
CompletableFuture<ListPartitionReassignmentsResponseData> listReassignmentsFuture =
controller.listPartitionReassignments(
new ListPartitionReassignmentsRequestData().setTimeoutMs(0));
new ListPartitionReassignmentsRequestData().setTopics(null).setTimeoutMs(0));
while (controller.time().nanoseconds() == now) {
Thread.sleep(0, 10);
}

View File

@ -23,7 +23,15 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrRequestData.PartitionData;
import org.apache.kafka.common.message.AlterIsrRequestData.TopicData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
@ -40,6 +48,10 @@ import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitionsCo
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@ -49,15 +61,17 @@ import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@ -71,10 +85,12 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.Errors.INVALID_PARTITIONS;
import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT;
import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
import static org.apache.kafka.common.protocol.Errors.NONE;
import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
@ -88,6 +104,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(40)
public class ReplicationControlManagerTest {
/** Class-wide logger for progress messages emitted by the tests. */
private static final Logger log =
    LoggerFactory.getLogger(ReplicationControlManagerTest.class);
private static class ReplicationControlTestContext {
final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
final LogContext logContext = new LogContext();
@ -189,6 +207,19 @@ public class ReplicationControlManagerTest {
replay(result.records());
}
}
/**
 * Looks up the epoch of a broker's current registration.
 *
 * @param brokerId id of the broker to look up
 * @return the epoch stored in the broker's registration
 */
long currentBrokerEpoch(int brokerId) {
    BrokerRegistration found = clusterControl.brokerRegistrations().get(brokerId);
    // Fail the test with a readable message instead of an NPE below.
    assertNotNull(found, "No current registration for broker " + brokerId);
    return found.epoch();
}
/**
 * Returns the leader currently recorded for the given partition.
 *
 * @param topicIdPartition topic id and partition index to look up
 * @return the leader id, or empty when the stored leader is negative
 */
OptionalInt currentLeader(TopicIdPartition topicIdPartition) {
    PartitionRegistration registration = replicationControl.getPartition(
        topicIdPartition.topicId(), topicIdPartition.partitionId());
    if (registration.leader < 0) {
        return OptionalInt.empty();
    }
    return OptionalInt.of(registration.leader);
}
}
@Test
@ -393,9 +424,9 @@ public class ReplicationControlManagerTest {
TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
TopicPartition topicPartition = new TopicPartition("foo", 0);
assertEquals(OptionalInt.of(0), currentLeader(replicationControl, topicIdPartition));
long brokerEpoch = currentBrokerEpoch(ctx, 0);
AlterIsrRequestData.PartitionData shrinkIsrRequest = newAlterIsrPartition(
assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
long brokerEpoch = ctx.currentBrokerEpoch(0);
PartitionData shrinkIsrRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
ControllerResult<AlterIsrResponseData> shrinkIsrResult = sendAlterIsr(
replicationControl, 0, brokerEpoch, "foo", shrinkIsrRequest);
@ -403,7 +434,7 @@ public class ReplicationControlManagerTest {
shrinkIsrResult, topicPartition, NONE);
assertConsistentAlterIsrResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
AlterIsrRequestData.PartitionData expandIsrRequest = newAlterIsrPartition(
PartitionData expandIsrRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1, 2));
ControllerResult<AlterIsrResponseData> expandIsrResult = sendAlterIsr(
replicationControl, 0, brokerEpoch, "foo", expandIsrRequest);
@ -423,75 +454,52 @@ public class ReplicationControlManagerTest {
TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
TopicPartition topicPartition = new TopicPartition("foo", 0);
assertEquals(OptionalInt.of(0), currentLeader(replicationControl, topicIdPartition));
long brokerEpoch = currentBrokerEpoch(ctx, 0);
assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
long brokerEpoch = ctx.currentBrokerEpoch(0);
// Invalid leader
AlterIsrRequestData.PartitionData invalidLeaderRequest = newAlterIsrPartition(
PartitionData invalidLeaderRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
ControllerResult<AlterIsrResponseData> invalidLeaderResult = sendAlterIsr(
replicationControl, 1, currentBrokerEpoch(ctx, 1),
replicationControl, 1, ctx.currentBrokerEpoch(1),
"foo", invalidLeaderRequest);
assertAlterIsrResponse(invalidLeaderResult, topicPartition, Errors.INVALID_REQUEST);
// Stale broker epoch
AlterIsrRequestData.PartitionData invalidBrokerEpochRequest = newAlterIsrPartition(
PartitionData invalidBrokerEpochRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
assertThrows(StaleBrokerEpochException.class, () -> sendAlterIsr(
replicationControl, 0, brokerEpoch - 1, "foo", invalidBrokerEpochRequest));
// Invalid leader epoch
AlterIsrRequestData.PartitionData invalidLeaderEpochRequest = newAlterIsrPartition(
PartitionData invalidLeaderEpochRequest = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
invalidLeaderEpochRequest.setLeaderEpoch(500);
ControllerResult<AlterIsrResponseData> invalidLeaderEpochResult = sendAlterIsr(
replicationControl, 1, currentBrokerEpoch(ctx, 1),
replicationControl, 1, ctx.currentBrokerEpoch(1),
"foo", invalidLeaderEpochRequest);
assertAlterIsrResponse(invalidLeaderEpochResult, topicPartition, Errors.INVALID_REQUEST);
assertAlterIsrResponse(invalidLeaderEpochResult, topicPartition, FENCED_LEADER_EPOCH);
// Invalid ISR (3 is not a valid replica)
AlterIsrRequestData.PartitionData invalidIsrRequest1 = newAlterIsrPartition(
PartitionData invalidIsrRequest1 = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
invalidIsrRequest1.setNewIsr(Arrays.asList(0, 1, 3));
ControllerResult<AlterIsrResponseData> invalidIsrResult1 = sendAlterIsr(
replicationControl, 1, currentBrokerEpoch(ctx, 1),
replicationControl, 1, ctx.currentBrokerEpoch(1),
"foo", invalidIsrRequest1);
assertAlterIsrResponse(invalidIsrResult1, topicPartition, Errors.INVALID_REQUEST);
// Invalid ISR (does not include leader 0)
AlterIsrRequestData.PartitionData invalidIsrRequest2 = newAlterIsrPartition(
PartitionData invalidIsrRequest2 = newAlterIsrPartition(
replicationControl, topicIdPartition, Arrays.asList(0, 1));
invalidIsrRequest2.setNewIsr(Arrays.asList(1, 2));
ControllerResult<AlterIsrResponseData> invalidIsrResult2 = sendAlterIsr(
replicationControl, 1, currentBrokerEpoch(ctx, 1),
replicationControl, 1, ctx.currentBrokerEpoch(1),
"foo", invalidIsrRequest2);
assertAlterIsrResponse(invalidIsrResult2, topicPartition, Errors.INVALID_REQUEST);
}
/**
 * Looks up the epoch of a broker's current registration in the given context.
 *
 * @param ctx      test context whose cluster control holds the registrations
 * @param brokerId id of the broker to look up
 * @return the epoch stored in the broker's registration
 */
private long currentBrokerEpoch(
    ReplicationControlTestContext ctx,
    int brokerId
) {
    BrokerRegistration found = ctx.clusterControl.brokerRegistrations().get(brokerId);
    // Fail the test with a readable message instead of an NPE below.
    assertNotNull(found, "No current registration for broker " + brokerId);
    return found.epoch();
}
/**
 * Returns the leader currently recorded for the given partition.
 *
 * @param replicationControl manager to query
 * @param topicIdPartition   topic id and partition index to look up
 * @return the leader id, or empty when the stored leader is negative
 */
private OptionalInt currentLeader(
    ReplicationControlManager replicationControl,
    TopicIdPartition topicIdPartition
) {
    PartitionRegistration registration = replicationControl.getPartition(
        topicIdPartition.topicId(), topicIdPartition.partitionId());
    return (registration.leader < 0)
        ? OptionalInt.empty()
        : OptionalInt.of(registration.leader);
}
private AlterIsrRequestData.PartitionData newAlterIsrPartition(
private PartitionData newAlterIsrPartition(
ReplicationControlManager replicationControl,
TopicIdPartition topicIdPartition,
List<Integer> newIsr
@ -777,18 +785,226 @@ public class ReplicationControlManagerTest {
OptionalInt.of(3))).getMessage());
}
/**
 * Expected list-reassignments response when no partition is being reassigned:
 * a default response whose error message is explicitly set to null.
 */
private static final ListPartitionReassignmentsResponseData NONE_REASSIGNING =
    new ListPartitionReassignmentsResponseData()
        .setErrorMessage(null);
@Test
public void testBestLeader() {
assertEquals(2, ReplicationControlManager.bestLeader(
new int[]{1, 2, 3, 4}, new int[]{4, 2, 3}, false, __ -> true));
assertEquals(3, ReplicationControlManager.bestLeader(
new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, __ -> true));
assertEquals(4, ReplicationControlManager.bestLeader(
new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, r -> r == 4));
assertEquals(-1, ReplicationControlManager.bestLeader(
new int[]{3, 4, 5}, new int[]{1, 2}, false, r -> r == 4));
assertEquals(4, ReplicationControlManager.bestLeader(
new int[]{3, 4, 5}, new int[]{1, 2}, true, r -> r == 4));
public void testReassignPartitions() throws Exception {
// Scenario: start reassignments on topic "foo", list them, send cancel
// requests, and finally submit an alterIsr for foo:1.
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3);
ctx.unfenceBrokers(0, 1, 2, 3);
// "foo" gets two partitions (indexes 0 and 1); "bar" gets one.
Uuid fooId = ctx.createTestTopic("foo", new int[][] {
new int[] {1, 2, 3}, new int[] {3, 2, 1}}).topicId();
ctx.createTestTopic("bar", new int[][] {
new int[] {1, 2, 3}}).topicId();
// Nothing is being reassigned yet.
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
// foo:0 keeps the same brokers in a new order, foo:1 swaps broker 3 for
// broker 0, and foo:2 refers to a partition index that was never created.
// "bar" is listed without any partitions.
ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(Arrays.asList(3, 2, 1)),
new ReassignablePartition().setPartitionIndex(1).
setReplicas(Arrays.asList(0, 2, 1)),
new ReassignablePartition().setPartitionIndex(2).
setReplicas(Arrays.asList(0, 2, 1)))),
new ReassignableTopic().setName("bar"))));
// foo:0 and foo:1 carry no error; foo:2 is rejected as unknown.
assertEquals(new AlterPartitionReassignmentsResponseData().
setErrorMessage(null).setResponses(Arrays.asList(
new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null),
new ReassignablePartitionResponse().setPartitionIndex(1).
setErrorMessage(null),
new ReassignablePartitionResponse().setPartitionIndex(2).
setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
setErrorMessage("Unable to find partition foo:2."))),
new ReassignableTopicResponse().
setName("bar"))),
alterResult.response());
ctx.replay(alterResult.records());
// Only foo:1 is expected to be listed as ongoing (removing 3, adding 0);
// foo:0 does not appear in the listing.
ListPartitionReassignmentsResponseData currentReassigning =
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
setTopics(Arrays.asList(new OngoingTopicReassignment().
setName("foo").setPartitions(Arrays.asList(
new OngoingPartitionReassignment().setPartitionIndex(1).
setRemovingReplicas(Arrays.asList(3)).
setAddingReplicas(Arrays.asList(0)).
setReplicas(Arrays.asList(0, 2, 1, 3))))));
assertEquals(currentReassigning, replication.listPartitionReassignments(null));
// Filtering the listing by topic: "bar" has nothing ongoing, "foo" has foo:1.
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList(
new ListPartitionReassignmentsTopics().setName("bar").
setPartitionIndexes(Arrays.asList(0, 1, 2)))));
assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList(
new ListPartitionReassignmentsTopics().setName("foo").
setPartitionIndexes(Arrays.asList(0, 1, 2)))));
// Requests with a null replica list for every partition touched above plus bar:0.
ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(null),
new ReassignablePartition().setPartitionIndex(1).
setReplicas(null),
new ReassignablePartition().setPartitionIndex(2).
setReplicas(null))),
new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(null))))));
// Exactly one record is expected, for foo:1: replicas {2, 1, 3}, leader 3,
// and cleared removing/adding lists. foo:0 and bar:0 report
// NO_REASSIGNMENT_IN_PROGRESS; foo:2 is still unknown.
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(fooId).
setPartitionId(1).
setReplicas(Arrays.asList(2, 1, 3)).
setLeader(3).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()), (short) 0)),
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null),
new ReassignablePartitionResponse().setPartitionIndex(1).
setErrorCode(NONE.code()).setErrorMessage(null),
new ReassignablePartitionResponse().setPartitionIndex(2).
setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
setErrorMessage("Unable to find partition foo:2."))),
new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).
setErrorMessage(null)))))),
cancelResult);
// Note: cancelResult's records are NOT replayed, so the manager still holds
// the in-progress state for foo:1 when the alterIsr below arrives.
log.info("running final alterIsr...");
ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr(
new AlterIsrRequestData().setBrokerId(3).setBrokerEpoch(103).
setTopics(Arrays.asList(new TopicData().setName("foo").setPartitions(Arrays.asList(
new PartitionData().setPartitionIndex(1).setCurrentIsrVersion(1).
setLeaderEpoch(0).setNewIsr(Arrays.asList(3, 0, 2, 1)))))));
// NOTE(review): presumably the ISR now covers all target replicas, which
// completes the reassignment and moves leadership away from broker 3, hence
// FENCED_LEADER_EPOCH for the sender -- confirm against
// ReplicationControlManager.alterIsr.
assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
new AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(
new AlterIsrResponseData.PartitionData().
setPartitionIndex(1).
setErrorCode(FENCED_LEADER_EPOCH.code()))))),
alterIsrResult.response());
// After replaying the alterIsr records no reassignment remains.
ctx.replay(alterIsrResult.records());
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
}
// Scenario: fence a broker, start several reassignments (some invalid),
// verify the resulting partition state, then cancel the one that is still
// in progress.
@Test
public void testCancelReassignPartitions() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
// "foo" gets four partitions (indexes 0-3); "bar" gets one.
Uuid fooId = ctx.createTestTopic("foo", new int[][] {
new int[] {1, 2, 3, 4}, new int[] {0, 1, 2, 3}, new int[] {4, 3, 1, 0},
new int[] {2, 3, 4, 1}}).topicId();
Uuid barId = ctx.createTestTopic("bar", new int[][] {
new int[] {4, 3, 2}}).topicId();
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
// Fence broker 3; foo:0 is then expected to have ISR {1, 2, 4}.
List<ApiMessageAndVersion> fenceRecords = new ArrayList<>();
replication.handleBrokerFenced(3, fenceRecords);
ctx.replay(fenceRecords);
assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4}, new int[] {1, 2, 4},
new int[] {}, new int[] {}, 1, 1, 1), replication.getPartition(fooId, 0));
// foo:0 drops broker 4, foo:1 keeps the same brokers in a new order,
// foo:2 names brokers 5-7 which were never registered, foo:3 passes an
// empty replica list, and bar:0 grows from {4, 3, 2} to {1, 2, 3, 4, 0}.
ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(Arrays.asList(1, 2, 3)),
new ReassignablePartition().setPartitionIndex(1).
setReplicas(Arrays.asList(1, 2, 3, 0)),
new ReassignablePartition().setPartitionIndex(2).
setReplicas(Arrays.asList(5, 6, 7)),
new ReassignablePartition().setPartitionIndex(3).
setReplicas(Arrays.asList()))),
new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(Arrays.asList(1, 2, 3, 4, 0)))))));
// foo:0, foo:1 and bar:0 carry no error; foo:2 and foo:3 are rejected with
// INVALID_REPLICA_ASSIGNMENT.
assertEquals(new AlterPartitionReassignmentsResponseData().
setErrorMessage(null).setResponses(Arrays.asList(
new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null),
new ReassignablePartitionResponse().setPartitionIndex(1).
setErrorMessage(null),
new ReassignablePartitionResponse().setPartitionIndex(2).
setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
setErrorMessage("The manual partition assignment includes broker 5, " +
"but no such broker is registered."),
new ReassignablePartitionResponse().setPartitionIndex(3).
setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
setErrorMessage("The manual partition assignment includes an empty " +
"replica list."))),
new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null))))),
alterResult.response());
ctx.replay(alterResult.records());
// After replay foo:0 and foo:1 already hold their target replicas with empty
// removing/adding lists, while bar:0 still lists brokers 0 and 1 as adding.
assertEquals(new PartitionRegistration(new int[] {1, 2, 3}, new int[] {1, 2},
new int[] {}, new int[] {}, 1, 2, 2), replication.getPartition(fooId, 0));
assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 0}, new int[] {0, 1, 2},
new int[] {}, new int[] {}, 0, 1, 2), replication.getPartition(fooId, 1));
assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4, 0}, new int[] {4, 2},
new int[] {}, new int[] {0, 1}, 4, 1, 2), replication.getPartition(barId, 0));
// bar:0 is the only reassignment expected to be listed as ongoing.
ListPartitionReassignmentsResponseData currentReassigning =
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
setTopics(Arrays.asList(new OngoingTopicReassignment().
setName("bar").setPartitions(Arrays.asList(
new OngoingPartitionReassignment().setPartitionIndex(0).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Arrays.asList(0, 1)).
setReplicas(Arrays.asList(1, 2, 3, 4, 0))))));
assertEquals(currentReassigning, replication.listPartitionReassignments(null));
// Filtering the listing by topic: "foo" has nothing ongoing, "bar" has bar:0.
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList(
new ListPartitionReassignmentsTopics().setName("foo").
setPartitionIndexes(Arrays.asList(0, 1, 2)))));
assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList(
new ListPartitionReassignmentsTopics().setName("bar").
setPartitionIndexes(Arrays.asList(0, 1, 2)))));
// Broker 4 proposes a full ISR for bar:0. Only the response is checked here;
// the resulting records are not replayed.
ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr(
new AlterIsrRequestData().setBrokerId(4).setBrokerEpoch(104).
setTopics(Arrays.asList(new TopicData().setName("bar").setPartitions(Arrays.asList(
new PartitionData().setPartitionIndex(0).setCurrentIsrVersion(2).
setLeaderEpoch(1).setNewIsr(Arrays.asList(4, 1, 2, 3, 0)))))));
assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
new AlterIsrResponseData.TopicData().setName("bar").setPartitions(Arrays.asList(
new AlterIsrResponseData.PartitionData().
setPartitionIndex(0).
setLeaderId(4).
setLeaderEpoch(1).
setIsr(Arrays.asList(4, 1, 2, 3, 0)).
setCurrentIsrVersion(3).
setErrorCode(NONE.code()))))),
alterIsrResult.response());
// Requests with a null replica list for foo:0 and bar:0.
ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(null))),
new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(null))))));
// One record is expected, for bar:0: replicas {2, 3, 4}, leader 4, a null
// removing list and an empty adding list. foo:0 reports
// NO_REASSIGNMENT_IN_PROGRESS.
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(barId).
setPartitionId(0).
setLeader(4).
setReplicas(Arrays.asList(2, 3, 4)).
setRemovingReplicas(null).
setAddingReplicas(Collections.emptyList()), (short) 0)),
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null))),
new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null)))))),
cancelResult);
// Replaying the cancel records leaves nothing reassigning and bar:0 with
// replicas {2, 3, 4} and ISR {4, 2}.
ctx.replay(cancelResult.records());
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
assertEquals(new PartitionRegistration(new int[] {2, 3, 4}, new int[] {4, 2},
new int[] {}, new int[] {}, 4, 2, 3), replication.getPartition(barId, 0));
}
@Test

View File

@ -105,4 +105,24 @@ public class PartitionRegistrationTest {
setIsNew(false).toString(),
b.toLeaderAndIsrPartitionState(new TopicPartition("bar", 0), false).toString());
}
/**
 * Checks that {@code PartitionRegistration.merge} applies the reassignment fields of a
 * {@code PartitionChangeRecord}: first a record that sets adding and removing replicas,
 * then a record that clears them again.
 */
@Test
public void testMergePartitionChangeRecordWithReassignmentData() {
    // Baseline registration: replicas and ISR are both {1, 2, 3}, nothing adding or removing.
    PartitionRegistration initial = new PartitionRegistration(new int[] {1, 2, 3},
        new int[] {1, 2, 3}, Replicas.NONE, Replicas.NONE, 1, 100, 200);

    // A record that marks replica 3 as removing and replica 4 as adding.
    PartitionChangeRecord startReassignment = new PartitionChangeRecord().
        setRemovingReplicas(Collections.singletonList(3)).
        setAddingReplicas(Collections.singletonList(4)).
        setReplicas(Arrays.asList(1, 2, 3, 4));
    PartitionRegistration inProgress = initial.merge(startReassignment);
    // The ISR is expected to be unchanged; the last constructor argument
    // (presumably the partition epoch -- confirm against PartitionRegistration) goes 200 -> 201.
    PartitionRegistration expectedInProgress = new PartitionRegistration(
        new int[] {1, 2, 3, 4}, new int[] {1, 2, 3},
        new int[] {3}, new int[] {4}, 1, 100, 201);
    assertEquals(expectedInProgress, inProgress);

    // A record that clears both reassignment lists and shrinks replicas/ISR to {1, 2, 4}.
    PartitionChangeRecord finishReassignment = new PartitionChangeRecord().
        setIsr(Arrays.asList(1, 2, 4)).
        setRemovingReplicas(Collections.emptyList()).
        setAddingReplicas(Collections.emptyList()).
        setReplicas(Arrays.asList(1, 2, 4));
    PartitionRegistration completed = inProgress.merge(finishReassignment);
    PartitionRegistration expectedCompleted = new PartitionRegistration(
        new int[] {1, 2, 4}, new int[] {1, 2, 4},
        Replicas.NONE, Replicas.NONE, 1, 100, 202);
    assertEquals(expectedCompleted, completed);
    assertFalse(completed.isReassigning());
}
}

View File

@ -21,6 +21,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -88,9 +90,39 @@ public class ReplicasTest {
assertArrayEquals(new int[] {4, 1}, Replicas.copyWithout(new int[] {4, 2, 2, 1}, 2));
}
/**
 * Exercises the {@code Replicas.copyWithout} overload whose second argument is an
 * array of values to drop.
 */
@Test
public void testCopyWithout2() {
    // Empty source, nothing to drop.
    int[] noReplicas = new int[] {};
    int[] dropNothing = new int[] {};
    assertArrayEquals(new int[] {}, Replicas.copyWithout(noReplicas, dropNothing));

    // Dropping the only element leaves an empty array.
    int[] single = new int[] {1};
    int[] dropSingle = new int[] {1};
    assertArrayEquals(new int[] {}, Replicas.copyWithout(single, dropSingle));

    // The drop list contains a value (4) that is absent from the source.
    int[] oneTwoThree = new int[] {1, 2, 3};
    int[] dropTwoFour = new int[]{2, 4};
    assertArrayEquals(new int[] {1, 3}, Replicas.copyWithout(oneTwoThree, dropTwoFour));

    // The source contains a duplicated value (2) that appears in the drop list.
    int[] withDuplicates = new int[] {4, 2, 2, 1};
    int[] dropTwoOne = new int[]{2, 1};
    assertArrayEquals(new int[] {4}, Replicas.copyWithout(withDuplicates, dropTwoOne));
}
/**
 * Checks that {@code Replicas.copyWith} returns the input array with the given value
 * as its last element, for an empty and a non-empty input.
 */
@Test
public void testCopyWith() {
    // Adding to an empty array gives a single-element array.
    int[] fromEmpty = Replicas.copyWith(new int[] {}, -1);
    assertArrayEquals(new int[] {-1}, fromEmpty);

    // Adding to a populated array puts the new value at the end.
    int[] fromPopulated = Replicas.copyWith(new int[] {1, 2, 3}, 4);
    assertArrayEquals(new int[] {1, 2, 3, 4}, fromPopulated);
}
/**
 * Checks that {@code Replicas.toSet} returns a set equal to the distinct values of the
 * input array, for empty, unique and duplicated inputs.
 */
@Test
public void testToSet() {
    // An empty array maps to an empty set.
    assertEquals(Collections.emptySet(), Replicas.toSet(new int[] {}));

    // The expected set is built in a different order than the input array.
    HashSet<Integer> expectedDistinct = new HashSet<>(Arrays.asList(3, 1, 5));
    assertEquals(expectedDistinct, Replicas.toSet(new int[] {1, 3, 5}));

    // Repeated values in the input collapse to one entry each.
    HashSet<Integer> expectedDeduplicated = new HashSet<>(Arrays.asList(1, 2, 10));
    assertEquals(expectedDeduplicated, Replicas.toSet(new int[] {1, 1, 2, 10, 10}));
}
/**
 * Exercises the {@code Replicas.contains} overload that takes a list and an int array;
 * the assertions expect true exactly when every array value is present in the list.
 */
@Test
public void testContains2() {
    // Empty list: only an empty array is contained.
    int[] justOne = new int[] {1};
    assertTrue(Replicas.contains(Collections.emptyList(), Replicas.NONE));
    assertFalse(Replicas.contains(Collections.emptyList(), justOne));

    // Same values in a different order are still contained.
    int[] reversed = new int[] {3, 2, 1};
    assertTrue(Replicas.contains(Arrays.asList(1, 2, 3), reversed));

    // Subsets of a four-element list, then an array with a value (7) the list lacks.
    int[] three = new int[] {3};
    int[] threeOne = new int[] {3, 1};
    int[] threeOneSeven = new int[] {3, 1, 7};
    assertTrue(Replicas.contains(Arrays.asList(1, 2, 3, 4), three));
    assertTrue(Replicas.contains(Arrays.asList(1, 2, 3, 4), threeOne));
    assertFalse(Replicas.contains(Arrays.asList(1, 2, 3, 4), threeOneSeven));

    // An empty array is contained in a non-empty list.
    int[] none = new int[] {};
    assertTrue(Replicas.contains(Arrays.asList(1, 2, 3, 4), none));
}
}