From 7ac06065f1e25158688fc3be316ce9571dc51335 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 15 Jan 2021 14:10:17 -0800 Subject: [PATCH] KAFKA-12161; Support raft observers with optional id (#9871) We would like to be able to use `KafkaRaftClient` for tooling/debugging use cases. For this, we need the localId to be optional so that the client can be used more like a consumer. This is already supported in the `Fetch` protocol by setting `replicaId=-1`, which the Raft implementation checks for. We just need to alter `QuorumState` so that the `localId` is optional. The main benefit of doing this is that it saves tools the need to generate an arbitrary id (which might cause conflicts given limited Int32 space) and it lets the leader avoid any local state for these observers (such as `ReplicaState` inside `LeaderState`). Reviewers: Ismael Juma , Boyang Chen --- .../main/scala/kafka/raft/RaftManager.scala | 15 +- .../unit/kafka/raft/RaftManagerTest.scala | 2 +- .../apache/kafka/raft/KafkaRaftClient.java | 40 +++-- .../org/apache/kafka/raft/QuorumState.java | 52 ++++-- .../raft/internals/KafkaRaftMetrics.java | 2 +- .../raft/KafkaRaftClientSnapshotTest.java | 6 +- .../kafka/raft/KafkaRaftClientTest.java | 160 +++++++++++------- .../apache/kafka/raft/QuorumStateTest.java | 41 +++++ .../kafka/raft/RaftClientTestContext.java | 26 ++- .../kafka/raft/RaftEventSimulationTest.java | 2 +- .../raft/internals/KafkaRaftMetricsTest.java | 3 +- 11 files changed, 232 insertions(+), 117 deletions(-) diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index c3c80334e55..fe199ad584e 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -16,6 +16,12 @@ */ package kafka.raft +import java.util.OptionalInt + +import java.io.File +import java.nio.file.Files +import java.util.concurrent.CompletableFuture + import kafka.log.{Log, LogConfig, LogManager} import kafka.raft.KafkaRaftManager.RaftIoThread import kafka.server.{BrokerTopicStats, KafkaConfig, KafkaServer, LogDirFailureChannel} @@ -31,9 +37,6 @@ import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest, RecordSerde} -import java.io.File -import java.nio.file.Files -import java.util.concurrent.CompletableFuture import scala.jdk.CollectionConverters._ object KafkaRaftManager { @@ -166,20 +169,20 @@ class KafkaRaftManager[T]( } private def buildRaftClient(): KafkaRaftClient[T] = { - val expirationTimer = new SystemTimer("raft-expiration-executor") val expirationService = new TimingWheelExpirationService(expirationTimer) + val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state")) new KafkaRaftClient( recordSerde, netChannel, metadataLog, - new FileBasedStateStore(new File(dataDir, "quorum-state")), + quorumStateStore, time, metrics, expirationService, logContext, - nodeId + OptionalInt.of(nodeId) ) } diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala index 99e0a9e1a21..b4c7d8945f8 100644 --- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala +++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala @@ -20,9 +20,9 @@ import java.util.concurrent.CompletableFuture import kafka.raft.KafkaRaftManager.RaftIoThread import org.apache.kafka.raft.KafkaRaftClient -import 
org.junit.jupiter.api.Test import org.junit.jupiter.api.Assertions._ import org.mockito.Mockito._ +import org.junit.Test class RaftManagerTest { diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 9d4411d94b9..e4a0b3fd312 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -141,7 +141,7 @@ public class KafkaRaftClient implements RaftClient { private final LogContext logContext; private final Time time; private final int fetchMaxWaitMs; - private final int nodeId; + private final OptionalInt nodeId; private final NetworkChannel channel; private final ReplicatedLog log; private final Random random; @@ -162,6 +162,12 @@ public class KafkaRaftClient implements RaftClient { private KafkaRaftMetrics kafkaRaftMetrics; private RaftConfig raftConfig; + /** + * Create a new instance. + * + * Note that if the node ID is empty, then the the client will behave as a + * non-participating observer. + */ public KafkaRaftClient( RecordSerde serde, NetworkChannel channel, @@ -171,7 +177,7 @@ public class KafkaRaftClient implements RaftClient { Metrics metrics, ExpirationService expirationService, LogContext logContext, - int nodeId + OptionalInt nodeId ) { this(serde, channel, @@ -188,7 +194,7 @@ public class KafkaRaftClient implements RaftClient { new Random()); } - public KafkaRaftClient( + KafkaRaftClient( RecordSerde serde, NetworkChannel channel, RaftMessageQueue messageQueue, @@ -199,7 +205,7 @@ public class KafkaRaftClient implements RaftClient { Metrics metrics, ExpirationService expirationService, int fetchMaxWaitMs, - int nodeId, + OptionalInt nodeId, LogContext logContext, Random random ) { @@ -515,7 +521,7 @@ public class KafkaRaftClient implements RaftClient { log.topicPartition(), partitionLevelError, quorum.epoch(), - quorum.leaderIdOrNil(), + quorum.leaderIdOrSentinel(), voteGranted); } @@ -687,7 +693,7 @@ public class KafkaRaftClient implements RaftClient { log.topicPartition(), partitionLevelError, quorum.epoch(), - quorum.leaderIdOrNil()); + quorum.leaderIdOrSentinel()); } /** @@ -770,7 +776,7 @@ public class KafkaRaftClient implements RaftClient { log.topicPartition(), partitionLevelError, quorum.epoch(), - quorum.leaderIdOrNil()); + quorum.leaderIdOrSentinel()); } /** @@ -823,7 +829,7 @@ public class KafkaRaftClient implements RaftClient { // election backoff time based on strict exponential mechanism so that the most up-to-date // voter has a higher chance to be elected. If the node's priority is highest, become // candidate immediately instead of waiting for next poll. 
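// With the local id now an OptionalInt, the client relies on two accessors from QuorumState:
// localIdOrThrow() on paths that only a voter can reach (Vote, BeginQuorumEpoch and
// EndQuorumEpoch requests, and the candidate/leader transitions), and localIdOrSentinel()
// where the wire format already accepts -1, such as the replicaId field of Fetch and
// FetchSnapshot requests sent by an id-less observer.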
- int position = preferredSuccessors.indexOf(quorum.localId); + int position = preferredSuccessors.indexOf(quorum.localIdOrThrow()); if (position <= 0) { return 0; } else { @@ -882,7 +888,7 @@ public class KafkaRaftClient implements RaftClient { partitionData.currentLeader() .setLeaderEpoch(quorum.epoch()) - .setLeaderId(quorum.leaderIdOrNil()); + .setLeaderId(quorum.leaderIdOrSentinel()); divergingEpoch.ifPresent(partitionData::setDivergingEpoch); }); @@ -1073,7 +1079,7 @@ public class KafkaRaftClient implements RaftClient { log.truncateToEndOffset(divergingOffsetAndEpoch).ifPresent(truncationOffset -> { logger.info("Truncated to offset {} from Fetch response from leader {}", - truncationOffset, quorum.leaderIdOrNil()); + truncationOffset, quorum.leaderIdOrSentinel()); }); } else if (partitionResponse.snapshotId().epoch() >= 0 || partitionResponse.snapshotId().endOffset() >= 0) { @@ -1357,7 +1363,7 @@ public class KafkaRaftClient implements RaftClient { private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) { // Only elected leaders are sent in the request/response header, so if we have an elected // leaderId, it should be consistent with what is in the message. - if (leaderId.isPresent() && leaderId.getAsInt() == quorum.localId) { + if (leaderId.isPresent() && leaderId.getAsInt() == quorum.localIdOrSentinel()) { // The response indicates that we should be the leader, so we verify that is the case return quorum.isLeader(); } else { @@ -1670,7 +1676,7 @@ public class KafkaRaftClient implements RaftClient { return EndQuorumEpochRequest.singletonRequest( log.topicPartition(), quorum.epoch(), - quorum.localId, + quorum.localIdOrThrow(), state.preferredSuccessors() ); } @@ -1694,7 +1700,7 @@ public class KafkaRaftClient implements RaftClient { return BeginQuorumEpochRequest.singletonRequest( log.topicPartition(), quorum.epoch(), - quorum.localId + quorum.localIdOrThrow() ); } @@ -1703,7 +1709,7 @@ public class KafkaRaftClient implements RaftClient { return VoteRequest.singletonRequest( log.topicPartition(), quorum.epoch(), - quorum.localId, + quorum.localIdOrThrow(), endOffset.epoch, endOffset.offset ); @@ -1718,7 +1724,7 @@ public class KafkaRaftClient implements RaftClient { }); return request .setMaxWaitMs(fetchMaxWaitMs) - .setReplicaId(quorum.localId); + .setReplicaId(quorum.localIdOrSentinel()); } private long maybeSendAnyVoterFetch(long currentTimeMs) { @@ -1749,7 +1755,7 @@ public class KafkaRaftClient implements RaftClient { } ); - return request.setReplicaId(quorum.localId); + return request.setReplicaId(quorum.localIdOrSentinel()); } private FetchSnapshotResponseData.PartitionSnapshot addQuorumLeader( @@ -1757,7 +1763,7 @@ public class KafkaRaftClient implements RaftClient { ) { partitionSnapshot.currentLeader() .setLeaderEpoch(quorum.epoch()) - .setLeaderId(quorum.leaderIdOrNil()); + .setLeaderId(quorum.leaderIdOrSentinel()); return partitionSnapshot; } diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java index dc4f81fa7d3..70c8ed0bda7 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java @@ -73,7 +73,7 @@ import java.util.stream.Collectors; * */ public class QuorumState { - public final int localId; + private final OptionalInt localId; private final Time time; private final Logger log; private final QuorumStateStore store; @@ -84,7 +84,7 @@ public class QuorumState { private volatile EpochState state; - public 
QuorumState(int localId, + public QuorumState(OptionalInt localId, Set voters, int electionTimeoutMs, int fetchTimeoutMs, @@ -125,9 +125,16 @@ public class QuorumState { final EpochState initialState; if (!election.voters().isEmpty() && !voters.equals(election.voters())) { throw new IllegalStateException("Configured voter set: " + voters - + " is different from the voter set read from the state file: " + election.voters() + - ". Check if the quorum configuration is up to date, " + - "or wipe out the local state file if necessary"); + + " is different from the voter set read from the state file: " + election.voters() + + ". Check if the quorum configuration is up to date, " + + "or wipe out the local state file if necessary"); + } else if (election.hasVoted() && !isVoter()) { + String localIdDescription = localId.isPresent() ? + localId.getAsInt() + " is not a voter" : + "is undefined"; + throw new IllegalStateException("Initialized quorum state " + election + + " with a voted candidate, which indicates this node was previously " + + " a voter, but the local id " + localIdDescription); } else if (election.epoch < logEndOffsetAndEpoch.epoch) { log.warn("Epoch from quorum-state file is {}, which is " + "smaller than last written epoch {} in the log", @@ -139,7 +146,7 @@ public class QuorumState { Optional.empty(), randomElectionTimeoutMs() ); - } else if (election.isLeader(localId)) { + } else if (localId.isPresent() && election.isLeader(localId.getAsInt())) { // If we were previously a leader, then we will start out as resigned // in the same epoch. This serves two purposes: // 1. It ensures that we cannot vote for another leader in the same epoch. @@ -148,16 +155,16 @@ public class QuorumState { // is lost after restarting. initialState = new ResignedState( time, - localId, + localId.getAsInt(), election.epoch, voters, randomElectionTimeoutMs(), Collections.emptyList() ); - } else if (election.isVotedCandidate(localId)) { + } else if (localId.isPresent() && election.isVotedCandidate(localId.getAsInt())) { initialState = new CandidateState( time, - localId, + localId.getAsInt(), election.epoch, voters, Optional.empty(), @@ -196,14 +203,22 @@ public class QuorumState { } public Set remoteVoters() { - return voters.stream().filter(voterId -> voterId != localId).collect(Collectors.toSet()); + return voters.stream().filter(voterId -> voterId != localIdOrSentinel()).collect(Collectors.toSet()); + } + + public int localIdOrSentinel() { + return localId.orElse(-1); + } + + public int localIdOrThrow() { + return localId.orElseThrow(() -> new IllegalStateException("Required local id is not present")); } public int epoch() { return state.epoch(); } - public int leaderIdOrNil() { + public int leaderIdOrSentinel() { return leaderId().orElse(-1); } @@ -212,6 +227,7 @@ public class QuorumState { } public OptionalInt leaderId() { + ElectionState election = state.election(); if (election.hasLeader()) return OptionalInt.of(state.election().leaderId()); @@ -224,11 +240,11 @@ public class QuorumState { } public boolean hasRemoteLeader() { - return hasLeader() && leaderIdOrNil() != localId; + return hasLeader() && leaderIdOrSentinel() != localIdOrSentinel(); } public boolean isVoter() { - return voters.contains(localId); + return localId.isPresent() && voters.contains(localId.getAsInt()); } public boolean isVoter(int nodeId) { @@ -249,7 +265,7 @@ public class QuorumState { int epoch = state.epoch(); this.state = new ResignedState( time, - localId, + localIdOrThrow(), epoch, voters, randomElectionTimeoutMs(), @@ 
-301,7 +317,7 @@ public class QuorumState { int epoch, int candidateId ) throws IOException { - if (candidateId == localId) { + if (localId.isPresent() && candidateId == localId.getAsInt()) { throw new IllegalStateException("Cannot transition to Voted with votedId=" + candidateId + " and epoch=" + epoch + " since it matches the local broker.id"); } else if (isObserver()) { @@ -341,7 +357,7 @@ public class QuorumState { int epoch, int leaderId ) throws IOException { - if (leaderId == localId) { + if (localId.isPresent() && leaderId == localId.getAsInt()) { throw new IllegalStateException("Cannot transition to Follower with leaderId=" + leaderId + " and epoch=" + epoch + " since it matches the local broker.id=" + localId); } else if (!isVoter(leaderId)) { @@ -384,7 +400,7 @@ public class QuorumState { transitionTo(new CandidateState( time, - localId, + localIdOrThrow(), newEpoch, voters, state.highWatermark(), @@ -417,7 +433,7 @@ public class QuorumState { // we typically expect the state machine to be caught up anyway. transitionTo(new LeaderState( - localId, + localIdOrThrow(), epoch(), epochStartOffset, voters, diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java index 5c8795d9d5a..c7ffcfb0b6b 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java @@ -86,7 +86,7 @@ public class KafkaRaftMetrics implements AutoCloseable { this.currentVotedIdMetricName = metrics.metricName("current-vote", metricGroupName, "The current voted leader's id; -1 indicates not voted for anyone"); metrics.addMetric(this.currentVotedIdMetricName, (mConfig, currentTimeMs) -> { if (state.isLeader() || state.isCandidate()) { - return state.localId; + return state.localIdOrThrow(); } else if (state.isVoted()) { return state.votedStateOrThrow().votedId(); } else { diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index 61fb5b9c16e..a26b0536e49 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -404,7 +404,7 @@ final public class KafkaRaftClientSnapshotTest { context.pollUntilRequest(); context.assertSentVoteRequest(epoch + 1, 0, 0L, 1); - context.assertVotedCandidate(epoch + 1, context.localId); + context.assertVotedCandidate(epoch + 1, localId); } @Test @@ -955,7 +955,7 @@ final public class KafkaRaftClientSnapshotTest { context.pollUntilRequest(); context.assertSentVoteRequest(epoch + 1, 0, 0L, 1); - context.assertVotedCandidate(epoch + 1, context.localId); + context.assertVotedCandidate(epoch + 1, localId); // Send the response late context.deliverResponse( @@ -981,7 +981,7 @@ final public class KafkaRaftClientSnapshotTest { // Assert that the response is ignored and the replicas stays as a candidate context.client.poll(); - context.assertVotedCandidate(epoch + 1, context.localId); + context.assertVotedCandidate(epoch + 1, localId); } private static FetchSnapshotRequestData fetchSnapshotRequest( diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 1b02eb96cec..a9c37814ae2 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -67,7 +67,7 @@ public class KafkaRaftClientTest { public void testInitializeSingleMemberQuorum() throws IOException { int localId = 0; RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Collections.singleton(localId)).build(); - context.assertElectedLeader(1, context.localId); + context.assertElectedLeader(1, localId); } @Test @@ -83,9 +83,9 @@ public class KafkaRaftClientTest { assertEquals(1L, context.log.endOffset().offset); assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch()); - assertEquals(new LeaderAndEpoch(OptionalInt.of(context.localId), initialEpoch + 1), + assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1), context.currentLeaderAndEpoch()); - context.assertElectedLeader(initialEpoch + 1, context.localId); + context.assertElectedLeader(initialEpoch + 1, localId); } @Test @@ -270,14 +270,14 @@ public class KafkaRaftClientTest { context.time.sleep(2 * context.electionTimeoutMs); context.pollUntilRequest(); - context.assertVotedCandidate(1, context.localId); + context.assertVotedCandidate(1, localId); int correlationId = context.assertSentVoteRequest(1, 0, 0L, 1); context.deliverResponse(correlationId, otherNodeId, context.voteResponse(true, Optional.empty(), 1)); // Become leader after receiving the vote context.client.poll(); - context.assertElectedLeader(1, context.localId); + context.assertElectedLeader(1, localId); long electionTimestamp = context.time.milliseconds(); // Leader change record appended @@ -294,8 +294,8 @@ public class KafkaRaftClientTest { Record record = batch.iterator().next(); assertEquals(electionTimestamp, record.timestamp()); - RaftClientTestContext.verifyLeaderChangeMessage(context.localId, Arrays.asList(otherNodeId, context.localId), - Arrays.asList(otherNodeId, context.localId), record.key(), record.value()); + RaftClientTestContext.verifyLeaderChangeMessage(localId, Arrays.asList(otherNodeId, localId), + Arrays.asList(otherNodeId, localId), record.key(), record.value()); } @Test @@ -310,14 +310,14 @@ public class KafkaRaftClientTest { context.time.sleep(2 * context.electionTimeoutMs); context.pollUntilRequest(); - context.assertVotedCandidate(1, context.localId); + context.assertVotedCandidate(1, localId); int correlationId = context.assertSentVoteRequest(1, 0, 0L, 2); context.deliverResponse(correlationId, firstNodeId, context.voteResponse(true, Optional.empty(), 1)); // Become leader after receiving the vote context.client.poll(); - context.assertElectedLeader(1, context.localId); + context.assertElectedLeader(1, localId); long electionTimestamp = context.time.milliseconds(); // Leader change record appended @@ -334,8 +334,8 @@ public class KafkaRaftClientTest { Record record = batch.iterator().next(); assertEquals(electionTimestamp, record.timestamp()); - RaftClientTestContext.verifyLeaderChangeMessage(context.localId, Arrays.asList(firstNodeId, secondNodeId, context.localId), - Arrays.asList(firstNodeId, context.localId), record.key(), record.value()); + RaftClientTestContext.verifyLeaderChangeMessage(localId, Arrays.asList(firstNodeId, secondNodeId, localId), + Arrays.asList(firstNodeId, localId), record.key(), record.value()); } @Test @@ -395,7 +395,7 @@ public class KafkaRaftClientTest { context.assertVotedCandidate(epoch, localId); context.deliverRequest(context.endEpochRequest(epoch - 2, otherNodeId, - Collections.singletonList(context.localId))); + Collections.singletonList(localId))); context.client.poll(); 
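// As a voted candidate the local node has no elected leader, so the response carries the
// -1 leader id sentinel (asserted below as OptionalInt.empty()), now produced by
// quorum.leaderIdOrSentinel() rather than the old leaderIdOrNil().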
context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.empty()); @@ -403,17 +403,17 @@ public class KafkaRaftClientTest { // We should still be candidate until expiration of election timeout context.time.sleep(context.electionTimeoutMs + jitterMs - 1); context.client.poll(); - context.assertVotedCandidate(epoch, context.localId); + context.assertVotedCandidate(epoch, localId); // Enter the backoff period context.time.sleep(1); context.client.poll(); - context.assertVotedCandidate(epoch, context.localId); + context.assertVotedCandidate(epoch, localId); // After backoff, we will become a candidate again context.time.sleep(context.electionBackoffMaxMs); context.client.poll(); - context.assertVotedCandidate(epoch + 1, context.localId); + context.assertVotedCandidate(epoch + 1, localId); } @Test @@ -427,15 +427,15 @@ public class KafkaRaftClientTest { RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); // One of the voters may have sent EndQuorumEpoch from an earlier epoch - context.deliverRequest(context.endEpochRequest(epoch - 2, voter2, Arrays.asList(context.localId, voter3))); + context.deliverRequest(context.endEpochRequest(epoch - 2, voter2, Arrays.asList(localId, voter3))); context.pollUntilResponse(); - context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.of(context.localId)); + context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.of(localId)); // We should still be leader as long as fetch timeout has not expired context.time.sleep(context.fetchTimeoutMs - 1); context.client.poll(); - context.assertElectedLeader(epoch, context.localId); + context.assertElectedLeader(epoch, localId); } @Test @@ -451,14 +451,14 @@ public class KafkaRaftClientTest { .build(); context.deliverRequest(context.endEpochRequest(epoch, voter2, - Arrays.asList(context.localId, voter3))); + Arrays.asList(localId, voter3))); context.pollUntilResponse(); context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(voter2)); // Should become a candidate immediately context.client.poll(); - context.assertVotedCandidate(epoch + 1, context.localId); + context.assertVotedCandidate(epoch + 1, localId); } @Test @@ -629,13 +629,13 @@ public class KafkaRaftClientTest { .build(); context.deliverRequest(context.endEpochRequest(leaderEpoch, oldLeaderId, - Collections.singletonList(context.localId))); + Collections.singletonList(localId))); context.pollUntilResponse(); context.assertSentEndQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(oldLeaderId)); context.client.poll(); - context.assertVotedCandidate(leaderEpoch + 1, context.localId); + context.assertVotedCandidate(leaderEpoch + 1, localId); } @Test @@ -651,7 +651,7 @@ public class KafkaRaftClientTest { .build(); context.deliverRequest(context.endEpochRequest(leaderEpoch, oldLeaderId, - Arrays.asList(preferredNextLeader, context.localId))); + Arrays.asList(preferredNextLeader, localId))); context.pollUntilResponse(); context.assertSentEndQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(oldLeaderId)); @@ -671,7 +671,7 @@ public class KafkaRaftClientTest { assertEquals(2, voteRequests.size()); // Should have already done self-voting - context.assertVotedCandidate(leaderEpoch + 1, context.localId); + context.assertVotedCandidate(leaderEpoch + 1, localId); } @Test @@ -686,7 +686,7 @@ public class KafkaRaftClientTest { context.time.sleep(2 * context.electionTimeoutMs); 
context.pollUntilRequest(); - context.assertVotedCandidate(epoch, context.localId); + context.assertVotedCandidate(epoch, localId); int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1); @@ -697,12 +697,12 @@ public class KafkaRaftClientTest { // We will ignore the timed out response if it arrives late context.deliverResponse(correlationId, otherNodeId, context.voteResponse(true, Optional.empty(), 1)); context.client.poll(); - context.assertVotedCandidate(epoch, context.localId); + context.assertVotedCandidate(epoch, localId); // Become leader after receiving the retry response context.deliverResponse(retryCorrelationId, otherNodeId, context.voteResponse(true, Optional.empty(), 1)); context.client.poll(); - context.assertElectedLeader(epoch, context.localId); + context.assertElectedLeader(epoch, localId); } @Test @@ -813,8 +813,8 @@ public class KafkaRaftClientTest { context.client.poll(); - context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.of(context.localId), false); - context.assertElectedLeader(leaderEpoch, context.localId); + context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.of(localId), false); + context.assertElectedLeader(leaderEpoch, localId); } @Test @@ -871,7 +871,7 @@ public class KafkaRaftClientTest { context.deliverRequest(context.voteRequest(leaderEpoch, otherNodeId, leaderEpoch - 1, 1)); context.client.poll(); context.assertSentVoteResponse(Errors.NONE, leaderEpoch, OptionalInt.empty(), false); - context.assertVotedCandidate(leaderEpoch, context.localId); + context.assertVotedCandidate(leaderEpoch, localId); } @Test @@ -892,7 +892,7 @@ public class KafkaRaftClientTest { context.time.sleep(2 * context.electionTimeoutMs); context.pollUntilRequest(); - context.assertVotedCandidate(epoch, context.localId); + context.assertVotedCandidate(epoch, localId); // Quorum size is two. If the other member rejects, then we need to schedule a revote. int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1); @@ -901,19 +901,19 @@ public class KafkaRaftClientTest { context.client.poll(); // All nodes have rejected our candidacy, but we should still remember that we had voted - context.assertVotedCandidate(epoch, context.localId); + context.assertVotedCandidate(epoch, localId); // Even though our candidacy was rejected, we will backoff for jitter period // before we bump the epoch and start a new election. 
context.time.sleep(context.electionBackoffMaxMs - 1); context.client.poll(); - context.assertVotedCandidate(epoch, context.localId); + context.assertVotedCandidate(epoch, localId); // After jitter expires, we become a candidate again context.time.sleep(1); context.client.poll(); context.pollUntilRequest(); - context.assertVotedCandidate(epoch + 1, context.localId); + context.assertVotedCandidate(epoch + 1, localId); context.assertSentVoteRequest(epoch + 1, 0, 0L, 1); } @@ -976,7 +976,7 @@ public class KafkaRaftClientTest { context.pollUntilRequest(); context.assertSentVoteRequest(epoch + 1, lastEpoch, 1L, 1); - context.assertVotedCandidate(epoch + 1, context.localId); + context.assertVotedCandidate(epoch + 1, localId); } @Test @@ -1073,27 +1073,27 @@ public class KafkaRaftClientTest { context.deliverRequest(context.fetchRequest( epoch, otherNodeId, -5L, 0, 0)); context.pollUntilResponse(); - context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(context.localId)); + context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); context.deliverRequest(context.fetchRequest( epoch, otherNodeId, 0L, -1, 0)); context.pollUntilResponse(); - context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(context.localId)); + context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); context.deliverRequest(context.fetchRequest( epoch, otherNodeId, 0L, epoch + 1, 0)); context.pollUntilResponse(); - context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(context.localId)); + context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); context.deliverRequest(context.fetchRequest( epoch + 1, otherNodeId, 0L, 0, 0)); context.pollUntilResponse(); - context.assertSentFetchResponse(Errors.UNKNOWN_LEADER_EPOCH, epoch, OptionalInt.of(context.localId)); + context.assertSentFetchResponse(Errors.UNKNOWN_LEADER_EPOCH, epoch, OptionalInt.of(localId)); context.deliverRequest(context.fetchRequest( epoch, otherNodeId, 0L, 0, -1)); context.pollUntilResponse(); - context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(context.localId)); + context.assertSentFetchResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); } @Test @@ -1108,17 +1108,17 @@ public class KafkaRaftClientTest { int nonVoterId = 2; context.deliverRequest(context.voteRequest(epoch, nonVoterId, 0, 0)); context.client.poll(); - context.assertSentVoteResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(context.localId), false); + context.assertSentVoteResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId), false); context.deliverRequest(context.beginEpochRequest(epoch, nonVoterId)); context.client.poll(); - context.assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(context.localId)); + context.assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId)); context.deliverRequest(context.endEpochRequest(epoch, nonVoterId, Collections.singletonList(otherNodeId))); context.client.poll(); - // The sent request has no context.localId as a preferable voter. - context.assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(context.localId)); + // The sent request has no localId as a preferable voter. 
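// Requests from a node outside the voter set are rejected with INCONSISTENT_VOTER_SET,
// while the responses below still advertise the current leader id (localId).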
+ context.assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(localId)); } @Test @@ -1167,7 +1167,7 @@ public class KafkaRaftClientTest { // After expiration of the max wait time, the fetch returns an empty record set context.time.sleep(maxWaitTimeMs); context.client.poll(); - MemoryRecords fetchedRecords = context.assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(context.localId)); + MemoryRecords fetchedRecords = context.assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(localId)); assertEquals(0, fetchedRecords.sizeInBytes()); } @@ -1245,7 +1245,7 @@ public class KafkaRaftClientTest { // Now await the fetch timeout and become a candidate context.time.sleep(context.fetchTimeoutMs); context.client.poll(); - context.assertVotedCandidate(epoch + 1, context.localId); + context.assertVotedCandidate(epoch + 1, localId); // The fetch response from the old leader returns, but it should be ignored Records records = context.buildBatch(0L, 3, Arrays.asList("a", "b")); @@ -1254,7 +1254,7 @@ public class KafkaRaftClientTest { context.client.poll(); assertEquals(0, context.log.endOffset().offset); - context.assertVotedCandidate(epoch + 1, context.localId); + context.assertVotedCandidate(epoch + 1, localId); } @Test @@ -1310,7 +1310,7 @@ public class KafkaRaftClientTest { // Wait until the vote requests are inflight context.pollUntilRequest(); - context.assertVotedCandidate(epoch, context.localId); + context.assertVotedCandidate(epoch, localId); List voteRequests = context.collectVoteRequests(epoch, 0, 0); assertEquals(2, voteRequests.size()); @@ -1496,10 +1496,10 @@ public class KafkaRaftClientTest { context.pollUntilResponse(); - context.assertSentDescribeQuorumResponse(context.localId, epoch, highWatermark, + context.assertSentDescribeQuorumResponse(localId, epoch, highWatermark, Arrays.asList( new ReplicaState() - .setReplicaId(context.localId) + .setReplicaId(localId) // As we are appending the records directly to the log, // the leader end offset hasn't been updated yet. .setLogEndOffset(3L), @@ -1602,7 +1602,7 @@ public class KafkaRaftClientTest { int localId = 0; RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Collections.singleton(localId)).build(); - context.assertElectedLeader(1, context.localId); + context.assertElectedLeader(1, localId); context.client.poll(); assertEquals(0, context.channel.drainSendQueue().size()); int shutdownTimeoutMs = 5000; @@ -1708,7 +1708,7 @@ public class KafkaRaftClientTest { context.client.poll(); // The BeginEpoch request eventually times out. We should not send another one. 
- context.assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(context.localId)); + context.assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(localId)); context.time.sleep(context.requestTimeoutMs); context.client.poll(); @@ -1725,7 +1725,7 @@ public class KafkaRaftClientTest { RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build(); long now = context.time.milliseconds(); - context.assertElectedLeader(1, context.localId); + context.assertElectedLeader(1, localId); // We still write the leader change message assertEquals(OptionalLong.of(1L), context.client.highWatermark()); @@ -1747,7 +1747,7 @@ public class KafkaRaftClientTest { context.deliverRequest(context.fetchRequest(1, otherNodeId, 0L, 0, 500)); context.pollUntilResponse(); - MemoryRecords fetchedRecords = context.assertSentFetchResponse(Errors.NONE, 1, OptionalInt.of(context.localId)); + MemoryRecords fetchedRecords = context.assertSentFetchResponse(Errors.NONE, 1, OptionalInt.of(localId)); List batches = Utils.toList(fetchedRecords.batchIterator()); assertEquals(2, batches.size()); @@ -1758,8 +1758,8 @@ public class KafkaRaftClientTest { Record record = readRecords.get(0); assertEquals(now, record.timestamp()); - RaftClientTestContext.verifyLeaderChangeMessage(context.localId, Collections.singletonList(context.localId), - Collections.singletonList(context.localId), record.key(), record.value()); + RaftClientTestContext.verifyLeaderChangeMessage(localId, Collections.singletonList(localId), + Collections.singletonList(localId), record.key(), record.value()); MutableRecordBatch batch = batches.get(1); assertEquals(1, batch.partitionLeaderEpoch()); @@ -1829,8 +1829,8 @@ public class KafkaRaftClientTest { assertNotNull(getMetric(context.metrics, "append-records-rate")); assertEquals("leader", getMetric(context.metrics, "current-state").metricValue()); - assertEquals((double) context.localId, getMetric(context.metrics, "current-leader").metricValue()); - assertEquals((double) context.localId, getMetric(context.metrics, "current-vote").metricValue()); + assertEquals((double) localId, getMetric(context.metrics, "current-leader").metricValue()); + assertEquals((double) localId, getMetric(context.metrics, "current-vote").metricValue()); assertEquals((double) epoch, getMetric(context.metrics, "current-epoch").metricValue()); assertEquals((double) 1L, getMetric(context.metrics, "high-watermark").metricValue()); assertEquals((double) 1L, getMetric(context.metrics, "log-end-offset").metricValue()); @@ -1911,7 +1911,7 @@ public class KafkaRaftClientTest { // Sleep a little to ensure that we become a candidate context.time.sleep(context.electionTimeoutMs * 2); context.pollUntilRequest(); - context.assertVotedCandidate(epoch, context.localId); + context.assertVotedCandidate(epoch, localId); int correlationId = context.assertSentVoteRequest(epoch, 0, 0L, 1); VoteResponseData response = new VoteResponseData() @@ -2157,7 +2157,7 @@ public class KafkaRaftClientTest { context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 9L, epoch, 500)); context.pollUntilResponse(); assertEquals(OptionalLong.of(9L), context.client.highWatermark()); - context.assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(context.localId)); + context.assertSentFetchResponse(Errors.NONE, epoch, OptionalInt.of(localId)); // Now we receive a vote request which transitions us to the 'unattached' state context.deliverRequest(context.voteRequest(epoch + 1, otherNodeId, epoch, 9L)); @@ -2182,6 +2182,46 @@ public class 
KafkaRaftClientTest { assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch()); } + @Test + public void testObserverFetchWithNoLocalId() throws Exception { + // When no `localId` is defined, the client will behave as an observer. + // This is designed for tooling/debugging use cases. + + Set voters = Utils.mkSet(1, 2); + RaftClientTestContext context = new RaftClientTestContext.Builder(OptionalInt.empty(), voters) + .build(); + + // First fetch discovers the current leader and epoch + + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest(); + assertTrue(voters.contains(fetchRequest1.destinationId())); + context.assertFetchRequestData(fetchRequest1, 0, 0L, 0); + + int leaderEpoch = 5; + int leaderId = 1; + + context.deliverResponse(fetchRequest1.correlationId, fetchRequest1.destinationId(), + context.fetchResponse(5, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH)); + context.client.poll(); + context.assertElectedLeader(leaderEpoch, leaderId); + + // Second fetch goes to the discovered leader + + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest(); + assertEquals(leaderId, fetchRequest2.destinationId()); + context.assertFetchRequestData(fetchRequest2, leaderEpoch, 0L, 0); + + List records = Arrays.asList("a", "b", "c"); + MemoryRecords batch1 = context.buildBatch(0L, 3, records); + context.deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(), + context.fetchResponse(leaderEpoch, leaderId, batch1, 0L, Errors.NONE)); + context.client.poll(); + assertEquals(3L, context.log.endOffset().offset); + assertEquals(3, context.log.lastFetchedEpoch()); + } + private static KafkaMetric getMetric(final Metrics metrics, final String name) { return metrics.metrics().get(metrics.metricName(name, "raft-metrics")); } diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index a0e9e14d5ba..deda71a61fe 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -46,6 +46,13 @@ public class QuorumStateTest { private final Random random = Mockito.spy(new Random(1)); private QuorumState buildQuorumState(Set voters) { + return buildQuorumState(OptionalInt.of(localId), voters); + } + + private QuorumState buildQuorumState( + OptionalInt localId, + Set voters + ) { return new QuorumState( localId, voters, @@ -1019,6 +1026,40 @@ public class QuorumStateTest { assertEquals(Optional.empty(), state.highWatermark()); } + @Test + public void testInitializeWithEmptyLocalId() throws IOException { + QuorumState state = buildQuorumState(OptionalInt.empty(), Utils.mkSet(0, 1)); + state.initialize(new OffsetAndEpoch(0L, 0)); + + assertTrue(state.isObserver()); + assertFalse(state.isVoter()); + + assertThrows(IllegalStateException.class, state::transitionToCandidate); + assertThrows(IllegalStateException.class, () -> state.transitionToVoted(1, 1)); + assertThrows(IllegalStateException.class, () -> state.transitionToLeader(0L)); + + state.transitionToFollower(1, 1); + assertTrue(state.isFollower()); + + state.transitionToUnattached(2); + assertTrue(state.isUnattached()); + } + + @Test + public void testObserverInitializationFailsIfElectionStateHasVotedCandidate() { + Set voters = Utils.mkSet(0, 1); + int epoch = 5; + int votedId = 1; + + store.writeElectionState(ElectionState.withVotedCandidate(epoch, votedId, voters)); 
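// Initialization must fail both for a voter-set outsider (id 2) and for an empty local id:
// QuorumState.initialize() now rejects a persisted vote whenever the local node is not a
// voter, since a recorded vote implies this node previously participated as a voter.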
+ + QuorumState state1 = buildQuorumState(OptionalInt.of(2), voters); + assertThrows(IllegalStateException.class, () -> state1.initialize(new OffsetAndEpoch(0, 0))); + + QuorumState state2 = buildQuorumState(OptionalInt.empty(), voters); + assertThrows(IllegalStateException.class, () -> state2.initialize(new OffsetAndEpoch(0, 0))); + } + private QuorumState initializeEmptyState(Set voters) throws IOException { QuorumState state = buildQuorumState(voters); store.writeElectionState(ElectionState.withUnknownLeader(0, voters)); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 020e1aa96ab..40f534308f5 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -97,7 +97,7 @@ public final class RaftClientTestContext { final int requestTimeoutMs; private final QuorumStateStore quorumStateStore; - final int localId; + private final OptionalInt localId; public final KafkaRaftClient client; final Metrics metrics; public final MockLog log; @@ -127,7 +127,7 @@ public final class RaftClientTestContext { private final Random random = Mockito.spy(new Random(1)); private final MockLog log = new MockLog(METADATA_PARTITION); private final Set voters; - private final int localId; + private final OptionalInt localId; private int requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS; private int electionTimeoutMs = DEFAULT_ELECTION_TIMEOUT_MS; @@ -135,6 +135,10 @@ public final class RaftClientTestContext { private MemoryPool memoryPool = MemoryPool.NONE; public Builder(int localId, Set voters) { + this(OptionalInt.of(localId), voters); + } + + public Builder(OptionalInt localId, Set voters) { this.voters = voters; this.localId = localId; } @@ -231,7 +235,7 @@ public final class RaftClientTestContext { } private RaftClientTestContext( - int localId, + OptionalInt localId, KafkaRaftClient client, MockLog log, MockNetworkChannel channel, @@ -344,7 +348,11 @@ public final class RaftClientTestContext { } client.poll(); - assertElectedLeader(epoch, localId); + assertElectedLeader(epoch, localIdOrThrow()); + } + + private int localIdOrThrow() { + return localId.orElseThrow(() -> new AssertionError("Required local id is not defined")); } void expectBeginEpoch( @@ -352,7 +360,7 @@ public final class RaftClientTestContext { ) throws Exception { pollUntilRequest(); for (RaftRequest.Outbound request : collectBeginEpochRequests(epoch)) { - BeginQuorumEpochResponseData beginEpochResponse = beginEpochResponse(epoch, localId); + BeginQuorumEpochResponseData beginEpochResponse = beginEpochResponse(epoch, localIdOrThrow()); deliverResponse(request.correlationId, request.destinationId(), beginEpochResponse); } client.poll(); @@ -456,7 +464,7 @@ public final class RaftClientTestContext { VoteRequestData.PartitionData partitionRequest = unwrap(request); assertEquals(epoch, partitionRequest.candidateEpoch()); - assertEquals(localId, partitionRequest.candidateId()); + assertEquals(localIdOrThrow(), partitionRequest.candidateId()); assertEquals(lastEpoch, partitionRequest.lastOffsetEpoch()); assertEquals(lastEpochOffset, partitionRequest.lastOffset()); voteRequests.add((RaftRequest.Outbound) raftMessage); @@ -645,7 +653,7 @@ public final class RaftClientTestContext { request.topics().get(0).partitions().get(0); assertEquals(epoch, partitionRequest.leaderEpoch()); - assertEquals(localId, partitionRequest.leaderId()); + 
assertEquals(localIdOrThrow(), partitionRequest.leaderId()); RaftRequest.Outbound outboundRequest = (RaftRequest.Outbound) raftMessage; collectedDestinationIdSet.add(outboundRequest.destinationId()); @@ -681,7 +689,7 @@ public final class RaftClientTestContext { request.topics().get(0).partitions().get(0); assertEquals(epoch, partitionRequest.leaderEpoch()); - assertEquals(localId, partitionRequest.leaderId()); + assertEquals(localIdOrThrow(), partitionRequest.leaderId()); requests.add(raftRequest); } return requests; @@ -824,7 +832,7 @@ public final class RaftClientTestContext { assertEquals(epoch, fetchPartition.currentLeaderEpoch()); assertEquals(fetchOffset, fetchPartition.fetchOffset()); assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch()); - assertEquals(localId, request.replicaId()); + assertEquals(localId.orElse(-1), request.replicaId()); } FetchRequestData fetchRequest( diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index 9343dca6af3..486a4e0b407 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -771,7 +771,7 @@ public class RaftEventSimulationTest { metrics, new MockExpirationService(time), FETCH_MAX_WAIT_MS, - nodeId, + OptionalInt.of(nodeId), logContext, random ); diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index 641b0a88a89..3122873c248 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.Collections; +import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Random; import java.util.Set; @@ -59,7 +60,7 @@ public class KafkaRaftMetricsTest { private QuorumState buildQuorumState(Set voters) { return new QuorumState( - localId, + OptionalInt.of(localId), voters, electionTimeoutMs, fetchTimeoutMs,
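To make the tooling/debugging use case from the commit message concrete, here is a minimal sketch of constructing an id-less observer client. It mirrors the argument order used in `KafkaRaftManager.buildRaftClient` above; the serde, channel, log, state store, and expiration service are placeholders assumed to be supplied by the tool, and the factory class itself is hypothetical, not part of this patch.

```java
package org.apache.kafka.raft;   // assumed, so the raft types need no imports in this sketch

import java.util.OptionalInt;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;

// Sketch only (not part of this patch): a debugging tool builds a KafkaRaftClient without
// a local id, so it acts as a non-participating observer and its Fetch requests carry
// replicaId=-1. Every argument except the OptionalInt node id is assumed to be supplied
// by the tool.
public final class ObserverClientFactory {

    public static <T> KafkaRaftClient<T> buildObserverClient(
        RecordSerde<T> serde,
        NetworkChannel channel,
        ReplicatedLog log,
        QuorumStateStore quorumStateStore,
        Time time,
        Metrics metrics,
        ExpirationService expirationService,
        LogContext logContext
    ) {
        // Argument order mirrors KafkaRaftManager.buildRaftClient in this patch.
        return new KafkaRaftClient<>(
            serde,
            channel,
            log,
            quorumStateStore,
            time,
            metrics,
            expirationService,
            logContext,
            OptionalInt.empty()  // empty id: never a voter, candidate, or leader
        );
    }

    private ObserverClientFactory() { }
}
```

With an empty id, `QuorumState.isVoter()` returns false, so the `localIdOrThrow()` paths such as vote requests are unreachable and the client can only fetch as an observer; as the commit message notes, the leader also keeps no `ReplicaState` for such a client.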