KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR (#15359)

When completing the partition reassignment, the new ISR should have all the target replicas.

Reviewers: Justine Olshan <jolshan@confluent.io>, David Mao <dmao@confluent.io>
This commit is contained in:
Calvin Liu 2024-02-16 10:27:43 -08:00 committed by GitHub
parent 0789952b68
commit 756f44a3e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 14 additions and 6 deletions

View File

@ -120,9 +120,7 @@ class PartitionReassignmentReplicas {
}
if (newTargetReplicas.isEmpty()) return Optional.empty();
}
for (int replica : adding) {
if (!newTargetIsr.contains(replica)) return Optional.empty();
}
if (!newTargetIsr.containsAll(newTargetReplicas)) return Optional.empty();
return Optional.of(
new CompletedReassignment(

View File

@ -196,6 +196,16 @@ public class PartitionReassignmentReplicasTest {
build()));
}
@Test
public void testDoesNotCompleteReassignmentIfIsrDoesNotHaveAllTargetReplicas() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
new PartitionAssignment(Arrays.asList(0, 1, 2)), new PartitionAssignment(Arrays.asList(0, 1, 3)));
assertTrue(replicas.isReassignmentInProgress());
Optional<PartitionReassignmentReplicas.CompletedReassignment> reassignmentOptional =
replicas.maybeCompleteReassignment(Arrays.asList(3));
assertFalse(reassignmentOptional.isPresent());
}
@Test
public void testOriginalReplicas() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(

View File

@ -1989,7 +1989,7 @@ public class ReplicationControlManagerTest {
new AlterPartitionReassignmentsRequestData().setTopics(asList(
new ReassignableTopic().setName("foo").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(asList(1, 2, 3)),
setReplicas(asList(1, 2, 4)),
new ReassignablePartition().setPartitionIndex(1).
setReplicas(asList(1, 2, 3, 0)),
new ReassignablePartition().setPartitionIndex(2).
@ -2019,11 +2019,11 @@ public class ReplicationControlManagerTest {
setErrorMessage(null))))),
alterResult.response());
ctx.replay(alterResult.records());
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2}).
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 4}).setIsr(new int[] {1, 2, 4}).
setDirectories(new Uuid[] {
Uuid.fromString("TESTBROKER00001DIRAAAA"),
Uuid.fromString("TESTBROKER00002DIRAAAA"),
Uuid.fromString("TESTBROKER00003DIRAAAA")
Uuid.fromString("TESTBROKER00004DIRAAAA")
}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(2).build(), replication.getPartition(fooId, 0));
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 0}).setIsr(new int[] {0, 1, 2}).