KAFKA-14829: Consolidate reassignment logic into PartitionReassignmentReplicas (#13440)

Currently, we have various bits of reassignment logic spread across different classes. For example, ReplicationControlManager contains logic for when a reassignment is in progress, which is duplication in PartitionChangeBuilder. Another example is PartitionReassignmentRevert which contains logic for how to undo/revert a reassignment. The idea here is to move the logic to PartitionReassignmentReplicas so it's more testable and easier to reason about.

Reviewers: José Armando García Sancio <jsancio@apache.org>
This commit is contained in:
andymg3 2023-03-29 13:12:40 -04:00 committed by GitHub
parent 6e8d0d9850
commit 379b6978a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 242 additions and 65 deletions

View File

@ -17,7 +17,6 @@
package org.apache.kafka.controller;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@ -263,32 +262,22 @@ public class PartitionChangeBuilder {
}
private void completeReassignmentIfNeeded() {
// Check if there is a reassignment to complete.
if (targetRemoving.isEmpty() && targetAdding.isEmpty()) return;
PartitionReassignmentReplicas reassignmentReplicas =
new PartitionReassignmentReplicas(
targetRemoving,
targetAdding,
targetReplicas);
List<Integer> newTargetIsr = targetIsr;
List<Integer> newTargetReplicas = targetReplicas;
if (!targetRemoving.isEmpty()) {
newTargetIsr = new ArrayList<>(targetIsr.size());
for (int replica : targetIsr) {
if (!targetRemoving.contains(replica)) {
newTargetIsr.add(replica);
}
}
if (newTargetIsr.isEmpty()) return;
newTargetReplicas = new ArrayList<>(targetReplicas.size());
for (int replica : targetReplicas) {
if (!targetRemoving.contains(replica)) {
newTargetReplicas.add(replica);
}
}
if (newTargetReplicas.isEmpty()) return;
Optional<PartitionReassignmentReplicas.CompletedReassignment> completedReassignmentOpt =
reassignmentReplicas.maybeCompleteReassignment(targetIsr);
if (!completedReassignmentOpt.isPresent()) {
return;
}
for (int replica : targetAdding) {
if (!newTargetIsr.contains(replica)) return;
}
targetIsr = newTargetIsr;
targetReplicas = newTargetReplicas;
PartitionReassignmentReplicas.CompletedReassignment completedReassignment = completedReassignmentOpt.get();
targetIsr = completedReassignment.isr;
targetReplicas = completedReassignment.replicas;
targetRemoving = Collections.emptyList();
targetAdding = Collections.emptyList();
}
@ -308,15 +297,9 @@ public class PartitionChangeBuilder {
// Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
record.setIsr(targetIsr);
}
if (!targetReplicas.isEmpty() && !targetReplicas.equals(Replicas.toList(partition.replicas))) {
record.setReplicas(targetReplicas);
}
if (!targetRemoving.equals(Replicas.toList(partition.removingReplicas))) {
record.setRemovingReplicas(targetRemoving);
}
if (!targetAdding.equals(Replicas.toList(partition.addingReplicas))) {
record.setAddingReplicas(targetAdding);
}
setAssignmentChanges(record);
if (targetLeaderRecoveryState != partition.leaderRecoveryState) {
record.setLeaderRecoveryState(targetLeaderRecoveryState.value());
}
@ -328,6 +311,18 @@ public class PartitionChangeBuilder {
}
}
private void setAssignmentChanges(PartitionChangeRecord record) {
if (!targetReplicas.isEmpty() && !targetReplicas.equals(Replicas.toList(partition.replicas))) {
record.setReplicas(targetReplicas);
}
if (!targetRemoving.equals(Replicas.toList(partition.removingReplicas))) {
record.setRemovingReplicas(targetRemoving);
}
if (!targetAdding.equals(Replicas.toList(partition.addingReplicas))) {
record.setAddingReplicas(targetAdding);
}
}
@Override
public String toString() {
return "PartitionChangeBuilder(" +

View File

@ -17,12 +17,15 @@
package org.apache.kafka.controller;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.PartitionAssignment;
import java.util.ArrayList;
import java.util.Set;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeSet;
@ -31,6 +34,16 @@ class PartitionReassignmentReplicas {
private final List<Integer> adding;
private final List<Integer> replicas;
public PartitionReassignmentReplicas(
List<Integer> removing,
List<Integer> adding,
List<Integer> replicas
) {
this.removing = removing;
this.adding = adding;
this.replicas = replicas;
}
private static Set<Integer> calculateDifference(List<Integer> a, List<Integer> b) {
Set<Integer> result = new TreeSet<>(a);
result.removeAll(b);
@ -61,6 +74,80 @@ class PartitionReassignmentReplicas {
return replicas;
}
boolean isReassignmentInProgress() {
return isReassignmentInProgress(
removing,
adding);
}
static boolean isReassignmentInProgress(PartitionRegistration part) {
return isReassignmentInProgress(
Replicas.toList(part.removingReplicas),
Replicas.toList(part.addingReplicas));
}
private static boolean isReassignmentInProgress(
List<Integer> removingReplicas,
List<Integer> addingReplicas
) {
return removingReplicas.size() > 0
|| addingReplicas.size() > 0;
}
Optional<CompletedReassignment> maybeCompleteReassignment(List<Integer> targetIsr) {
// Check if there is a reassignment to complete.
if (!isReassignmentInProgress()) {
return Optional.empty();
}
List<Integer> newTargetIsr = new ArrayList<>(targetIsr);
List<Integer> newTargetReplicas = replicas;
if (!removing.isEmpty()) {
newTargetIsr = new ArrayList<>(targetIsr.size());
for (int replica : targetIsr) {
if (!removing.contains(replica)) {
newTargetIsr.add(replica);
}
}
if (newTargetIsr.isEmpty()) return Optional.empty();
newTargetReplicas = new ArrayList<>(replicas.size());
for (int replica : replicas) {
if (!removing.contains(replica)) {
newTargetReplicas.add(replica);
}
}
if (newTargetReplicas.isEmpty()) return Optional.empty();
}
for (int replica : adding) {
if (!newTargetIsr.contains(replica)) return Optional.empty();
}
return Optional.of(
new CompletedReassignment(
newTargetReplicas,
newTargetIsr
)
);
}
static class CompletedReassignment {
final List<Integer> replicas;
final List<Integer> isr;
public CompletedReassignment(List<Integer> replicas, List<Integer> isr) {
this.replicas = replicas;
this.isr = isr;
}
}
List<Integer> originalReplicas() {
List<Integer> replicas = new ArrayList<>(this.replicas);
replicas.removeAll(adding);
return replicas;
}
@Override
public int hashCode() {
return Objects.hash(removing, adding, replicas);

View File

@ -21,8 +21,6 @@ import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import java.util.ArrayList;
import java.util.Set;
import java.util.List;
import java.util.Objects;
@ -37,20 +35,18 @@ class PartitionReassignmentRevert {
// reassignment. In general, we want to take out any replica that the reassignment
// was adding, but keep the ones the reassignment was removing. (But see the
// special case below.)
Set<Integer> adding = Replicas.toSet(registration.addingReplicas);
this.replicas = new ArrayList<>(registration.replicas.length);
this.isr = new ArrayList<>(registration.isr.length);
for (int i = 0; i < registration.isr.length; i++) {
int replica = registration.isr[i];
if (!adding.contains(replica)) {
this.isr.add(replica);
}
}
for (int replica : registration.replicas) {
if (!adding.contains(replica)) {
this.replicas.add(replica);
}
}
PartitionReassignmentReplicas ongoingReassignment =
new PartitionReassignmentReplicas(
Replicas.toList(registration.removingReplicas),
Replicas.toList(registration.addingReplicas),
Replicas.toList(registration.replicas)
);
this.replicas = ongoingReassignment.originalReplicas();
this.isr = Replicas.toList(registration.isr);
this.isr.removeAll(ongoingReassignment.adding());
if (isr.isEmpty()) {
// In the special case that all the replicas that are in the ISR are also
// contained in addingReplicas, we choose the first remaining replica and add

View File

@ -129,6 +129,7 @@ import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
import static org.apache.kafka.common.protocol.Errors.TOPIC_AUTHORIZATION_FAILED;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.controller.PartitionReassignmentReplicas.isReassignmentInProgress;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
@ -414,14 +415,14 @@ public class ReplicationControlManager {
brokersToIsrs.update(record.topicId(), record.partitionId(), null,
newPartInfo.isr, NO_LEADER, newPartInfo.leader);
updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
false, newPartInfo.isReassigning());
false, isReassignmentInProgress(newPartInfo));
} else if (!newPartInfo.equals(prevPartInfo)) {
newPartInfo.maybeLogPartitionChange(log, description, prevPartInfo);
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr,
newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader);
updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
prevPartInfo.isReassigning(), newPartInfo.isReassigning());
isReassignmentInProgress(prevPartInfo), isReassignmentInProgress(newPartInfo));
}
if (newPartInfo.hasPreferredLeader()) {
@ -462,7 +463,7 @@ public class ReplicationControlManager {
}
PartitionRegistration newPartitionInfo = prevPartitionInfo.merge(record);
updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
prevPartitionInfo.isReassigning(), newPartitionInfo.isReassigning());
isReassignmentInProgress(prevPartitionInfo), isReassignmentInProgress(newPartitionInfo));
topicInfo.parts.put(record.partitionId(), newPartitionInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(),
prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.leader,
@ -1020,8 +1021,7 @@ public class ReplicationControlManager {
setPartitionIndex(partitionId).
setErrorCode(error.code()));
continue;
} else if (change.removingReplicas() != null ||
change.addingReplicas() != null) {
} else if (isReassignmentInProgress(partition)) {
log.info("AlterPartition request from node {} for {}-{} completed " +
"the ongoing partition reassignment.", request.brokerId(),
topic.name, partitionId);
@ -1787,7 +1787,7 @@ public class ReplicationControlManager {
Optional<ApiMessageAndVersion> cancelPartitionReassignment(String topicName,
TopicIdPartition tp,
PartitionRegistration part) {
if (!part.isReassigning()) {
if (!isReassignmentInProgress(part)) {
throw new NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
}
PartitionReassignmentRevert revert = new PartitionReassignmentRevert(part);
@ -1913,7 +1913,7 @@ public class ReplicationControlManager {
private Optional<OngoingPartitionReassignment>
getOngoingPartitionReassignment(TopicControlInfo topicInfo, int partitionId) {
PartitionRegistration partition = topicInfo.parts.get(partitionId);
if (partition == null || !partition.isReassigning()) {
if (partition == null || !isReassignmentInProgress(partition)) {
return Optional.empty();
}
return Optional.of(new OngoingPartitionReassignment().

View File

@ -201,13 +201,6 @@ public class PartitionRegistration {
setIsNew(isNew);
}
/**
* Returns true if this partition is reassigning.
*/
public boolean isReassigning() {
return removingReplicas.length > 0 || addingReplicas.length > 0;
}
@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(replicas), Arrays.hashCode(isr), Arrays.hashCode(removingReplicas),

View File

@ -19,12 +19,18 @@ package org.apache.kafka.controller;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.PartitionAssignment;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(40)
@ -73,4 +79,105 @@ public class PartitionReassignmentReplicasTest {
assertEquals(Collections.emptyList(), replicas.adding());
assertEquals(Arrays.asList(0, 1, 3, 2), replicas.replicas());
}
@Test
public void testDoesNotCompleteReassignment() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
new PartitionAssignment(Arrays.asList(0, 1, 2)), new PartitionAssignment(Arrays.asList(3, 4, 5)));
assertTrue(replicas.isReassignmentInProgress());
Optional<PartitionReassignmentReplicas.CompletedReassignment> reassignmentOptional =
replicas.maybeCompleteReassignment(Arrays.asList(0, 1, 2, 3, 4));
assertFalse(reassignmentOptional.isPresent());
}
@Test
public void testDoesNotCompleteReassignmentIfNoneOngoing() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
Collections.emptyList(),
Collections.emptyList(),
Arrays.asList(0, 1, 2)
);
assertFalse(replicas.isReassignmentInProgress());
Optional<PartitionReassignmentReplicas.CompletedReassignment> reassignmentOptional =
replicas.maybeCompleteReassignment(Arrays.asList(0, 1, 2));
assertFalse(reassignmentOptional.isPresent());
}
@Test
public void testDoesCompleteReassignmentAllNewReplicas() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
new PartitionAssignment(Arrays.asList(0, 1, 2)), new PartitionAssignment(Arrays.asList(3, 4, 5)));
assertTrue(replicas.isReassignmentInProgress());
Optional<PartitionReassignmentReplicas.CompletedReassignment> reassignmentOptional =
replicas.maybeCompleteReassignment(Arrays.asList(0, 1, 2, 3, 4, 5));
assertTrue(reassignmentOptional.isPresent());
PartitionReassignmentReplicas.CompletedReassignment completedReassignment = reassignmentOptional.get();
assertEquals(Arrays.asList(3, 4, 5), completedReassignment.isr);
assertEquals(Arrays.asList(3, 4, 5), completedReassignment.replicas);
}
@Test
public void testDoesCompleteReassignmentSomeNewReplicas() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
new PartitionAssignment(Arrays.asList(0, 1, 2)), new PartitionAssignment(Arrays.asList(0, 1, 3)));
assertTrue(replicas.isReassignmentInProgress());
Optional<PartitionReassignmentReplicas.CompletedReassignment> reassignmentOptional =
replicas.maybeCompleteReassignment(Arrays.asList(0, 1, 2, 3));
assertTrue(reassignmentOptional.isPresent());
PartitionReassignmentReplicas.CompletedReassignment completedReassignment = reassignmentOptional.get();
assertEquals(Arrays.asList(0, 1, 3), completedReassignment.isr);
assertEquals(Arrays.asList(0, 1, 3), completedReassignment.replicas);
}
@Test
public void testIsReassignmentInProgress() {
assertTrue(PartitionReassignmentReplicas.isReassignmentInProgress(
new PartitionRegistration(
new int[]{0, 1, 3, 2},
new int[]{0, 1, 3, 2},
new int[]{2},
new int[]{3},
0,
LeaderRecoveryState.RECOVERED,
0,
0)));
assertTrue(PartitionReassignmentReplicas.isReassignmentInProgress(
new PartitionRegistration(
new int[]{0, 1, 3, 2},
new int[]{0, 1, 3, 2},
new int[]{2},
Replicas.NONE,
0,
LeaderRecoveryState.RECOVERED,
0,
0)));
assertTrue(PartitionReassignmentReplicas.isReassignmentInProgress(
new PartitionRegistration(
new int[]{0, 1, 2, 3},
new int[]{0, 1, 2, 3},
Replicas.NONE,
new int[]{3},
0,
LeaderRecoveryState.RECOVERED,
0,
0)));
assertFalse(PartitionReassignmentReplicas.isReassignmentInProgress(
new PartitionRegistration(
new int[]{0, 1, 2},
new int[]{0, 1, 2},
Replicas.NONE,
Replicas.NONE,
0,
LeaderRecoveryState.RECOVERED,
0,
0)));
}
@Test
public void testOriginalReplicas() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
new PartitionAssignment(Arrays.asList(0, 1, 2)), new PartitionAssignment(Arrays.asList(0, 1, 3)));
assertEquals(Arrays.asList(0, 1, 2), replicas.originalReplicas());
}
}

View File

@ -129,7 +129,6 @@ public class PartitionRegistrationTest {
setReplicas(Arrays.asList(1, 2, 4)));
assertEquals(new PartitionRegistration(new int[] {1, 2, 4},
new int[] {1, 2, 4}, Replicas.NONE, Replicas.NONE, 1, LeaderRecoveryState.RECOVERED, 100, 202), partition2);
assertFalse(partition2.isReassigning());
}
@Property