From 69eebbf968b520097c9e8441e5b18eac9f1fc075 Mon Sep 17 00:00:00 2001 From: dengziming Date: Sat, 20 Mar 2021 01:27:47 +0800 Subject: [PATCH] KAFKA-12440; ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot (#10289) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously we implemented ClusterId validation for the Fetch API in the Raft implementation. This patch adds ClusterId validation to the remaining Raft RPCs. Reviewers: José Armando García Sancio , Jason Gustafson --- checkstyle/suppressions.xml | 2 +- .../common/requests/FetchSnapshotRequest.java | 2 + .../apache/kafka/raft/KafkaRaftClient.java | 51 +++++++++- .../raft/KafkaRaftClientSnapshotTest.java | 78 +++++++++++++++ .../kafka/raft/KafkaRaftClientTest.java | 96 +++++++++++++++++++ .../kafka/raft/RaftClientTestContext.java | 89 ++++++++++++++++- 6 files changed, 312 insertions(+), 6 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 5473b035ea9..0496d2fca46 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -68,7 +68,7 @@ files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/> + files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|KafkaRaftClient).java"/> diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java index e7e224e33c3..1769e94f47f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java @@ -60,6 +60,7 @@ final public class FetchSnapshotRequest extends AbstractRequest { * @return the created fetch snapshot request data */ public static FetchSnapshotRequestData singleton( + String clusterId, TopicPartition topicPartition, UnaryOperator operator ) { @@ -68,6 +69,7 @@ final public class FetchSnapshotRequest extends AbstractRequest { ); return new FetchSnapshotRequestData() + .setClusterId(clusterId) .setTopics( Collections.singletonList( new FetchSnapshotRequestData.TopicSnapshot() diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index e02e20b2452..4e2f2a56bb8 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -549,6 +549,8 @@ public class KafkaRaftClient implements RaftClient { /** * Handle a Vote request. This API may return the following errors: * + * - {@link Errors#INCONSISTENT_CLUSTER_ID} if the cluster id is presented in request + * but different from this node * - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down * - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch * - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g. @@ -560,6 +562,10 @@ public class KafkaRaftClient implements RaftClient { ) throws IOException { VoteRequestData request = (VoteRequestData) requestMetadata.data; + if (!hasValidClusterId(request.clusterId())) { + return new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()); + } + if (!hasValidTopicPartition(request, log.topicPartition())) { // Until we support multi-raft, we treat topic partition mismatches as invalid requests return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code()); @@ -720,6 +726,8 @@ public class KafkaRaftClient implements RaftClient { /** * Handle a BeginEpoch request. This API may return the following errors: * + * - {@link Errors#INCONSISTENT_CLUSTER_ID} if the cluster id is presented in request + * but different from this node * - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down * - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g. * if this node or the sender is not one of the current known voters) @@ -731,6 +739,10 @@ public class KafkaRaftClient implements RaftClient { ) throws IOException { BeginQuorumEpochRequestData request = (BeginQuorumEpochRequestData) requestMetadata.data; + if (!hasValidClusterId(request.clusterId())) { + return new BeginQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()); + } + if (!hasValidTopicPartition(request, log.topicPartition())) { // Until we support multi-raft, we treat topic partition mismatches as invalid requests return new BeginQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code()); @@ -803,6 +815,8 @@ public class KafkaRaftClient implements RaftClient { /** * Handle an EndEpoch request. This API may return the following errors: * + * - {@link Errors#INCONSISTENT_CLUSTER_ID} if the cluster id is presented in request + * but different from this node * - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down * - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g. * if this node or the sender is not one of the current known voters) @@ -814,6 +828,10 @@ public class KafkaRaftClient implements RaftClient { ) throws IOException { EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) requestMetadata.data; + if (!hasValidClusterId(request.clusterId())) { + return new EndQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()); + } + if (!hasValidTopicPartition(request, log.topicPartition())) { // Until we support multi-raft, we treat topic partition mismatches as invalid requests return new EndQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code()); @@ -939,12 +957,12 @@ public class KafkaRaftClient implements RaftClient { ); } - private boolean hasValidClusterId(FetchRequestData request) { + private boolean hasValidClusterId(String requestClusterId) { // We don't enforce the cluster id if it is not provided. - if (request.clusterId() == null) { + if (requestClusterId == null) { return true; } - return clusterId.equals(request.clusterId()); + return clusterId.equals(requestClusterId); } /** @@ -955,6 +973,8 @@ public class KafkaRaftClient implements RaftClient { * * This API may return the following errors: * + * - {@link Errors#INCONSISTENT_CLUSTER_ID} if the cluster id is presented in request + * but different from this node * - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down * - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch * - {@link Errors#INVALID_REQUEST} if the request epoch is larger than the leader's current epoch @@ -966,7 +986,7 @@ public class KafkaRaftClient implements RaftClient { ) { FetchRequestData request = (FetchRequestData) requestMetadata.data; - if (!hasValidClusterId(request)) { + if (!hasValidClusterId(request.clusterId())) { return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())); } @@ -1200,11 +1220,30 @@ public class KafkaRaftClient implements RaftClient { ); } + /** + * Handle a FetchSnapshot request, similar to the Fetch request but we use {@link UnalignedRecords} + * in response because the records are not necessarily offset-aligned. + * + * This API may return the following errors: + * + * - {@link Errors#INCONSISTENT_CLUSTER_ID} if the cluster id is presented in request + * but different from this node + * - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down + * - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch + * - {@link Errors#INVALID_REQUEST} if the request epoch is larger than the leader's current epoch + * or if either the fetch offset or the last fetched epoch is invalid + * - {@link Errors#SNAPSHOT_NOT_FOUND} if the request snapshot id does not exists + * - {@link Errors#POSITION_OUT_OF_RANGE} if the request snapshot offset out of range + */ private FetchSnapshotResponseData handleFetchSnapshotRequest( RaftRequest.Inbound requestMetadata ) throws IOException { FetchSnapshotRequestData data = (FetchSnapshotRequestData) requestMetadata.data; + if (!hasValidClusterId(data.clusterId())) { + return new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()); + } + if (data.topics().size() != 1 && data.topics().get(0).partitions().size() != 1) { return FetchSnapshotResponse.withTopLevelError(Errors.INVALID_REQUEST); } @@ -1728,6 +1767,7 @@ public class KafkaRaftClient implements RaftClient { ) { return EndQuorumEpochRequest.singletonRequest( log.topicPartition(), + clusterId, quorum.epoch(), quorum.localIdOrThrow(), state.preferredSuccessors() @@ -1752,6 +1792,7 @@ public class KafkaRaftClient implements RaftClient { private BeginQuorumEpochRequestData buildBeginQuorumEpochRequest() { return BeginQuorumEpochRequest.singletonRequest( log.topicPartition(), + clusterId, quorum.epoch(), quorum.localIdOrThrow() ); @@ -1761,6 +1802,7 @@ public class KafkaRaftClient implements RaftClient { OffsetAndEpoch endOffset = endOffset(); return VoteRequest.singletonRequest( log.topicPartition(), + clusterId, quorum.epoch(), quorum.localIdOrThrow(), endOffset.epoch, @@ -1801,6 +1843,7 @@ public class KafkaRaftClient implements RaftClient { .setEndOffset(snapshotId.offset); FetchSnapshotRequestData request = FetchSnapshotRequest.singleton( + clusterId, log.topicPartition(), snapshotPartition -> { return snapshotPartition diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index c66b9bdf31e..6af2f0d4b7a 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -1269,7 +1269,84 @@ final public class KafkaRaftClientSnapshotTest { context.assertVotedCandidate(epoch + 1, localId); } + @Test + public void testFetchSnapshotRequestClusterIdValidation() throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); + + // null cluster id is accepted + context.deliverRequest( + fetchSnapshotRequest( + context.clusterId.toString(), + context.metadataPartition, + epoch, + new OffsetAndEpoch(0, 0), + Integer.MAX_VALUE, + 0 + ) + ); + context.pollUntilResponse(); + context.assertSentFetchSnapshotResponse(context.metadataPartition); + + // null cluster id is accepted + context.deliverRequest( + fetchSnapshotRequest( + null, + context.metadataPartition, + epoch, + new OffsetAndEpoch(0, 0), + Integer.MAX_VALUE, + 0 + ) + ); + context.pollUntilResponse(); + context.assertSentFetchSnapshotResponse(context.metadataPartition); + + // empty cluster id is rejected + context.deliverRequest( + fetchSnapshotRequest( + "", + context.metadataPartition, + epoch, + new OffsetAndEpoch(0, 0), + Integer.MAX_VALUE, + 0 + ) + ); + context.pollUntilResponse(); + context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID); + + // invalid cluster id is rejected + context.deliverRequest( + fetchSnapshotRequest( + "invalid-uuid", + context.metadataPartition, + epoch, + new OffsetAndEpoch(0, 0), + Integer.MAX_VALUE, + 0 + ) + ); + context.pollUntilResponse(); + context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID); + } + private static FetchSnapshotRequestData fetchSnapshotRequest( + TopicPartition topicPartition, + int epoch, + OffsetAndEpoch offsetAndEpoch, + int maxBytes, + long position + ) { + return fetchSnapshotRequest(null, topicPartition, epoch, offsetAndEpoch, maxBytes, position); + } + + private static FetchSnapshotRequestData fetchSnapshotRequest( + String clusterId, TopicPartition topicPartition, int epoch, OffsetAndEpoch offsetAndEpoch, @@ -1281,6 +1358,7 @@ final public class KafkaRaftClientSnapshotTest { .setEpoch(offsetAndEpoch.epoch); FetchSnapshotRequestData request = FetchSnapshotRequest.singleton( + clusterId, topicPartition, snapshotPartition -> { return snapshotPartition diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 1c716f42fc0..61e3e86a4d4 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -1107,6 +1107,12 @@ public class KafkaRaftClientTest { RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); + // valid cluster id is accepted + context.deliverRequest(context.fetchRequest( + epoch, context.clusterId.toString(), otherNodeId, -5L, 0, 0)); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); + // null cluster id is accepted context.deliverRequest(context.fetchRequest( epoch, null, otherNodeId, -5L, 0, 0)); @@ -1126,6 +1132,96 @@ public class KafkaRaftClientTest { context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID); } + @Test + public void testVoteRequestClusterIdValidation() throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); + + // valid cluster id is accepted + context.deliverRequest(context.voteRequest(epoch, localId, 0, 0)); + context.pollUntilResponse(); + context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false); + + // null cluster id is accepted + context.deliverRequest(context.voteRequest(epoch, localId, 0, 0)); + context.pollUntilResponse(); + context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false); + + // empty cluster id is rejected + context.deliverRequest(context.voteRequest("", epoch, localId, 0, 0)); + context.pollUntilResponse(); + context.assertSentVoteResponse(Errors.INCONSISTENT_CLUSTER_ID); + + // invalid cluster id is rejected + context.deliverRequest(context.voteRequest("invalid-uuid", epoch, localId, 0, 0)); + context.pollUntilResponse(); + context.assertSentVoteResponse(Errors.INCONSISTENT_CLUSTER_ID); + } + + @Test + public void testBeginQuorumEpochRequestClusterIdValidation() throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); + + // valid cluster id is accepted + context.deliverRequest(context.beginEpochRequest(context.clusterId.toString(), epoch, localId)); + context.pollUntilResponse(); + context.assertSentBeginQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(localId)); + + // null cluster id is accepted + context.deliverRequest(context.beginEpochRequest(epoch, localId)); + context.pollUntilResponse(); + context.assertSentBeginQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(localId)); + + // empty cluster id is rejected + context.deliverRequest(context.beginEpochRequest("", epoch, localId)); + context.pollUntilResponse(); + context.assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_CLUSTER_ID); + + // invalid cluster id is rejected + context.deliverRequest(context.beginEpochRequest("invalid-uuid", epoch, localId)); + context.pollUntilResponse(); + context.assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_CLUSTER_ID); + } + + @Test + public void testEndQuorumEpochRequestClusterIdValidation() throws Exception { + int localId = 0; + int otherNodeId = 1; + int epoch = 5; + Set voters = Utils.mkSet(localId, otherNodeId); + + RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); + + // valid cluster id is accepted + context.deliverRequest(context.endEpochRequest(context.clusterId.toString(), epoch, localId, Collections.singletonList(otherNodeId))); + context.pollUntilResponse(); + context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(localId)); + + // null cluster id is accepted + context.deliverRequest(context.endEpochRequest(epoch, localId, Collections.singletonList(otherNodeId))); + context.pollUntilResponse(); + context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(localId)); + + // empty cluster id is rejected + context.deliverRequest(context.endEpochRequest("", epoch, localId, Collections.singletonList(otherNodeId))); + context.pollUntilResponse(); + context.assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_CLUSTER_ID); + + // invalid cluster id is rejected + context.deliverRequest(context.endEpochRequest("invalid-uuid", epoch, localId, Collections.singletonList(otherNodeId))); + context.pollUntilResponse(); + context.assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_CLUSTER_ID); + } + @Test public void testVoterOnlyRequestValidation() throws Exception { int localId = 0; diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 1f87e27fc48..39e9c34de54 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -96,7 +96,7 @@ public final class RaftClientTestContext { private int appendLingerMs; private final QuorumStateStore quorumStateStore; - private final Uuid clusterId; + final Uuid clusterId; private final OptionalInt localId; public final KafkaRaftClient client; final Metrics metrics; @@ -459,6 +459,18 @@ public final class RaftClientTestContext { return voteRequests.iterator().next().correlationId(); } + void assertSentVoteResponse( + Errors error + ) { + List sentMessages = drainSentResponses(ApiKeys.VOTE); + assertEquals(1, sentMessages.size()); + RaftMessage raftMessage = sentMessages.get(0); + assertTrue(raftMessage.data() instanceof VoteResponseData); + VoteResponseData response = (VoteResponseData) raftMessage.data(); + + assertEquals(error, Errors.forCode(response.errorCode())); + } + void assertSentVoteResponse( Errors error, int epoch, @@ -539,6 +551,17 @@ public final class RaftClientTestContext { return res; } + void assertSentBeginQuorumEpochResponse( + Errors responseError + ) { + List sentMessages = drainSentResponses(ApiKeys.BEGIN_QUORUM_EPOCH); + assertEquals(1, sentMessages.size()); + RaftMessage raftMessage = sentMessages.get(0); + assertTrue(raftMessage.data() instanceof BeginQuorumEpochResponseData); + BeginQuorumEpochResponseData response = (BeginQuorumEpochResponseData) raftMessage.data(); + assertEquals(responseError, Errors.forCode(response.errorCode())); + } + void assertSentBeginQuorumEpochResponse( Errors partitionError, int epoch, @@ -566,6 +589,17 @@ public final class RaftClientTestContext { return endQuorumRequests.get(0).correlationId(); } + void assertSentEndQuorumEpochResponse( + Errors responseError + ) { + List sentMessages = drainSentResponses(ApiKeys.END_QUORUM_EPOCH); + assertEquals(1, sentMessages.size()); + RaftMessage raftMessage = sentMessages.get(0); + assertTrue(raftMessage.data() instanceof EndQuorumEpochResponseData); + EndQuorumEpochResponseData response = (EndQuorumEpochResponseData) raftMessage.data(); + assertEquals(responseError, Errors.forCode(response.errorCode())); + } + void assertSentEndQuorumEpochResponse( Errors partitionError, int epoch, @@ -670,6 +704,17 @@ public final class RaftClientTestContext { return sentRequests.get(0); } + void assertSentFetchSnapshotResponse(Errors responseError) { + List sentMessages = drainSentResponses(ApiKeys.FETCH_SNAPSHOT); + assertEquals(1, sentMessages.size()); + + RaftMessage message = sentMessages.get(0); + assertTrue(message.data() instanceof FetchSnapshotResponseData); + + FetchSnapshotResponseData response = (FetchSnapshotResponseData) message.data(); + assertEquals(responseError, Errors.forCode(response.errorCode())); + } + Optional assertSentFetchSnapshotResponse(TopicPartition topicPartition) { List sentMessages = drainSentResponses(ApiKeys.FETCH_SNAPSHOT); assertEquals(1, sentMessages.size()); @@ -787,6 +832,30 @@ public final class RaftClientTestContext { ); } + EndQuorumEpochRequestData endEpochRequest( + String clusterId, + int epoch, + int leaderId, + List preferredSuccessors + ) { + return EndQuorumEpochRequest.singletonRequest( + metadataPartition, + clusterId, + epoch, + leaderId, + preferredSuccessors + ); + } + + BeginQuorumEpochRequestData beginEpochRequest(String clusterId, int epoch, int leaderId) { + return BeginQuorumEpochRequest.singletonRequest( + metadataPartition, + clusterId, + epoch, + leaderId + ); + } + BeginQuorumEpochRequestData beginEpochRequest(int epoch, int leaderId) { return BeginQuorumEpochRequest.singletonRequest( metadataPartition, @@ -808,6 +877,7 @@ public final class RaftClientTestContext { VoteRequestData voteRequest(int epoch, int candidateId, int lastEpoch, long lastEpochOffset) { return VoteRequest.singletonRequest( metadataPartition, + clusterId.toString(), epoch, candidateId, lastEpoch, @@ -815,6 +885,23 @@ public final class RaftClientTestContext { ); } + VoteRequestData voteRequest( + String clusterId, + int epoch, + int candidateId, + int lastEpoch, + long lastEpochOffset + ) { + return VoteRequest.singletonRequest( + metadataPartition, + clusterId, + epoch, + candidateId, + lastEpoch, + lastEpochOffset + ); + } + VoteResponseData voteResponse(boolean voteGranted, Optional leaderId, int epoch) { return VoteResponse.singletonResponse( Errors.NONE,