KAFKA-12161; Support raft observers with optional id (#9871)

We would like to be able to use `KafkaRaftClient` for tooling/debugging use cases. For this, we need the `localId` to be optional so that the client can be used more like a consumer. This is already supported in the `Fetch` protocol by setting `replicaId=-1`, which the Raft implementation checks for. We just need to alter `QuorumState` so that the `localId` is optional. The main benefit of doing this is that it spares tools the need to generate an arbitrary id (which might cause conflicts given the limited Int32 space) and it lets the leader avoid keeping any local state for these observers (such as `ReplicaState` inside `LeaderState`).

Reviewers: Ismael Juma <ismael@juma.me.uk>, Boyang Chen <boyang@confluent.io>
This commit is contained in:
Jason Gustafson 2021-01-15 14:10:17 -08:00 committed by GitHub
parent b5c107363f
commit 7ac06065f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 232 additions and 117 deletions

View File

@ -16,6 +16,12 @@
*/
package kafka.raft
import java.util.OptionalInt
import java.io.File
import java.nio.file.Files
import java.util.concurrent.CompletableFuture
import kafka.log.{Log, LogConfig, LogManager}
import kafka.raft.KafkaRaftManager.RaftIoThread
import kafka.server.{BrokerTopicStats, KafkaConfig, KafkaServer, LogDirFailureChannel}
@ -31,9 +37,6 @@ import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest, RecordSerde}
import java.io.File
import java.nio.file.Files
import java.util.concurrent.CompletableFuture
import scala.jdk.CollectionConverters._
object KafkaRaftManager {
@ -166,20 +169,20 @@ class KafkaRaftManager[T](
}
private def buildRaftClient(): KafkaRaftClient[T] = {
val expirationTimer = new SystemTimer("raft-expiration-executor")
val expirationService = new TimingWheelExpirationService(expirationTimer)
val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))
new KafkaRaftClient(
recordSerde,
netChannel,
metadataLog,
new FileBasedStateStore(new File(dataDir, "quorum-state")),
quorumStateStore,
time,
metrics,
expirationService,
logContext,
nodeId
OptionalInt.of(nodeId)
)
}

View File

@ -20,9 +20,9 @@ import java.util.concurrent.CompletableFuture
import kafka.raft.KafkaRaftManager.RaftIoThread
import org.apache.kafka.raft.KafkaRaftClient
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
import org.mockito.Mockito._
import org.junit.Test
class RaftManagerTest {

View File

@ -141,7 +141,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private final LogContext logContext;
private final Time time;
private final int fetchMaxWaitMs;
private final int nodeId;
private final OptionalInt nodeId;
private final NetworkChannel channel;
private final ReplicatedLog log;
private final Random random;
@ -162,6 +162,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private KafkaRaftMetrics kafkaRaftMetrics;
private RaftConfig raftConfig;
/**
* Create a new instance.
*
* Note that if the node ID is empty, then the client will behave as a
* non-participating observer.
*/
public KafkaRaftClient(
RecordSerde<T> serde,
NetworkChannel channel,
@ -171,7 +177,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
Metrics metrics,
ExpirationService expirationService,
LogContext logContext,
int nodeId
OptionalInt nodeId
) {
this(serde,
channel,
@ -188,7 +194,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
new Random());
}
public KafkaRaftClient(
KafkaRaftClient(
RecordSerde<T> serde,
NetworkChannel channel,
RaftMessageQueue messageQueue,
@ -199,7 +205,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
Metrics metrics,
ExpirationService expirationService,
int fetchMaxWaitMs,
int nodeId,
OptionalInt nodeId,
LogContext logContext,
Random random
) {
@ -515,7 +521,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
log.topicPartition(),
partitionLevelError,
quorum.epoch(),
quorum.leaderIdOrNil(),
quorum.leaderIdOrSentinel(),
voteGranted);
}
@ -687,7 +693,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
log.topicPartition(),
partitionLevelError,
quorum.epoch(),
quorum.leaderIdOrNil());
quorum.leaderIdOrSentinel());
}
/**
@ -770,7 +776,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
log.topicPartition(),
partitionLevelError,
quorum.epoch(),
quorum.leaderIdOrNil());
quorum.leaderIdOrSentinel());
}
/**
@ -823,7 +829,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
// election backoff time based on strict exponential mechanism so that the most up-to-date
// voter has a higher chance to be elected. If the node's priority is highest, become
// candidate immediately instead of waiting for next poll.
int position = preferredSuccessors.indexOf(quorum.localId);
int position = preferredSuccessors.indexOf(quorum.localIdOrThrow());
if (position <= 0) {
return 0;
} else {
@ -882,7 +888,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
partitionData.currentLeader()
.setLeaderEpoch(quorum.epoch())
.setLeaderId(quorum.leaderIdOrNil());
.setLeaderId(quorum.leaderIdOrSentinel());
divergingEpoch.ifPresent(partitionData::setDivergingEpoch);
});
@ -1073,7 +1079,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
log.truncateToEndOffset(divergingOffsetAndEpoch).ifPresent(truncationOffset -> {
logger.info("Truncated to offset {} from Fetch response from leader {}",
truncationOffset, quorum.leaderIdOrNil());
truncationOffset, quorum.leaderIdOrSentinel());
});
} else if (partitionResponse.snapshotId().epoch() >= 0 ||
partitionResponse.snapshotId().endOffset() >= 0) {
@ -1357,7 +1363,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) {
// Only elected leaders are sent in the request/response header, so if we have an elected
// leaderId, it should be consistent with what is in the message.
if (leaderId.isPresent() && leaderId.getAsInt() == quorum.localId) {
if (leaderId.isPresent() && leaderId.getAsInt() == quorum.localIdOrSentinel()) {
// The response indicates that we should be the leader, so we verify that is the case
return quorum.isLeader();
} else {
@ -1670,7 +1676,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return EndQuorumEpochRequest.singletonRequest(
log.topicPartition(),
quorum.epoch(),
quorum.localId,
quorum.localIdOrThrow(),
state.preferredSuccessors()
);
}
@ -1694,7 +1700,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return BeginQuorumEpochRequest.singletonRequest(
log.topicPartition(),
quorum.epoch(),
quorum.localId
quorum.localIdOrThrow()
);
}
@ -1703,7 +1709,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return VoteRequest.singletonRequest(
log.topicPartition(),
quorum.epoch(),
quorum.localId,
quorum.localIdOrThrow(),
endOffset.epoch,
endOffset.offset
);
@ -1718,7 +1724,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
});
return request
.setMaxWaitMs(fetchMaxWaitMs)
.setReplicaId(quorum.localId);
.setReplicaId(quorum.localIdOrSentinel());
}
private long maybeSendAnyVoterFetch(long currentTimeMs) {
@ -1749,7 +1755,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
);
return request.setReplicaId(quorum.localId);
return request.setReplicaId(quorum.localIdOrSentinel());
}
private FetchSnapshotResponseData.PartitionSnapshot addQuorumLeader(
@ -1757,7 +1763,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
) {
partitionSnapshot.currentLeader()
.setLeaderEpoch(quorum.epoch())
.setLeaderId(quorum.leaderIdOrNil());
.setLeaderId(quorum.leaderIdOrSentinel());
return partitionSnapshot;
}

View File

@ -73,7 +73,7 @@ import java.util.stream.Collectors;
*
*/
public class QuorumState {
public final int localId;
private final OptionalInt localId;
private final Time time;
private final Logger log;
private final QuorumStateStore store;
@ -84,7 +84,7 @@ public class QuorumState {
private volatile EpochState state;
public QuorumState(int localId,
public QuorumState(OptionalInt localId,
Set<Integer> voters,
int electionTimeoutMs,
int fetchTimeoutMs,
@ -125,9 +125,16 @@ public class QuorumState {
final EpochState initialState;
if (!election.voters().isEmpty() && !voters.equals(election.voters())) {
throw new IllegalStateException("Configured voter set: " + voters
+ " is different from the voter set read from the state file: " + election.voters() +
". Check if the quorum configuration is up to date, " +
"or wipe out the local state file if necessary");
+ " is different from the voter set read from the state file: " + election.voters()
+ ". Check if the quorum configuration is up to date, "
+ "or wipe out the local state file if necessary");
} else if (election.hasVoted() && !isVoter()) {
String localIdDescription = localId.isPresent() ?
localId.getAsInt() + " is not a voter" :
"is undefined";
throw new IllegalStateException("Initialized quorum state " + election
+ " with a voted candidate, which indicates this node was previously "
+ " a voter, but the local id " + localIdDescription);
} else if (election.epoch < logEndOffsetAndEpoch.epoch) {
log.warn("Epoch from quorum-state file is {}, which is " +
"smaller than last written epoch {} in the log",
@ -139,7 +146,7 @@ public class QuorumState {
Optional.empty(),
randomElectionTimeoutMs()
);
} else if (election.isLeader(localId)) {
} else if (localId.isPresent() && election.isLeader(localId.getAsInt())) {
// If we were previously a leader, then we will start out as resigned
// in the same epoch. This serves two purposes:
// 1. It ensures that we cannot vote for another leader in the same epoch.
@ -148,16 +155,16 @@ public class QuorumState {
// is lost after restarting.
initialState = new ResignedState(
time,
localId,
localId.getAsInt(),
election.epoch,
voters,
randomElectionTimeoutMs(),
Collections.emptyList()
);
} else if (election.isVotedCandidate(localId)) {
} else if (localId.isPresent() && election.isVotedCandidate(localId.getAsInt())) {
initialState = new CandidateState(
time,
localId,
localId.getAsInt(),
election.epoch,
voters,
Optional.empty(),
@ -196,14 +203,22 @@ public class QuorumState {
}
public Set<Integer> remoteVoters() {
return voters.stream().filter(voterId -> voterId != localId).collect(Collectors.toSet());
return voters.stream().filter(voterId -> voterId != localIdOrSentinel()).collect(Collectors.toSet());
}
/**
 * Returns the local id as a plain int, substituting the sentinel value -1
 * when no local id is defined.
 *
 * @return the local id if present, otherwise -1
 */
public int localIdOrSentinel() {
return localId.orElse(-1);
}
/**
 * Returns the local id, failing if none is defined.
 *
 * @return the local id
 * @throws IllegalStateException if the local id is not present
 */
public int localIdOrThrow() {
return localId.orElseThrow(() -> new IllegalStateException("Required local id is not present"));
}
public int epoch() {
return state.epoch();
}
public int leaderIdOrNil() {
public int leaderIdOrSentinel() {
return leaderId().orElse(-1);
}
@ -212,6 +227,7 @@ public class QuorumState {
}
public OptionalInt leaderId() {
ElectionState election = state.election();
if (election.hasLeader())
return OptionalInt.of(state.election().leaderId());
@ -224,11 +240,11 @@ public class QuorumState {
}
public boolean hasRemoteLeader() {
return hasLeader() && leaderIdOrNil() != localId;
return hasLeader() && leaderIdOrSentinel() != localIdOrSentinel();
}
public boolean isVoter() {
return voters.contains(localId);
return localId.isPresent() && voters.contains(localId.getAsInt());
}
public boolean isVoter(int nodeId) {
@ -249,7 +265,7 @@ public class QuorumState {
int epoch = state.epoch();
this.state = new ResignedState(
time,
localId,
localIdOrThrow(),
epoch,
voters,
randomElectionTimeoutMs(),
@ -301,7 +317,7 @@ public class QuorumState {
int epoch,
int candidateId
) throws IOException {
if (candidateId == localId) {
if (localId.isPresent() && candidateId == localId.getAsInt()) {
throw new IllegalStateException("Cannot transition to Voted with votedId=" + candidateId +
" and epoch=" + epoch + " since it matches the local broker.id");
} else if (isObserver()) {
@ -341,7 +357,7 @@ public class QuorumState {
int epoch,
int leaderId
) throws IOException {
if (leaderId == localId) {
if (localId.isPresent() && leaderId == localId.getAsInt()) {
throw new IllegalStateException("Cannot transition to Follower with leaderId=" + leaderId +
" and epoch=" + epoch + " since it matches the local broker.id=" + localId);
} else if (!isVoter(leaderId)) {
@ -384,7 +400,7 @@ public class QuorumState {
transitionTo(new CandidateState(
time,
localId,
localIdOrThrow(),
newEpoch,
voters,
state.highWatermark(),
@ -417,7 +433,7 @@ public class QuorumState {
// we typically expect the state machine to be caught up anyway.
transitionTo(new LeaderState(
localId,
localIdOrThrow(),
epoch(),
epochStartOffset,
voters,

View File

@ -86,7 +86,7 @@ public class KafkaRaftMetrics implements AutoCloseable {
this.currentVotedIdMetricName = metrics.metricName("current-vote", metricGroupName, "The current voted leader's id; -1 indicates not voted for anyone");
metrics.addMetric(this.currentVotedIdMetricName, (mConfig, currentTimeMs) -> {
if (state.isLeader() || state.isCandidate()) {
return state.localId;
return state.localIdOrThrow();
} else if (state.isVoted()) {
return state.votedStateOrThrow().votedId();
} else {

View File

@ -404,7 +404,7 @@ final public class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
context.assertSentVoteRequest(epoch + 1, 0, 0L, 1);
context.assertVotedCandidate(epoch + 1, context.localId);
context.assertVotedCandidate(epoch + 1, localId);
}
@Test
@ -955,7 +955,7 @@ final public class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
context.assertSentVoteRequest(epoch + 1, 0, 0L, 1);
context.assertVotedCandidate(epoch + 1, context.localId);
context.assertVotedCandidate(epoch + 1, localId);
// Send the response late
context.deliverResponse(
@ -981,7 +981,7 @@ final public class KafkaRaftClientSnapshotTest {
// Assert that the response is ignored and the replicas stays as a candidate
context.client.poll();
context.assertVotedCandidate(epoch + 1, context.localId);
context.assertVotedCandidate(epoch + 1, localId);
}
private static FetchSnapshotRequestData fetchSnapshotRequest(

View File

@ -67,7 +67,7 @@ public class KafkaRaftClientTest {
public void testInitializeSingleMemberQuorum() throws IOException {
int localId = 0;
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Collections.singleton(localId)).build();
context.assertElectedLeader(1, context.localId);
context.assertElectedLeader(1, localId);
}
@Test
@ -83,9 +83,9 @@ public class KafkaRaftClientTest {
assertEquals(1L, context.log.endOffset().offset);
assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch());
assertEquals(new LeaderAndEpoch(OptionalInt.of(context.localId), initialEpoch + 1),
assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1),
context.currentLeaderAndEpoch());
context.assertElectedLeader(initialEpoch + 1, context.localId);
context.assertElectedLeader(initialEpoch + 1, localId);
}
@Test
@ -270,14 +270,14 @@ public class KafkaRaftClientTest {
context.time.sleep(2 * context.electionTimeoutMs);
context.pollUntilRequest();
context.assertVotedCandidate(1, context.localId);
context.assertVotedCandidate(1, localId);
int correlationId = context.assertSentVoteRequest(1, 0, 0L, 1);
context.deliverResponse(correlationId, otherNodeId, context.voteResponse(true, Optional.empty(), 1));
// Become leader after receiving the vote
context.client.poll();
context.assertElectedLeader(1, context.localId);
context.assertElectedLeader(1, localId);
long electionTimestamp = context.time.milliseconds();
// Leader change record appended
@ -294,8 +294,8 @@ public class KafkaRaftClientTest {
Record record = batch.iterator().next();
assertEquals(electionTimestamp, record.timestamp());
RaftClientTestContext.verifyLeaderChangeMessage(context.localId, Arrays.asList(otherNodeId, context.localId),
Arrays.asList(otherNodeId, context.localId), record.key(), record.value());
RaftClientTestContext.verifyLeaderChangeMessage(localId, Arrays.asList(otherNodeId, localId),
Arrays.asList(otherNodeId, localId), record.key(), record.value());
}
@Test
@ -310,14 +310,14 @@ public class KafkaRaftClientTest {
context.time.sleep(2 * context.electionTimeoutMs);
context.pollUntilRequest();
context.assertVotedCandidate(1, context.localId);
context.assertVotedCandidate(1, localId);
int correlationId = context.assertSentVoteRequest(1, 0, 0L, 2);
context.deliverResponse(correlationId, firstNodeId, context.voteResponse(true, Optional.empty(), 1));
// Become leader after receiving the vote
context.client.poll();
context.assertElectedLeader(1, context.localId);
context.assertElectedLeader(1, localId);
long electionTimestamp = context.time.milliseconds();
// Leader change record appended
@ -334,8 +334,8 @@ public class KafkaRaftClientTest {
Record record = batch.iterator().next();
assertEquals(electionTimestamp, record.timestamp());
RaftClientTestContext.verifyLeaderChangeMessage(context.localId, Arrays.asList(firstNodeId, secondNodeId, context.localId),
Arrays.asList(firstNodeId, context.localId), record.key(), record.value());
RaftClientTestContext.verifyLeaderChangeMessage(localId, Arrays.asList(firstNodeId, secondNodeId, localId),
Arrays.asList(firstNodeId, localId), record.key(), record.value());
}
@Test
@ -395,7 +395,7 @@ public class KafkaRaftClientTest {
context.assertVotedCandidate(epoch, localId);
context.deliverRequest(context.endEpochRequest(epoch - 2, otherNodeId,
Collections.singletonList(context.localId)));
Collections.singletonList(localId)));
context.client.poll();
context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.empty());
@ -403,17 +403,17 @@ public class KafkaRaftClientTest {
// We should still be candidate until expiration of election timeout
context.time.sleep(context.electionTimeoutMs + jitterMs - 1);
context.client.poll();
context.assertVotedCandidate(epoch, context.localId);
context.assertVotedCandidate(epoch, localId);
// Enter the backoff period
context.time.sleep(1);
context.client.poll();
context.assertVotedCandidate(epoch, context.localId);
context.assertVotedCandidate(epoch, localId);
// After backoff, we will become a candidate again
context.time.sleep(context.electionBackoffMaxMs);
context.client.poll();
context.assertVotedCandidate(epoch + 1, context.localId);
context.assertVotedCandidate(epoch + 1, localId);
}
@Test
@ -427,15 +427,15 @@ public class KafkaRaftClientTest {
RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
// One of the voters may have sent EndQuorumEpoch from an earlier epoch
context.deliverRequest(context.endEpochRequest(epoch - 2, voter2, Arrays.asList(context.localId, voter3)));
context.deliverRequest(context.endEpochRequest(epoch - 2, voter2, Arrays.asList(localId, voter3)));
context.pollUntilResponse();
context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.of(context.localId));
context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.of(localId));
// We should still be leader as long as fetch timeout has not expired
context.time.sleep(context.fetchTimeoutMs - 1);
context.client.poll();
context.assertElectedLeader(epoch, context.localId);
context.assertElectedLeader(epoch, localId);
}
@Test
@ -451,14 +451,14 @@ public class KafkaRaftClientTest {
.build();
context.deliverRequest(context.endEpochRequest(epoch, voter2,
Arrays.asList(context.localId, voter3)));
Arrays.asList(localId, voter3)));
context.pollUntilResponse();
context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(voter2));
// Should become a candidate immediately
context.client.poll();
context.assertVotedCandidate(epoch + 1, context.localId);
context.assertVotedCandidate(epoch + 1, localId);
}
@Test
@ -629,13 +629,13 @@ public class KafkaRaftClientTest {
.build();
context.deliverRequest(context.endEpochRequest(leaderEpoch, oldLeaderId,
Collections.singletonList(context.localId)));
Collections.singletonList(localId)));
context.pollUntilResponse();
context.assertSentEndQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(oldLeaderId));
context.client.poll();
context.assertVotedCandidate(leaderEpoch + 1, context.localId);
context.assertVotedCandidate(leaderEpoch + 1, localId);
}
@Test
@ -651,7 +651,7 @@ public class KafkaRaftClientTest {
.build();
context.deliverRequest(context.endEpochRequest(leaderEpoch, oldLeaderId,
Arrays.asList(preferredNextLeader, context.localId)));
Arrays.asList(preferredNextLeader, localId)));
context.pollUntilResponse();
context.assertSentEndQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(oldLeaderId));
@ -671,7 +671,7 @@ public class KafkaRaftClientTest {
assertEquals(2, voteRequests.size());
// Should have already done self-voting
context.assertVotedCandidate(leaderEpoch + 1, context.localId);
context.assertVotedCandidate(leaderEpoch + 1, localId);
}
@Test
@ -686,7 +686,7 @@ public class KafkaRaftClientTest {
context.time.sleep(2 * context.electionTimeoutMs);
context.pollUntilRequest();
context.assertVotedCandidate(epoch, context.localId);
context.assertVotedCandidate(epoch, localId);
int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);
@ -697,12 +697,12 @@ public class KafkaRaftClientTest {
// We will ignore the timed out response if it arrives late
context.deliverResponse(correlationId, otherNodeId, context.voteResponse(true, Optional.empty(), 1));
context.client.poll();
context.assertVotedCandidate(epoch, context.localId);
context.assertVotedCandidate(epoch, localId);
// Become leader after receiving the retry response
context.deliverResponse(retryCorrelationId, otherNodeId, context.voteResponse(true, Optional.empty(), 1));
context.client.poll();
context.assertElectedLeader(epoch, context.localId);
context.assertElectedLeader(epoch, localId);
}
@Test
@ -813,8 +813,8 @@ public class KafkaRaftClientTest {
context.client.poll();
context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.of(context.localId), false);
context.assertElectedLeader(leaderEpoch, context.localId);
context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.of(localId), false);
context.assertElectedLeader(leaderEpoch, localId);
}
@Test
@ -871,7 +871,7 @@ public class KafkaRaftClientTest {
context.deliverRequest(context.voteRequest(leaderEpoch, otherNodeId, leaderEpoch - 1, 1));
context.client.poll();
context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.empty(), false);
context.assertVotedCandidate(leaderEpoch, context.localId);
context.assertVotedCandidate(leaderEpoch, localId);
}
@Test
@ -892,7 +892,7 @@ public class KafkaRaftClientTest {
context.time.sleep(2 * context.electionTimeoutMs);
context.pollUntilRequest();
context.assertVotedCandidate(epoch, context.localId);
context.assertVotedCandidate(epoch, localId);
// Quorum size is two. If the other member rejects, then we need to schedule a revote.
int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);
@ -901,19 +901,19 @@ public class KafkaRaftClientTest {
context.client.poll();
// All nodes have rejected our candidacy, but we should still remember that we had voted
context.assertVotedCandidate(epoch, context.localId);
context.assertVotedCandidate(epoch, localId);
// Even though our candidacy was rejected, we will backoff for jitter period
// before we bump the epoch and start a new election.
context.time.sleep(context.electionBackoffMaxMs - 1);
context.client.poll();
context.assertVotedCandidate(epoch, context.localId);
context.assertVotedCandidate(epoch, localId);
// After jitter expires, we become a candidate again
context.time.sleep(1);
context.client.poll();
context.pollUntilRequest();
context.assertVotedCandidate(epoch + 1, context.localId);
context.assertVotedCandidate(epoch + 1, localId);
context.assertSentVoteRequest(epoch + 1, 0, 0L, 1);
}
@ -976,7 +976,7 @@ public class KafkaRaftClientTest {
context.pollUntilRequest();
context.assertSentVoteRequest(epoch + 1, lastEpoch, 1L, 1);
context.assertVotedCandidate(epoch + 1, context.localId);
context.assertVotedCandidate(epoch + 1, localId);
}
@Test
@ -1073,27 +1073,27 @@ public class KafkaRaftClientTest {
context.deliverRequest(context.fetchRequest(
epoch, otherNodeId, -5L, 0, 0));
context.pollUntilResponse();
context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(context.localId));
context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
context.deliverRequest(context.fetchRequest(
epoch, otherNodeId, 0L, -1, 0));
context.pollUntilResponse();
context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(context.localId));
context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
context.deliverRequest(context.fetchRequest(
epoch, otherNodeId, 0L, epoch + 1, 0));
context.pollUntilResponse();
context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(context.localId));
context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
context.deliverRequest(context.fetchRequest(
epoch + 1, otherNodeId, 0L, 0, 0));
context.pollUntilResponse();
context.assertSentFetchResponse(Errors.UNKNOWN_LEADER_EPOCH, epoch, OptionalInt.of(context.localId));
context.assertSentFetchResponse(Errors.UNKNOWN_LEADER_EPOCH, epoch, OptionalInt.of(localId));
context.deliverRequest(context.fetchRequest(
epoch, otherNodeId, 0L, 0, -1));
context.pollUntilResponse();
context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(context.localId));
context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
}
@Test
@ -1108,17 +1108,17 @@ public class KafkaRaftClientTest {
int nonVoterId = 2;
context.deliverRequest(context.voteRequest(epoch, nonVoterId, 0, 0));
context.client.poll();
context.assertSentVoteResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(context.localId), false);
context.assertSentVoteResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId), false);
context.deliverRequest(context.beginEpochRequest(epoch, nonVoterId));
context.client.poll();
context.assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(context.localId));
context.assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId));
context.deliverRequest(context.endEpochRequest(epoch, nonVoterId, Collections.singletonList(otherNodeId)));
context.client.poll();
// The sent request has no context.localId as a preferable voter.
context.assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(context.localId));
// The sent request has no localId as a preferable voter.
context.assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId));
}
@Test
@ -1167,7 +1167,7 @@ public class KafkaRaftClientTest {
// After expiration of the max wait time, the fetch returns an empty record set
context.time.sleep(maxWaitTimeMs);
context.client.poll();
MemoryRecords fetchedRecords = context.assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(context.localId));
MemoryRecords fetchedRecords = context.assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(localId));
assertEquals(0, fetchedRecords.sizeInBytes());
}
@ -1245,7 +1245,7 @@ public class KafkaRaftClientTest {
// Now await the fetch timeout and become a candidate
context.time.sleep(context.fetchTimeoutMs);
context.client.poll();
context.assertVotedCandidate(epoch + 1, context.localId);
context.assertVotedCandidate(epoch + 1, localId);
// The fetch response from the old leader returns, but it should be ignored
Records records = context.buildBatch(0L, 3, Arrays.asList("a", "b"));
@ -1254,7 +1254,7 @@ public class KafkaRaftClientTest {
context.client.poll();
assertEquals(0, context.log.endOffset().offset);
context.assertVotedCandidate(epoch + 1, context.localId);
context.assertVotedCandidate(epoch + 1, localId);
}
@Test
@ -1310,7 +1310,7 @@ public class KafkaRaftClientTest {
// Wait until the vote requests are inflight
context.pollUntilRequest();
context.assertVotedCandidate(epoch, context.localId);
context.assertVotedCandidate(epoch, localId);
List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(epoch, 0, 0);
assertEquals(2, voteRequests.size());
@ -1496,10 +1496,10 @@ public class KafkaRaftClientTest {
context.pollUntilResponse();
context.assertSentDescribeQuorumResponse(context.localId, epoch, highWatermark,
context.assertSentDescribeQuorumResponse(localId, epoch, highWatermark,
Arrays.asList(
new ReplicaState()
.setReplicaId(context.localId)
.setReplicaId(localId)
// As we are appending the records directly to the log,
// the leader end offset hasn't been updated yet.
.setLogEndOffset(3L),
@ -1602,7 +1602,7 @@ public class KafkaRaftClientTest {
int localId = 0;
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Collections.singleton(localId)).build();
context.assertElectedLeader(1, context.localId);
context.assertElectedLeader(1, localId);
context.client.poll();
assertEquals(0, context.channel.drainSendQueue().size());
int shutdownTimeoutMs = 5000;
@ -1708,7 +1708,7 @@ public class KafkaRaftClientTest {
context.client.poll();
// The BeginEpoch request eventually times out. We should not send another one.
context.assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(context.localId));
context.assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(localId));
context.time.sleep(context.requestTimeoutMs);
context.client.poll();
@ -1725,7 +1725,7 @@ public class KafkaRaftClientTest {
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
long now = context.time.milliseconds();
context.assertElectedLeader(1, context.localId);
context.assertElectedLeader(1, localId);
// We still write the leader change message
assertEquals(OptionalLong.of(1L), context.client.highWatermark());
@ -1747,7 +1747,7 @@ public class KafkaRaftClientTest {
context.deliverRequest(context.fetchRequest(1, otherNodeId, 0L, 0, 500));
context.pollUntilResponse();
MemoryRecords fetchedRecords = context.assertSentFetchResponse(Errors.NONE, 1, OptionalInt.of(context.localId));
MemoryRecords fetchedRecords = context.assertSentFetchResponse(Errors.NONE, 1, OptionalInt.of(localId));
List<MutableRecordBatch> batches = Utils.toList(fetchedRecords.batchIterator());
assertEquals(2, batches.size());
@ -1758,8 +1758,8 @@ public class KafkaRaftClientTest {
Record record = readRecords.get(0);
assertEquals(now, record.timestamp());
RaftClientTestContext.verifyLeaderChangeMessage(context.localId, Collections.singletonList(context.localId),
Collections.singletonList(context.localId), record.key(), record.value());
RaftClientTestContext.verifyLeaderChangeMessage(localId, Collections.singletonList(localId),
Collections.singletonList(localId), record.key(), record.value());
MutableRecordBatch batch = batches.get(1);
assertEquals(1, batch.partitionLeaderEpoch());
@ -1829,8 +1829,8 @@ public class KafkaRaftClientTest {
assertNotNull(getMetric(context.metrics, "append-records-rate"));
assertEquals("leader", getMetric(context.metrics, "current-state").metricValue());
assertEquals((double) context.localId, getMetric(context.metrics, "current-leader").metricValue());
assertEquals((double) context.localId, getMetric(context.metrics, "current-vote").metricValue());
assertEquals((double) localId, getMetric(context.metrics, "current-leader").metricValue());
assertEquals((double) localId, getMetric(context.metrics, "current-vote").metricValue());
assertEquals((double) epoch, getMetric(context.metrics, "current-epoch").metricValue());
assertEquals((double) 1L, getMetric(context.metrics, "high-watermark").metricValue());
assertEquals((double) 1L, getMetric(context.metrics, "log-end-offset").metricValue());
@ -1911,7 +1911,7 @@ public class KafkaRaftClientTest {
// Sleep a little to ensure that we become a candidate
context.time.sleep(context.electionTimeoutMs * 2);
context.pollUntilRequest();
context.assertVotedCandidate(epoch, context.localId);
context.assertVotedCandidate(epoch, localId);
int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1);
VoteResponseData response = new VoteResponseData()
@ -2157,7 +2157,7 @@ public class KafkaRaftClientTest {
context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 9L, epoch, 500));
context.pollUntilResponse();
assertEquals(OptionalLong.of(9L), context.client.highWatermark());
context.assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(context.localId));
context.assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(localId));
// Now we receive a vote request which transitions us to the 'unattached' state
context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch, 9L));
@ -2182,6 +2182,46 @@ public class KafkaRaftClientTest {
assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch());
}
/**
 * Verifies that a client built without a local id fetches as an observer: the first
 * fetch is sent to one of the voters and is used to learn the leader and epoch, after
 * which the next fetch is sent to that leader and the returned records are appended.
 */
@Test
public void testObserverFetchWithNoLocalId() throws Exception {
    // Without a `localId` the client behaves as an observer. This mode exists for
    // tooling/debugging use cases.
    Set<Integer> voters = Utils.mkSet(1, 2);
    RaftClientTestContext context =
        new RaftClientTestContext.Builder(OptionalInt.empty(), voters).build();

    int leaderEpoch = 5;
    int leaderId = 1;

    // Discovery: the initial fetch may target any voter and carries epoch 0.
    context.pollUntilRequest();
    RaftRequest.Outbound discoveryFetch = context.assertSentFetchRequest();
    assertTrue(voters.contains(discoveryFetch.destinationId()));
    context.assertFetchRequestData(discoveryFetch, 0, 0L, 0);

    // The fenced response reveals the current leader and epoch.
    context.deliverResponse(
        discoveryFetch.correlationId,
        discoveryFetch.destinationId(),
        context.fetchResponse(leaderEpoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH)
    );
    context.client.poll();
    context.assertElectedLeader(leaderEpoch, leaderId);

    // Replication: the follow-up fetch must be addressed to the discovered leader.
    context.pollUntilRequest();
    RaftRequest.Outbound leaderFetch = context.assertSentFetchRequest();
    assertEquals(leaderId, leaderFetch.destinationId());
    context.assertFetchRequestData(leaderFetch, leaderEpoch, 0L, 0);

    MemoryRecords fetchedBatch = context.buildBatch(0L, 3, Arrays.asList("a", "b", "c"));
    context.deliverResponse(
        leaderFetch.correlationId,
        leaderFetch.destinationId(),
        context.fetchResponse(leaderEpoch, leaderId, fetchedBatch, 0L, Errors.NONE)
    );
    context.client.poll();

    // Three records were delivered, so the local log should now end at offset 3.
    assertEquals(3L, context.log.endOffset().offset);
    assertEquals(3, context.log.lastFetchedEpoch());
}
/**
 * Looks up the metric registered under {@code name} in the "raft-metrics" group.
 *
 * @param metrics the metrics registry to query
 * @param name the metric name within the "raft-metrics" group
 * @return the entry stored for that metric name in {@code metrics.metrics()}
 */
private static KafkaMetric getMetric(final Metrics metrics, final String name) {
    return metrics
        .metrics()
        .get(metrics.metricName(name, "raft-metrics"));
}

View File

@ -46,6 +46,13 @@ public class QuorumStateTest {
private final Random random = Mockito.spy(new Random(1));
/**
 * Builds a {@link QuorumState} for the given voters using this test's {@code localId}
 * as a defined (non-empty) local id.
 */
private QuorumState buildQuorumState(Set<Integer> voters) {
    OptionalInt definedLocalId = OptionalInt.of(localId);
    return buildQuorumState(definedLocalId, voters);
}
private QuorumState buildQuorumState(
OptionalInt localId,
Set<Integer> voters
) {
return new QuorumState(
localId,
voters,
@ -1019,6 +1026,40 @@ public class QuorumStateTest {
assertEquals(Optional.empty(), state.highWatermark());
}
/**
 * A quorum state created without a local id must initialize as an observer, reject
 * the candidate/voted/leader transitions, and still be able to become a follower or
 * unattached.
 */
@Test
public void testInitializeWithEmptyLocalId() throws IOException {
    Set<Integer> voters = Utils.mkSet(0, 1);
    QuorumState observer = buildQuorumState(OptionalInt.empty(), voters);
    observer.initialize(new OffsetAndEpoch(0L, 0));

    assertTrue(observer.isObserver());
    assertFalse(observer.isVoter());

    // These transitions are expected to fail when there is no local id.
    assertThrows(IllegalStateException.class, () -> observer.transitionToCandidate());
    assertThrows(IllegalStateException.class, () -> observer.transitionToVoted(1, 1));
    assertThrows(IllegalStateException.class, () -> observer.transitionToLeader(0L));

    // Following a leader and becoming unattached remain available.
    observer.transitionToFollower(1, 1);
    assertTrue(observer.isFollower());

    observer.transitionToUnattached(2);
    assertTrue(observer.isUnattached());
}
/**
 * When the stored election state records a voted candidate, initialization must fail
 * both for a node whose id (2) is not in the voter set and for a node with no id.
 */
@Test
public void testObserverInitializationFailsIfElectionStateHasVotedCandidate() {
    Set<Integer> voters = Utils.mkSet(0, 1);
    int epoch = 5;
    int votedId = 1;

    ElectionState votedState = ElectionState.withVotedCandidate(epoch, votedId, voters);
    store.writeElectionState(votedState);

    // Local id 2 is defined but is not one of the voters {0, 1}.
    QuorumState nonVoterState = buildQuorumState(OptionalInt.of(2), voters);
    assertThrows(
        IllegalStateException.class,
        () -> nonVoterState.initialize(new OffsetAndEpoch(0, 0))
    );

    // No local id at all.
    QuorumState anonymousState = buildQuorumState(OptionalInt.empty(), voters);
    assertThrows(
        IllegalStateException.class,
        () -> anonymousState.initialize(new OffsetAndEpoch(0, 0))
    );
}
private QuorumState initializeEmptyState(Set<Integer> voters) throws IOException {
QuorumState state = buildQuorumState(voters);
store.writeElectionState(ElectionState.withUnknownLeader(0, voters));

View File

@ -97,7 +97,7 @@ public final class RaftClientTestContext {
final int requestTimeoutMs;
private final QuorumStateStore quorumStateStore;
final int localId;
private final OptionalInt localId;
public final KafkaRaftClient<String> client;
final Metrics metrics;
public final MockLog log;
@ -127,7 +127,7 @@ public final class RaftClientTestContext {
private final Random random = Mockito.spy(new Random(1));
private final MockLog log = new MockLog(METADATA_PARTITION);
private final Set<Integer> voters;
private final int localId;
private final OptionalInt localId;
private int requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS;
private int electionTimeoutMs = DEFAULT_ELECTION_TIMEOUT_MS;
@ -135,6 +135,10 @@ public final class RaftClientTestContext {
private MemoryPool memoryPool = MemoryPool.NONE;
/**
 * Creates a builder for a node with a defined local id. The id is wrapped with
 * {@code OptionalInt.of} and passed to the {@code OptionalInt}-based constructor.
 *
 * @param localId the id of the local node
 * @param voters the voter ids
 */
public Builder(int localId, Set<Integer> voters) {
this(OptionalInt.of(localId), voters);
}
/**
 * Creates a builder whose local id may be absent.
 *
 * @param localId the id of the local node, or {@code OptionalInt.empty()} if none is defined
 * @param voters the voter ids
 */
public Builder(OptionalInt localId, Set<Integer> voters) {
    this.localId = localId;
    this.voters = voters;
}
@ -231,7 +235,7 @@ public final class RaftClientTestContext {
}
private RaftClientTestContext(
int localId,
OptionalInt localId,
KafkaRaftClient<String> client,
MockLog log,
MockNetworkChannel channel,
@ -344,7 +348,11 @@ public final class RaftClientTestContext {
}
client.poll();
assertElectedLeader(epoch, localId);
assertElectedLeader(epoch, localIdOrThrow());
}
/**
 * Returns the local id, failing the test if none is defined.
 *
 * @return the value held by {@code localId}
 * @throws AssertionError if {@code localId} is empty
 */
private int localIdOrThrow() {
    if (!localId.isPresent()) {
        throw new AssertionError("Required local id is not defined");
    }
    return localId.getAsInt();
}
void expectBeginEpoch(
@ -352,7 +360,7 @@ public final class RaftClientTestContext {
) throws Exception {
pollUntilRequest();
for (RaftRequest.Outbound request : collectBeginEpochRequests(epoch)) {
BeginQuorumEpochResponseData beginEpochResponse = beginEpochResponse(epoch, localId);
BeginQuorumEpochResponseData beginEpochResponse = beginEpochResponse(epoch, localIdOrThrow());
deliverResponse(request.correlationId, request.destinationId(), beginEpochResponse);
}
client.poll();
@ -456,7 +464,7 @@ public final class RaftClientTestContext {
VoteRequestData.PartitionData partitionRequest = unwrap(request);
assertEquals(epoch, partitionRequest.candidateEpoch());
assertEquals(localId, partitionRequest.candidateId());
assertEquals(localIdOrThrow(), partitionRequest.candidateId());
assertEquals(lastEpoch, partitionRequest.lastOffsetEpoch());
assertEquals(lastEpochOffset, partitionRequest.lastOffset());
voteRequests.add((RaftRequest.Outbound) raftMessage);
@ -645,7 +653,7 @@ public final class RaftClientTestContext {
request.topics().get(0).partitions().get(0);
assertEquals(epoch, partitionRequest.leaderEpoch());
assertEquals(localId, partitionRequest.leaderId());
assertEquals(localIdOrThrow(), partitionRequest.leaderId());
RaftRequest.Outbound outboundRequest = (RaftRequest.Outbound) raftMessage;
collectedDestinationIdSet.add(outboundRequest.destinationId());
@ -681,7 +689,7 @@ public final class RaftClientTestContext {
request.topics().get(0).partitions().get(0);
assertEquals(epoch, partitionRequest.leaderEpoch());
assertEquals(localId, partitionRequest.leaderId());
assertEquals(localIdOrThrow(), partitionRequest.leaderId());
requests.add(raftRequest);
}
return requests;
@ -824,7 +832,7 @@ public final class RaftClientTestContext {
assertEquals(epoch, fetchPartition.currentLeaderEpoch());
assertEquals(fetchOffset, fetchPartition.fetchOffset());
assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
assertEquals(localId, request.replicaId());
assertEquals(localId.orElse(-1), request.replicaId());
}
FetchRequestData fetchRequest(

View File

@ -771,7 +771,7 @@ public class RaftEventSimulationTest {
metrics,
new MockExpirationService(time),
FETCH_MAX_WAIT_MS,
nodeId,
OptionalInt.of(nodeId),
logContext,
random
);

View File

@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
@ -59,7 +60,7 @@ public class KafkaRaftMetricsTest {
private QuorumState buildQuorumState(Set<Integer> voters) {
return new QuorumState(
localId,
OptionalInt.of(localId),
voters,
electionTimeoutMs,
fetchTimeoutMs,