KAFKA-17973: Relax Restriction for Voters Set Change (#17728)

Relax the voter set change validation that exists in KRaft. When reading the kraft partition and validating voter set changes allow the voter set to have more than one change.

This violates the invariant that after a voter change there are overlapping voters for all possible majorities. This is okay because the KRaft leader checks that there are no pending voter set updates when handling an add voter request and a remove voter request.

Reviewers: José Armando García Sancio <jsancio@apache.org>
This commit is contained in:
Hailey Ni 2025-01-13 08:44:46 -08:00 committed by José Armando García Sancio
parent ed83f7224d
commit 5c5bfdc70c
3 changed files with 36 additions and 33 deletions

View File

@ -90,7 +90,7 @@ public final class KRaftControlRecordStateMachine {
LogContext logContext LogContext logContext
) { ) {
this.log = log; this.log = log;
this.voterSetHistory = new VoterSetHistory(staticVoterSet); this.voterSetHistory = new VoterSetHistory(staticVoterSet, logContext);
this.serde = serde; this.serde = serde;
this.bufferSupplier = bufferSupplier; this.bufferSupplier = bufferSupplier;
this.maxBatchSizeBytes = maxBatchSizeBytes; this.maxBatchSizeBytes = maxBatchSizeBytes;

View File

@ -16,8 +16,11 @@
*/ */
package org.apache.kafka.raft.internals; package org.apache.kafka.raft.internals;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.raft.VoterSet; import org.apache.kafka.raft.VoterSet;
import org.slf4j.Logger;
import java.util.Optional; import java.util.Optional;
import java.util.OptionalLong; import java.util.OptionalLong;
@ -31,9 +34,11 @@ import java.util.OptionalLong;
public final class VoterSetHistory { public final class VoterSetHistory {
private final VoterSet staticVoterSet; private final VoterSet staticVoterSet;
private final LogHistory<VoterSet> votersHistory = new TreeMapLogHistory<>(); private final LogHistory<VoterSet> votersHistory = new TreeMapLogHistory<>();
private final Logger logger;
VoterSetHistory(VoterSet staticVoterSet) { VoterSetHistory(VoterSet staticVoterSet, LogContext logContext) {
this.staticVoterSet = staticVoterSet; this.staticVoterSet = staticVoterSet;
this.logger = logContext.logger(getClass());
} }
/** /**
@ -55,12 +60,10 @@ public final class VoterSetHistory {
// all replicas. // all replicas.
VoterSet lastVoterSet = lastEntry.get().value(); VoterSet lastVoterSet = lastEntry.get().value();
if (!lastVoterSet.hasOverlappingMajority(voters)) { if (!lastVoterSet.hasOverlappingMajority(voters)) {
throw new IllegalArgumentException( logger.info(
String.format( "Last voter set ({}) doesn't have an overlapping majority with the new voter set ({})",
"Last voter set %s doesn't have an overlapping majority with the new voter set %s", lastVoterSet,
lastVoterSet, voters
voters
)
); );
} }
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.kafka.raft.internals; package org.apache.kafka.raft.internals;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.raft.VoterSet; import org.apache.kafka.raft.VoterSet;
import org.apache.kafka.raft.VoterSetTest; import org.apache.kafka.raft.VoterSetTest;
@ -33,7 +34,7 @@ public final class VoterSetHistoryTest {
@Test @Test
void testStaticVoterSet() { void testStaticVoterSet() {
VoterSet staticVoterSet = VoterSet.fromMap(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true)); VoterSet staticVoterSet = VoterSet.fromMap(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true));
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);
assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(0)); assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(0));
assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(100)); assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(100));
@ -54,7 +55,7 @@ public final class VoterSetHistoryTest {
@Test @Test
void TestNoStaticVoterSet() { void TestNoStaticVoterSet() {
VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty()); VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty());
assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(0)); assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(0));
assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(100)); assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(100));
@ -65,7 +66,7 @@ public final class VoterSetHistoryTest {
void testAddAt() { void testAddAt() {
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);
assertThrows( assertThrows(
IllegalArgumentException.class, IllegalArgumentException.class,
@ -95,7 +96,7 @@ public final class VoterSetHistoryTest {
void testBootstrapAddAt() { void testBootstrapAddAt() {
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
VoterSet bootstrapVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); VoterSet bootstrapVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty()); VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty());
votersHistory.addAt(-1, bootstrapVoterSet); votersHistory.addAt(-1, bootstrapVoterSet);
assertEquals(bootstrapVoterSet, votersHistory.lastValue()); assertEquals(bootstrapVoterSet, votersHistory.lastValue());
@ -124,7 +125,7 @@ public final class VoterSetHistoryTest {
@Test @Test
void testAddAtNonOverlapping() { void testAddAtNonOverlapping() {
VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty()); VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty());
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
VoterSet voterSet = VoterSet.fromMap(new HashMap<>(voterMap)); VoterSet voterSet = VoterSet.fromMap(new HashMap<>(voterMap));
@ -132,35 +133,30 @@ public final class VoterSetHistoryTest {
// Add a starting voter to the history // Add a starting voter to the history
votersHistory.addAt(100, voterSet); votersHistory.addAt(100, voterSet);
// Remove voter so that it doesn't overlap // Assert multiple voters can be removed at a time
VoterSet nonoverlappingRemovedSet = voterSet VoterSet nonOverlappingRemovedSet = voterSet
.removeVoter(voterMap.get(1).voterKey()).get() .removeVoter(voterMap.get(1).voterKey()).get()
.removeVoter(voterMap.get(2).voterKey()).get(); .removeVoter(voterMap.get(2).voterKey()).get();
assertThrows( votersHistory.addAt(200, nonOverlappingRemovedSet);
IllegalArgumentException.class,
() -> votersHistory.addAt(200, nonoverlappingRemovedSet)
);
assertEquals(voterSet, votersHistory.lastValue());
assertEquals(nonOverlappingRemovedSet, votersHistory.lastValue());
// Add voters so that it doesn't overlap // Assert multiple voters can be added at a time
VoterSet nonoverlappingAddSet = voterSet VoterSet nonOverlappingAddSet = nonOverlappingRemovedSet
.addVoter(VoterSetTest.voterNode(4, true)).get() .addVoter(VoterSetTest.voterNode(1, true)).get()
.addVoter(VoterSetTest.voterNode(5, true)).get(); .addVoter(VoterSetTest.voterNode(2, true)).get();
assertThrows( votersHistory.addAt(300, nonOverlappingAddSet);
IllegalArgumentException.class,
() -> votersHistory.addAt(200, nonoverlappingAddSet) assertEquals(nonOverlappingAddSet, votersHistory.lastValue());
);
assertEquals(voterSet, votersHistory.lastValue());
} }
@Test @Test
void testNonoverlappingFromStaticVoterSet() { void testNonoverlappingFromStaticVoterSet() {
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty()); VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty());
// Remove voter so that it doesn't overlap // Remove voter so that it doesn't overlap
VoterSet nonoverlappingRemovedSet = staticVoterSet VoterSet nonoverlappingRemovedSet = staticVoterSet
@ -175,7 +171,7 @@ public final class VoterSetHistoryTest {
void testTruncateTo() { void testTruncateTo() {
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);
// Add voter 4 to the voter set and voter set history // Add voter 4 to the voter set and voter set history
voterMap.put(4, VoterSetTest.voterNode(4, true)); voterMap.put(4, VoterSetTest.voterNode(4, true));
@ -201,7 +197,7 @@ public final class VoterSetHistoryTest {
void testTrimPrefixTo() { void testTrimPrefixTo() {
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);
// Add voter 4 to the voter set and voter set history // Add voter 4 to the voter set and voter set history
voterMap.put(4, VoterSetTest.voterNode(4, true)); voterMap.put(4, VoterSetTest.voterNode(4, true));
@ -234,7 +230,7 @@ public final class VoterSetHistoryTest {
void testClear() { void testClear() {
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true); Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap)); VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet); VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);
// Add voter 4 to the voter set and voter set history // Add voter 4 to the voter set and voter set history
voterMap.put(4, VoterSetTest.voterNode(4, true)); voterMap.put(4, VoterSetTest.voterNode(4, true));
@ -250,4 +246,8 @@ public final class VoterSetHistoryTest {
assertEquals(staticVoterSet, votersHistory.lastValue()); assertEquals(staticVoterSet, votersHistory.lastValue());
} }
private VoterSetHistory voterSetHistory(VoterSet staticVoterSet) {
return new VoterSetHistory(staticVoterSet, new LogContext());
}
} }