mirror of https://github.com/apache/kafka.git
KAFKA-16530: Fix high-watermark calculation to not assume the leader is in the voter set (#16079)
1. Changing log message from error to info - We may expect the HW calculation to give us a smaller result than the current HW in the case of quorum reconfiguration. We will continue to not allow the HW to actually decrease. 2. Logic for finding the updated LeaderEndOffset for updateReplicaState is changed as well. We do not assume the leader is in the voter set and check the observer states as well. 3. updateLocalState now accepts an additional "lastVoterSet" param which allows us to update the leader state with the last known voters. any nodes in this set but not in voterStates will be added to voterStates and removed from observerStates, any nodes not in this set but in voterStates will be removed from voterStates and added to observerStates Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@apache.org>
This commit is contained in:
parent
04f7ed4c10
commit
25ca963980
|
|
@ -314,7 +314,7 @@ final public class KafkaRaftClient<T> implements RaftClient<T> {
|
|||
) {
|
||||
final LogOffsetMetadata endOffsetMetadata = log.endOffset();
|
||||
|
||||
if (state.updateLocalState(endOffsetMetadata)) {
|
||||
if (state.updateLocalState(endOffsetMetadata, partitionState.lastVoterSet().voterIds())) {
|
||||
onUpdateLeaderHighWatermark(state, currentTimeMs);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
|
@ -243,8 +244,9 @@ public class LeaderState<T> implements EpochState {
|
|||
);
|
||||
return true;
|
||||
} else if (highWatermarkUpdateOffset < currentHighWatermarkMetadata.offset) {
|
||||
log.error("The latest computed high watermark {} is smaller than the current " +
|
||||
"value {}, which suggests that one of the voters has lost committed data. " +
|
||||
log.info("The latest computed high watermark {} is smaller than the current " +
|
||||
"value {}, which should only happen when voter set membership changes. If the voter " +
|
||||
"set has not changed this suggests that one of the voters has lost committed data. " +
|
||||
"Full voter replication state: {}", highWatermarkUpdateOffset,
|
||||
currentHighWatermarkMetadata.offset, voterStates.values());
|
||||
return false;
|
||||
|
|
@ -296,10 +298,12 @@ public class LeaderState<T> implements EpochState {
|
|||
* Update the local replica state.
|
||||
*
|
||||
* @param endOffsetMetadata updated log end offset of local replica
|
||||
* @param lastVoterSet the up-to-date voter set
|
||||
* @return true if the high watermark is updated as a result of this call
|
||||
*/
|
||||
public boolean updateLocalState(
|
||||
LogOffsetMetadata endOffsetMetadata
|
||||
LogOffsetMetadata endOffsetMetadata,
|
||||
Set<Integer> lastVoterSet
|
||||
) {
|
||||
ReplicaState state = getOrCreateReplicaState(localId);
|
||||
state.endOffset.ifPresent(currentEndOffset -> {
|
||||
|
|
@ -308,7 +312,8 @@ public class LeaderState<T> implements EpochState {
|
|||
"end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset);
|
||||
}
|
||||
});
|
||||
state.updateLeaderState(endOffsetMetadata);
|
||||
state.updateLeaderEndOffset(endOffsetMetadata);
|
||||
updateVoterAndObserverStates(lastVoterSet);
|
||||
return maybeUpdateHighWatermark();
|
||||
}
|
||||
|
||||
|
|
@ -341,9 +346,7 @@ public class LeaderState<T> implements EpochState {
|
|||
state.nodeId, currentEndOffset.offset, fetchOffsetMetadata.offset);
|
||||
}
|
||||
});
|
||||
|
||||
Optional<LogOffsetMetadata> leaderEndOffsetOpt =
|
||||
voterStates.get(localId).endOffset;
|
||||
Optional<LogOffsetMetadata> leaderEndOffsetOpt = getOrCreateReplicaState(localId).endOffset;
|
||||
|
||||
state.updateFollowerState(
|
||||
currentTimeMs,
|
||||
|
|
@ -435,9 +438,13 @@ public class LeaderState<T> implements EpochState {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear observer states that have not been active for a while and are not the leader.
|
||||
*/
|
||||
private void clearInactiveObservers(final long currentTimeMs) {
|
||||
observerStates.entrySet().removeIf(integerReplicaStateEntry ->
|
||||
currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS
|
||||
currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS &&
|
||||
integerReplicaStateEntry.getKey() != localId
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -445,6 +452,26 @@ public class LeaderState<T> implements EpochState {
|
|||
return voterStates.containsKey(remoteNodeId);
|
||||
}
|
||||
|
||||
private void updateVoterAndObserverStates(Set<Integer> lastVoterSet) {
|
||||
// Move any replica that is not in the last voter set from voterStates to observerStates
|
||||
for (Iterator<Map.Entry<Integer, ReplicaState>> iter = voterStates.entrySet().iterator(); iter.hasNext(); ) {
|
||||
Map.Entry<Integer, ReplicaState> replica = iter.next();
|
||||
if (!lastVoterSet.contains(replica.getKey())) {
|
||||
observerStates.put(replica.getKey(), replica.getValue());
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
|
||||
// Add replicas that are in the last voter set and not in voterStates to voterStates (from observerStates
|
||||
// if they exist)
|
||||
for (int voterId : lastVoterSet) {
|
||||
if (!voterStates.containsKey(voterId)) {
|
||||
Optional<ReplicaState> existingObserverState = Optional.ofNullable(observerStates.remove(voterId));
|
||||
voterStates.put(voterId, existingObserverState.orElse(new ReplicaState(voterId, false)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class ReplicaState implements Comparable<ReplicaState> {
|
||||
final int nodeId;
|
||||
Optional<LogOffsetMetadata> endOffset;
|
||||
|
|
@ -462,7 +489,7 @@ public class LeaderState<T> implements EpochState {
|
|||
this.hasAcknowledgedLeader = hasAcknowledgedLeader;
|
||||
}
|
||||
|
||||
void updateLeaderState(
|
||||
void updateLeaderEndOffset(
|
||||
LogOffsetMetadata endOffsetMetadata
|
||||
) {
|
||||
// For the leader, we only update the end offset. The remaining fields
|
||||
|
|
|
|||
|
|
@ -105,25 +105,27 @@ public class LeaderStateTest {
|
|||
|
||||
@Test
|
||||
public void testUpdateHighWatermarkQuorumSizeOne() {
|
||||
LeaderState<?> state = newLeaderState(singleton(localId), 15L);
|
||||
Set<Integer> voterSet = singleton(localId);
|
||||
LeaderState<?> state = newLeaderState(voterSet, 15L);
|
||||
assertEquals(Optional.empty(), state.highWatermark());
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(15L)));
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), voterSet));
|
||||
assertEquals(emptySet(), state.nonAcknowledgingVoters());
|
||||
assertEquals(Optional.empty(), state.highWatermark());
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(16L)));
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voterSet));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark());
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(20)));
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(20), voterSet));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonMonotonicLocalEndOffsetUpdate() {
|
||||
LeaderState<?> state = newLeaderState(singleton(localId), 15L);
|
||||
Set<Integer> voterSet = singleton(localId);
|
||||
LeaderState<?> state = newLeaderState(voterSet, 15L);
|
||||
assertEquals(Optional.empty(), state.highWatermark());
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(16L)));
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voterSet));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark());
|
||||
assertThrows(IllegalStateException.class,
|
||||
() -> state.updateLocalState(new LogOffsetMetadata(15L)));
|
||||
() -> state.updateLocalState(new LogOffsetMetadata(15L), voterSet));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -133,14 +135,15 @@ public class LeaderStateTest {
|
|||
int currentTime = 1000;
|
||||
int fetchTime = 0;
|
||||
int caughtUpTime = -1;
|
||||
LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2), 10L);
|
||||
Set<Integer> voterSet = mkSet(localId, node1, node2);
|
||||
LeaderState<?> state = newLeaderState(voterSet, 10L);
|
||||
assertEquals(Optional.empty(), state.highWatermark());
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(10L)));
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(10L), voterSet));
|
||||
assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
|
||||
assertEquals(Optional.empty(), state.highWatermark());
|
||||
|
||||
// Node 1 falls behind
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(11L)));
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(11L), voterSet));
|
||||
assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L)));
|
||||
assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp());
|
||||
assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp());
|
||||
|
|
@ -152,14 +155,14 @@ public class LeaderStateTest {
|
|||
assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp());
|
||||
|
||||
// Node 1 falls behind
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(100L)));
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(100L), voterSet));
|
||||
assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L)));
|
||||
assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp());
|
||||
assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp());
|
||||
|
||||
// Node 1 catches up to the last fetch offset
|
||||
int prevFetchTime = fetchTime;
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(200L)));
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(200L), voterSet));
|
||||
assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(100L)));
|
||||
caughtUpTime = prevFetchTime;
|
||||
assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp());
|
||||
|
|
@ -167,7 +170,7 @@ public class LeaderStateTest {
|
|||
|
||||
// Node2 has never caught up to leader
|
||||
assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp());
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(300L)));
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(300L), voterSet));
|
||||
assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(200L)));
|
||||
assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp());
|
||||
assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(250L)));
|
||||
|
|
@ -180,12 +183,13 @@ public class LeaderStateTest {
|
|||
int currentTime = 1000;
|
||||
int fetchTime = 0;
|
||||
int caughtUpTime = -1;
|
||||
LeaderState<?> state = newLeaderState(singleton(localId), 5L);
|
||||
Set<Integer> voterSet = singleton(localId);
|
||||
LeaderState<?> state = newLeaderState(voterSet, 5L);
|
||||
assertEquals(Optional.empty(), state.highWatermark());
|
||||
assertEquals(emptySet(), state.nonAcknowledgingVoters());
|
||||
|
||||
// Node 1 falls behind
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(11L)));
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(11L), voterSet));
|
||||
assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L)));
|
||||
assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp());
|
||||
assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp());
|
||||
|
|
@ -197,14 +201,14 @@ public class LeaderStateTest {
|
|||
assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp());
|
||||
|
||||
// Node 1 falls behind
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(100L)));
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(100L), voterSet));
|
||||
assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L)));
|
||||
assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp());
|
||||
assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp());
|
||||
|
||||
// Node 1 catches up to the last fetch offset
|
||||
int prevFetchTime = fetchTime;
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(200L)));
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(200L), voterSet));
|
||||
assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(102L)));
|
||||
caughtUpTime = prevFetchTime;
|
||||
assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp());
|
||||
|
|
@ -219,32 +223,35 @@ public class LeaderStateTest {
|
|||
|
||||
@Test
|
||||
public void testIdempotentEndOffsetUpdate() {
|
||||
LeaderState<?> state = newLeaderState(singleton(localId), 15L);
|
||||
Set<Integer> voterSet = singleton(localId);
|
||||
LeaderState<?> state = newLeaderState(voterSet, 15L);
|
||||
assertEquals(Optional.empty(), state.highWatermark());
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(16L)));
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(16L)));
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voterSet));
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), voterSet));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateHighWatermarkMetadata() {
|
||||
LeaderState<?> state = newLeaderState(singleton(localId), 15L);
|
||||
Set<Integer> voterSet = singleton(localId);
|
||||
LeaderState<?> state = newLeaderState(voterSet, 15L);
|
||||
assertEquals(Optional.empty(), state.highWatermark());
|
||||
|
||||
LogOffsetMetadata initialHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("bar")));
|
||||
assertTrue(state.updateLocalState(initialHw));
|
||||
assertTrue(state.updateLocalState(initialHw, voterSet));
|
||||
assertEquals(Optional.of(initialHw), state.highWatermark());
|
||||
|
||||
LogOffsetMetadata updateHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("baz")));
|
||||
assertTrue(state.updateLocalState(updateHw));
|
||||
assertTrue(state.updateLocalState(updateHw, voterSet));
|
||||
assertEquals(Optional.of(updateHw), state.highWatermark());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateHighWatermarkQuorumSizeTwo() {
|
||||
int otherNodeId = 1;
|
||||
LeaderState<?> state = newLeaderState(mkSet(localId, otherNodeId), 10L);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(13L)));
|
||||
Set<Integer> voterSet = mkSet(localId, otherNodeId);
|
||||
LeaderState<?> state = newLeaderState(voterSet, 10L);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(13L), voterSet));
|
||||
assertEquals(singleton(otherNodeId), state.nonAcknowledgingVoters());
|
||||
assertEquals(Optional.empty(), state.highWatermark());
|
||||
assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L)));
|
||||
|
|
@ -260,8 +267,9 @@ public class LeaderStateTest {
|
|||
public void testUpdateHighWatermarkQuorumSizeThree() {
|
||||
int node1 = 1;
|
||||
int node2 = 2;
|
||||
LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2), 10L);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(15L)));
|
||||
Set<Integer> voterSet = mkSet(localId, node1, node2);
|
||||
LeaderState<?> state = newLeaderState(voterSet, 10L);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), voterSet));
|
||||
assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
|
||||
assertEquals(Optional.empty(), state.highWatermark());
|
||||
assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L)));
|
||||
|
|
@ -272,7 +280,7 @@ public class LeaderStateTest {
|
|||
assertEquals(Optional.empty(), state.highWatermark());
|
||||
assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(20L)));
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), voterSet));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark());
|
||||
|
|
@ -280,12 +288,135 @@ public class LeaderStateTest {
|
|||
assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHighWatermarkDoesIncreaseFromNewVoter() {
|
||||
int node1 = 1;
|
||||
int node2 = 2;
|
||||
Set<Integer> originalVoterSet = mkSet(localId, node1);
|
||||
LeaderState<?> state = newLeaderState(originalVoterSet, 5L);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet));
|
||||
assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark());
|
||||
|
||||
// updating replica state of node2 before it joins voterSet should not increase HW to 15L
|
||||
assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark());
|
||||
|
||||
// adding node2 to voterSet will cause HW to increase to 15L
|
||||
Set<Integer> voterSetWithNode2 = mkSet(localId, node1, node2);
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(15L), voterSetWithNode2));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
|
||||
// HW will not update to 16L until a majority reaches it
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), voterSetWithNode2));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(16L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHighWatermarkDoesNotDecreaseFromNewVoter() {
|
||||
int node1 = 1;
|
||||
int node2 = 2;
|
||||
int node3 = 3;
|
||||
// start with three voters with HW at 15L
|
||||
Set<Integer> originalVoterSet = mkSet(localId, node1, node2);
|
||||
LeaderState<?> state = newLeaderState(originalVoterSet, 5L);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet));
|
||||
assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L)));
|
||||
|
||||
// updating replica state of node3 before it joins voterSet
|
||||
assertFalse(state.updateReplicaState(node3, 0, new LogOffsetMetadata(10L)));
|
||||
|
||||
// adding node3 to voterSet should not cause HW to decrease even if majority is < HW
|
||||
Set<Integer> voterSetWithNode3 = mkSet(localId, node1, node2, node3);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), voterSetWithNode3));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
|
||||
// HW will not decrease if calculated HW is anything lower than the last HW
|
||||
assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(13L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
assertFalse(state.updateReplicaState(node3, 0, new LogOffsetMetadata(13L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(16L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
|
||||
// HW will update to 16L once a majority of the voterSet is at least 16L
|
||||
assertTrue(state.updateReplicaState(node3, 0, new LogOffsetMetadata(16L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() {
|
||||
int node1 = 1;
|
||||
int node2 = 2;
|
||||
Set<Integer> originalVoterSet = mkSet(localId, node1, node2);
|
||||
LeaderState<?> state = newLeaderState(originalVoterSet, 10L);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet));
|
||||
assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L)));
|
||||
assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
|
||||
// removing node1 should not decrement HW to 10L
|
||||
Set<Integer> voterSetWithoutNode1 = mkSet(localId, node2);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), voterSetWithoutNode1));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
|
||||
// HW cannot change until after node2 catches up to last HW
|
||||
assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(14L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(18L), voterSetWithoutNode1));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(18L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
|
||||
// HW should update to 16L
|
||||
assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(16L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateHighWatermarkQuorumRemovingLeaderFromVoterStates() {
|
||||
int node1 = 1;
|
||||
int node2 = 2;
|
||||
Set<Integer> originalVoterSet = mkSet(localId, node1, node2);
|
||||
LeaderState<?> state = newLeaderState(originalVoterSet, 10L);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet));
|
||||
assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L)));
|
||||
assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
|
||||
// removing leader should not decrement HW to 10L
|
||||
Set<Integer> voterSetWithoutLeader = mkSet(node1, node2);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), voterSetWithoutLeader));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
|
||||
// HW cannot change until node2 catches up to last HW
|
||||
assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(16L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(18L), voterSetWithoutLeader));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(14L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
|
||||
|
||||
// HW will not update to 16L until majority of remaining voterSet (node1, node2) are at least 16L
|
||||
assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(16L)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonMonotonicHighWatermarkUpdate() {
|
||||
MockTime time = new MockTime();
|
||||
int node1 = 1;
|
||||
LeaderState<?> state = newLeaderState(mkSet(localId, node1), 0L);
|
||||
state.updateLocalState(new LogOffsetMetadata(10L));
|
||||
Set<Integer> voterSet = mkSet(localId, node1);
|
||||
LeaderState<?> state = newLeaderState(voterSet, 0L);
|
||||
state.updateLocalState(new LogOffsetMetadata(10L), voterSet);
|
||||
state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark());
|
||||
|
||||
|
|
@ -315,7 +446,8 @@ public class LeaderStateTest {
|
|||
long leaderStartOffset = 10L;
|
||||
long leaderEndOffset = 15L;
|
||||
|
||||
LeaderState<?> state = newLeaderState(mkSet(localId), leaderStartOffset);
|
||||
Set<Integer> voterSet = singleton(localId);
|
||||
LeaderState<?> state = newLeaderState(voterSet, leaderStartOffset);
|
||||
|
||||
// Until we have updated local state, high watermark should be uninitialized
|
||||
assertEquals(Optional.empty(), state.highWatermark());
|
||||
|
|
@ -334,7 +466,7 @@ public class LeaderStateTest {
|
|||
|
||||
|
||||
// Now update the high watermark and verify the describe output
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset)));
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset), voterSet));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark());
|
||||
|
||||
time.sleep(500);
|
||||
|
|
@ -361,8 +493,9 @@ public class LeaderStateTest {
|
|||
long leaderStartOffset = 10L;
|
||||
long leaderEndOffset = 15L;
|
||||
|
||||
LeaderState<?> state = newLeaderState(mkSet(localId, activeFollowerId, inactiveFollowerId), leaderStartOffset);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset)));
|
||||
Set<Integer> voterSet = mkSet(localId, activeFollowerId, inactiveFollowerId);
|
||||
LeaderState<?> state = newLeaderState(voterSet, leaderStartOffset);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset), voterSet));
|
||||
assertEquals(Optional.empty(), state.highWatermark());
|
||||
|
||||
long activeFollowerFetchTimeMs = time.milliseconds();
|
||||
|
|
@ -412,8 +545,9 @@ public class LeaderStateTest {
|
|||
int follower2,
|
||||
long leaderStartOffset,
|
||||
long leaderEndOffset) {
|
||||
LeaderState<?> state = newLeaderState(mkSet(localId, follower1, follower2), leaderStartOffset);
|
||||
state.updateLocalState(new LogOffsetMetadata(leaderEndOffset));
|
||||
Set<Integer> voterSet = mkSet(localId, follower1, follower2);
|
||||
LeaderState<?> state = newLeaderState(voterSet, leaderStartOffset);
|
||||
state.updateLocalState(new LogOffsetMetadata(leaderEndOffset), voterSet);
|
||||
assertEquals(Optional.empty(), state.highWatermark());
|
||||
state.updateReplicaState(follower1, 0, new LogOffsetMetadata(leaderStartOffset));
|
||||
state.updateReplicaState(follower2, 0, new LogOffsetMetadata(leaderEndOffset));
|
||||
|
|
@ -426,8 +560,9 @@ public class LeaderStateTest {
|
|||
int observerId = 10;
|
||||
long epochStartOffset = 10L;
|
||||
|
||||
LeaderState<?> state = newLeaderState(mkSet(localId), epochStartOffset);
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 1)));
|
||||
Set<Integer> voterSet = singleton(localId);
|
||||
LeaderState<?> state = newLeaderState(voterSet, epochStartOffset);
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 1), voterSet));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark());
|
||||
|
||||
time.sleep(500);
|
||||
|
|
@ -455,6 +590,110 @@ public class LeaderStateTest {
|
|||
observerState);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDescribeQuorumWithVotersAndObservers() {
|
||||
MockTime time = new MockTime();
|
||||
int leader = localId;
|
||||
int node1 = 1;
|
||||
int node2 = 2;
|
||||
long epochStartOffset = 10L;
|
||||
|
||||
Set<Integer> voterSet = mkSet(leader, node1, node2);
|
||||
LeaderState<?> state = newLeaderState(voterSet, epochStartOffset);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 1), voterSet));
|
||||
assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(epochStartOffset + 1)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark());
|
||||
|
||||
// node1 becomes an observer
|
||||
long fetchTimeMs = time.milliseconds();
|
||||
assertFalse(state.updateReplicaState(node1, fetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1)));
|
||||
Set<Integer> voterSetWithoutNode1 = mkSet(leader, node2);
|
||||
state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 5), voterSetWithoutNode1);
|
||||
|
||||
|
||||
time.sleep(500);
|
||||
DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds());
|
||||
assertEquals(epochStartOffset + 1, partitionData.highWatermark());
|
||||
assertEquals(localId, partitionData.leaderId());
|
||||
assertEquals(epoch, partitionData.leaderEpoch());
|
||||
DescribeQuorumResponseData.ReplicaState observer = partitionData.observers().get(0);
|
||||
assertEquals(node1, observer.replicaId());
|
||||
assertEquals(epochStartOffset + 1, observer.logEndOffset());
|
||||
assertEquals(2, partitionData.currentVoters().size());
|
||||
|
||||
// node1 catches up with leader, HW should not change
|
||||
time.sleep(500);
|
||||
fetchTimeMs = time.milliseconds();
|
||||
assertFalse(state.updateReplicaState(node1, fetchTimeMs, new LogOffsetMetadata(epochStartOffset + 5)));
|
||||
assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark());
|
||||
|
||||
// node1 becomes a voter again, HW should change
|
||||
assertTrue(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 10), voterSet));
|
||||
|
||||
time.sleep(500);
|
||||
partitionData = state.describeQuorum(time.milliseconds());
|
||||
assertEquals(epochStartOffset + 5, partitionData.highWatermark());
|
||||
assertEquals(localId, partitionData.leaderId());
|
||||
assertEquals(epoch, partitionData.leaderEpoch());
|
||||
assertEquals(Collections.emptyList(), partitionData.observers());
|
||||
assertEquals(3, partitionData.currentVoters().size());
|
||||
DescribeQuorumResponseData.ReplicaState node1State = partitionData.currentVoters().stream()
|
||||
.filter(replicaState -> replicaState.replicaId() == node1)
|
||||
.findFirst().get();
|
||||
assertEquals(epochStartOffset + 5, node1State.logEndOffset());
|
||||
assertEquals(fetchTimeMs, node1State.lastFetchTimestamp());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClearInactiveObserversIgnoresLeader() {
|
||||
MockTime time = new MockTime();
|
||||
int followerId = 1;
|
||||
int observerId = 10;
|
||||
long epochStartOffset = 10L;
|
||||
|
||||
Set<Integer> voterSet = mkSet(localId, followerId);
|
||||
LeaderState<?> state = newLeaderState(voterSet, epochStartOffset);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 1), voterSet));
|
||||
assertTrue(state.updateReplicaState(followerId, time.milliseconds(), new LogOffsetMetadata(epochStartOffset + 1)));
|
||||
|
||||
// observer is returned since its lastFetchTimestamp is within OBSERVER_SESSION_TIMEOUT_MS
|
||||
time.sleep(500);
|
||||
long observerFetchTimeMs = time.milliseconds();
|
||||
assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1)));
|
||||
|
||||
time.sleep(500);
|
||||
DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds());
|
||||
assertEquals(epochStartOffset + 1, partitionData.highWatermark());
|
||||
assertEquals(localId, partitionData.leaderId());
|
||||
assertEquals(2, partitionData.currentVoters().size());
|
||||
assertEquals(1, partitionData.observers().size());
|
||||
assertEquals(observerId, partitionData.observers().get(0).replicaId());
|
||||
|
||||
// observer is not returned once its lastFetchTimestamp surpasses OBSERVER_SESSION_TIMEOUT_MS
|
||||
time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS);
|
||||
partitionData = state.describeQuorum(time.milliseconds());
|
||||
assertEquals(epochStartOffset + 1, partitionData.highWatermark());
|
||||
assertEquals(localId, partitionData.leaderId());
|
||||
assertEquals(2, partitionData.currentVoters().size());
|
||||
assertEquals(0, partitionData.observers().size());
|
||||
|
||||
// leader becomes observer
|
||||
Set<Integer> voterSetWithoutLeader = singleton(followerId);
|
||||
assertFalse(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 10), voterSetWithoutLeader));
|
||||
|
||||
// leader should be returned in describe quorum output
|
||||
time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS);
|
||||
long describeQuorumCalledTime = time.milliseconds();
|
||||
partitionData = state.describeQuorum(describeQuorumCalledTime);
|
||||
assertEquals(epochStartOffset + 1, partitionData.highWatermark());
|
||||
assertEquals(localId, partitionData.leaderId());
|
||||
assertEquals(1, partitionData.currentVoters().size());
|
||||
assertEquals(1, partitionData.observers().size());
|
||||
DescribeQuorumResponseData.ReplicaState observer = partitionData.observers().get(0);
|
||||
assertEquals(localId, observer.replicaId());
|
||||
assertEquals(describeQuorumCalledTime, observer.lastFetchTimestamp());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckQuorum() {
|
||||
int node1 = 1;
|
||||
|
|
|
|||
|
|
@ -147,7 +147,7 @@ public class KafkaRaftMetricsTest {
|
|||
assertEquals((double) 1, getMetric(metrics, "current-epoch").metricValue());
|
||||
assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue());
|
||||
|
||||
state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(5L));
|
||||
state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(5L), voters.voterIds());
|
||||
state.leaderStateOrThrow().updateReplicaState(1, 0, new LogOffsetMetadata(5L));
|
||||
assertEquals((double) 5L, getMetric(metrics, "high-watermark").metricValue());
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue