KAFKA-12440; ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot (#10289)

Previously we implemented ClusterId validation for the Fetch API in the Raft implementation. This patch adds ClusterId validation to the remaining Raft RPCs. 

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
dengziming 2021-03-20 01:27:47 +08:00 committed by GitHub
parent a19806f262
commit 69eebbf968
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 312 additions and 6 deletions

View File

@ -68,7 +68,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
<suppress checks="CyclomaticComplexity"
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer).java"/>
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|KafkaRaftClient).java"/>
<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>

View File

@ -60,6 +60,7 @@ final public class FetchSnapshotRequest extends AbstractRequest {
* @return the created fetch snapshot request data
*/
public static FetchSnapshotRequestData singleton(
String clusterId,
TopicPartition topicPartition,
UnaryOperator<FetchSnapshotRequestData.PartitionSnapshot> operator
) {
@ -68,6 +69,7 @@ final public class FetchSnapshotRequest extends AbstractRequest {
);
return new FetchSnapshotRequestData()
.setClusterId(clusterId)
.setTopics(
Collections.singletonList(
new FetchSnapshotRequestData.TopicSnapshot()

View File

@ -549,6 +549,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
/**
* Handle a Vote request. This API may return the following errors:
*
* - {@link Errors#INCONSISTENT_CLUSTER_ID} if the cluster id is present in the request
* but differs from this node's cluster id
* - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down
* - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
* - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g.
@ -560,6 +562,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
) throws IOException {
VoteRequestData request = (VoteRequestData) requestMetadata.data;
if (!hasValidClusterId(request.clusterId())) {
return new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
}
if (!hasValidTopicPartition(request, log.topicPartition())) {
// Until we support multi-raft, we treat topic partition mismatches as invalid requests
return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
@ -720,6 +726,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
/**
* Handle a BeginEpoch request. This API may return the following errors:
*
* - {@link Errors#INCONSISTENT_CLUSTER_ID} if the cluster id is present in the request
* but differs from this node's cluster id
* - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down
* - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g.
* if this node or the sender is not one of the current known voters)
@ -731,6 +739,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
) throws IOException {
BeginQuorumEpochRequestData request = (BeginQuorumEpochRequestData) requestMetadata.data;
if (!hasValidClusterId(request.clusterId())) {
return new BeginQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
}
if (!hasValidTopicPartition(request, log.topicPartition())) {
// Until we support multi-raft, we treat topic partition mismatches as invalid requests
return new BeginQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
@ -803,6 +815,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
/**
* Handle an EndEpoch request. This API may return the following errors:
*
* - {@link Errors#INCONSISTENT_CLUSTER_ID} if the cluster id is present in the request
* but differs from this node's cluster id
* - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down
* - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g.
* if this node or the sender is not one of the current known voters)
@ -814,6 +828,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
) throws IOException {
EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) requestMetadata.data;
if (!hasValidClusterId(request.clusterId())) {
return new EndQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
}
if (!hasValidTopicPartition(request, log.topicPartition())) {
// Until we support multi-raft, we treat topic partition mismatches as invalid requests
return new EndQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
@ -939,12 +957,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
);
}
private boolean hasValidClusterId(FetchRequestData request) {
private boolean hasValidClusterId(String requestClusterId) {
// We don't enforce the cluster id if it is not provided.
if (request.clusterId() == null) {
if (requestClusterId == null) {
return true;
}
return clusterId.equals(request.clusterId());
return clusterId.equals(requestClusterId);
}
/**
@ -955,6 +973,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
*
* This API may return the following errors:
*
* - {@link Errors#INCONSISTENT_CLUSTER_ID} if the cluster id is present in the request
* but differs from this node's cluster id
* - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down
* - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
* - {@link Errors#INVALID_REQUEST} if the request epoch is larger than the leader's current epoch
@ -966,7 +986,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
) {
FetchRequestData request = (FetchRequestData) requestMetadata.data;
if (!hasValidClusterId(request)) {
if (!hasValidClusterId(request.clusterId())) {
return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
}
@ -1200,11 +1220,30 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
);
}
/**
 * Handle a FetchSnapshot request, similar to the Fetch request but we use {@link UnalignedRecords}
 * in response because the records are not necessarily offset-aligned.
 *
 * This API may return the following errors:
 *
 * - {@link Errors#INCONSISTENT_CLUSTER_ID} if the cluster id is present in the request
 *     but differs from this node's cluster id
 * - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down
 * - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
 * - {@link Errors#INVALID_REQUEST} if the request does not name exactly one topic with exactly
 *     one partition, if the request epoch is larger than the leader's current epoch,
 *     or if either the fetch offset or the last fetched epoch is invalid
 * - {@link Errors#SNAPSHOT_NOT_FOUND} if the request snapshot id does not exist
 * - {@link Errors#POSITION_OUT_OF_RANGE} if the request snapshot offset is out of range
 */
private FetchSnapshotResponseData handleFetchSnapshotRequest(
    RaftRequest.Inbound requestMetadata
) throws IOException {
    FetchSnapshotRequestData data = (FetchSnapshotRequestData) requestMetadata.data;

    if (!hasValidClusterId(data.clusterId())) {
        return new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
    }

    // Reject anything other than exactly one topic holding exactly one partition.
    // The disjunction matters: with "&&" an empty topic list would reach get(0) and throw,
    // and a request with one topic but several partitions (or several topics) would slip through.
    if (data.topics().size() != 1 || data.topics().get(0).partitions().size() != 1) {
        return FetchSnapshotResponse.withTopLevelError(Errors.INVALID_REQUEST);
    }
@ -1728,6 +1767,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
) {
return EndQuorumEpochRequest.singletonRequest(
log.topicPartition(),
clusterId,
quorum.epoch(),
quorum.localIdOrThrow(),
state.preferredSuccessors()
@ -1752,6 +1792,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private BeginQuorumEpochRequestData buildBeginQuorumEpochRequest() {
return BeginQuorumEpochRequest.singletonRequest(
log.topicPartition(),
clusterId,
quorum.epoch(),
quorum.localIdOrThrow()
);
@ -1761,6 +1802,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
OffsetAndEpoch endOffset = endOffset();
return VoteRequest.singletonRequest(
log.topicPartition(),
clusterId,
quorum.epoch(),
quorum.localIdOrThrow(),
endOffset.epoch,
@ -1801,6 +1843,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
.setEndOffset(snapshotId.offset);
FetchSnapshotRequestData request = FetchSnapshotRequest.singleton(
clusterId,
log.topicPartition(),
snapshotPartition -> {
return snapshotPartition

View File

@ -1269,18 +1269,96 @@ final public class KafkaRaftClientSnapshotTest {
context.assertVotedCandidate(epoch + 1, localId);
}
/**
 * Verifies cluster id validation of FetchSnapshot requests on a leader: a matching id and a
 * null id both yield a partition-level response, while an empty or mismatching id is answered
 * with {@link Errors#INCONSISTENT_CLUSTER_ID}.
 */
@Test
public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
    int localId = 0;
    int otherNodeId = 1;
    int epoch = 5;
    Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

    RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

    // valid cluster id is accepted
    context.deliverRequest(
        fetchSnapshotRequest(
            context.clusterId.toString(),
            context.metadataPartition,
            epoch,
            new OffsetAndEpoch(0, 0),
            Integer.MAX_VALUE,
            0
        )
    );
    context.pollUntilResponse();
    context.assertSentFetchSnapshotResponse(context.metadataPartition);

    // null cluster id is accepted
    context.deliverRequest(
        fetchSnapshotRequest(
            null,
            context.metadataPartition,
            epoch,
            new OffsetAndEpoch(0, 0),
            Integer.MAX_VALUE,
            0
        )
    );
    context.pollUntilResponse();
    context.assertSentFetchSnapshotResponse(context.metadataPartition);

    // empty cluster id is rejected
    context.deliverRequest(
        fetchSnapshotRequest(
            "",
            context.metadataPartition,
            epoch,
            new OffsetAndEpoch(0, 0),
            Integer.MAX_VALUE,
            0
        )
    );
    context.pollUntilResponse();
    context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);

    // invalid cluster id is rejected
    context.deliverRequest(
        fetchSnapshotRequest(
            "invalid-uuid",
            context.metadataPartition,
            epoch,
            new OffsetAndEpoch(0, 0),
            Integer.MAX_VALUE,
            0
        )
    );
    context.pollUntilResponse();
    context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
}
/**
 * Convenience overload that builds a FetchSnapshot request without a cluster id by
 * delegating to the six-argument variant with a null cluster id.
 */
private static FetchSnapshotRequestData fetchSnapshotRequest(
    TopicPartition topicPartition,
    int epoch,
    OffsetAndEpoch offsetAndEpoch,
    int maxBytes,
    long position
) {
    final String noClusterId = null;
    FetchSnapshotRequestData request =
        fetchSnapshotRequest(noClusterId, topicPartition, epoch, offsetAndEpoch, maxBytes, position);
    return request;
}
private static FetchSnapshotRequestData fetchSnapshotRequest(
String clusterId,
TopicPartition topicPartition,
int epoch,
OffsetAndEpoch offsetAndEpoch,
int maxBytes,
long position
) {
FetchSnapshotRequestData.SnapshotId snapshotId = new FetchSnapshotRequestData.SnapshotId()
.setEndOffset(offsetAndEpoch.offset)
.setEpoch(offsetAndEpoch.epoch);
FetchSnapshotRequestData request = FetchSnapshotRequest.singleton(
clusterId,
topicPartition,
snapshotPartition -> {
return snapshotPartition

View File

@ -1107,6 +1107,12 @@ public class KafkaRaftClientTest {
RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
// valid cluster id is accepted
context.deliverRequest(context.fetchRequest(
epoch, context.clusterId.toString(), otherNodeId, -5L, 0, 0));
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
// null cluster id is accepted
context.deliverRequest(context.fetchRequest(
epoch, null, otherNodeId, -5L, 0, 0));
@ -1126,6 +1132,96 @@ public class KafkaRaftClientTest {
context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID);
}
/**
 * Verifies cluster id validation of Vote requests on a leader: a matching id and a null id
 * are processed normally, while an empty or mismatching id is answered with
 * {@link Errors#INCONSISTENT_CLUSTER_ID}.
 */
@Test
public void testVoteRequestClusterIdValidation() throws Exception {
    int localId = 0;
    int otherNodeId = 1;
    int epoch = 5;
    Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

    RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

    // valid cluster id is accepted
    context.deliverRequest(context.voteRequest(context.clusterId.toString(), epoch, localId, 0, 0));
    context.pollUntilResponse();
    context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false);

    // null cluster id is accepted; it must be passed explicitly because the four-argument
    // voteRequest helper fills in the context's own cluster id
    context.deliverRequest(context.voteRequest(null, epoch, localId, 0, 0));
    context.pollUntilResponse();
    context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false);

    // empty cluster id is rejected
    context.deliverRequest(context.voteRequest("", epoch, localId, 0, 0));
    context.pollUntilResponse();
    context.assertSentVoteResponse(Errors.INCONSISTENT_CLUSTER_ID);

    // invalid cluster id is rejected
    context.deliverRequest(context.voteRequest("invalid-uuid", epoch, localId, 0, 0));
    context.pollUntilResponse();
    context.assertSentVoteResponse(Errors.INCONSISTENT_CLUSTER_ID);
}
/**
 * Exercises cluster id validation of BeginQuorumEpoch requests: the matching id and the
 * request built without an id are expected to succeed, the empty and the mismatching id are
 * expected to produce {@link Errors#INCONSISTENT_CLUSTER_ID}.
 */
@Test
public void testBeginQuorumEpochRequestClusterIdValidation() throws Exception {
    final int localId = 0;
    final int otherNodeId = 1;
    final int epoch = 5;
    final Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

    final RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

    // valid cluster id is accepted
    final String matchingClusterId = context.clusterId.toString();
    context.deliverRequest(context.beginEpochRequest(matchingClusterId, epoch, localId));
    context.pollUntilResponse();
    context.assertSentBeginQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(localId));

    // null cluster id is accepted
    context.deliverRequest(context.beginEpochRequest(epoch, localId));
    context.pollUntilResponse();
    context.assertSentBeginQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(localId));

    // empty cluster id, then invalid cluster id: both are rejected
    for (String rejectedClusterId : new String[] {"", "invalid-uuid"}) {
        context.deliverRequest(context.beginEpochRequest(rejectedClusterId, epoch, localId));
        context.pollUntilResponse();
        context.assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_CLUSTER_ID);
    }
}
/**
 * Exercises cluster id validation of EndQuorumEpoch requests: the matching id and the
 * request built without an id are expected to succeed, the empty and the mismatching id are
 * expected to produce {@link Errors#INCONSISTENT_CLUSTER_ID}.
 */
@Test
public void testEndQuorumEpochRequestClusterIdValidation() throws Exception {
    final int localId = 0;
    final int otherNodeId = 1;
    final int epoch = 5;
    final Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

    final RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);

    // valid cluster id is accepted
    final String matchingClusterId = context.clusterId.toString();
    context.deliverRequest(
        context.endEpochRequest(matchingClusterId, epoch, localId, Collections.singletonList(otherNodeId)));
    context.pollUntilResponse();
    context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(localId));

    // null cluster id is accepted
    context.deliverRequest(
        context.endEpochRequest(epoch, localId, Collections.singletonList(otherNodeId)));
    context.pollUntilResponse();
    context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(localId));

    // empty cluster id, then invalid cluster id: both are rejected
    for (String rejectedClusterId : new String[] {"", "invalid-uuid"}) {
        context.deliverRequest(
            context.endEpochRequest(rejectedClusterId, epoch, localId, Collections.singletonList(otherNodeId)));
        context.pollUntilResponse();
        context.assertSentEndQuorumEpochResponse(Errors.INCONSISTENT_CLUSTER_ID);
    }
}
@Test
public void testVoterOnlyRequestValidation() throws Exception {
int localId = 0;

View File

@ -96,7 +96,7 @@ public final class RaftClientTestContext {
private int appendLingerMs;
private final QuorumStateStore quorumStateStore;
private final Uuid clusterId;
final Uuid clusterId;
private final OptionalInt localId;
public final KafkaRaftClient<String> client;
final Metrics metrics;
@ -459,6 +459,18 @@ public final class RaftClientTestContext {
return voteRequests.iterator().next().correlationId();
}
void assertSentVoteResponse(
Errors error
) {
List<RaftResponse.Outbound> sentMessages = drainSentResponses(ApiKeys.VOTE);
assertEquals(1, sentMessages.size());
RaftMessage raftMessage = sentMessages.get(0);
assertTrue(raftMessage.data() instanceof VoteResponseData);
VoteResponseData response = (VoteResponseData) raftMessage.data();
assertEquals(error, Errors.forCode(response.errorCode()));
}
void assertSentVoteResponse(
Errors error,
int epoch,
@ -539,6 +551,17 @@ public final class RaftClientTestContext {
return res;
}
/**
 * Asserts that draining the sent BEGIN_QUORUM_EPOCH responses yields exactly one message,
 * that its payload is a {@link BeginQuorumEpochResponseData}, and that the payload's error
 * code maps to {@code responseError}.
 *
 * @param responseError the error expected on the response
 */
void assertSentBeginQuorumEpochResponse(Errors responseError) {
    List<RaftResponse.Outbound> drained = drainSentResponses(ApiKeys.BEGIN_QUORUM_EPOCH);
    assertEquals(1, drained.size());

    RaftMessage onlyMessage = drained.get(0);
    assertTrue(onlyMessage.data() instanceof BeginQuorumEpochResponseData);

    assertEquals(
        responseError,
        Errors.forCode(((BeginQuorumEpochResponseData) onlyMessage.data()).errorCode())
    );
}
void assertSentBeginQuorumEpochResponse(
Errors partitionError,
int epoch,
@ -566,6 +589,17 @@ public final class RaftClientTestContext {
return endQuorumRequests.get(0).correlationId();
}
/**
 * Asserts that draining the sent END_QUORUM_EPOCH responses yields exactly one message,
 * that its payload is an {@link EndQuorumEpochResponseData}, and that the payload's error
 * code maps to {@code responseError}.
 *
 * @param responseError the error expected on the response
 */
void assertSentEndQuorumEpochResponse(Errors responseError) {
    List<RaftResponse.Outbound> drained = drainSentResponses(ApiKeys.END_QUORUM_EPOCH);
    assertEquals(1, drained.size());

    RaftMessage onlyMessage = drained.get(0);
    assertTrue(onlyMessage.data() instanceof EndQuorumEpochResponseData);

    assertEquals(
        responseError,
        Errors.forCode(((EndQuorumEpochResponseData) onlyMessage.data()).errorCode())
    );
}
void assertSentEndQuorumEpochResponse(
Errors partitionError,
int epoch,
@ -670,6 +704,17 @@ public final class RaftClientTestContext {
return sentRequests.get(0);
}
/**
 * Asserts that draining the sent FETCH_SNAPSHOT responses yields exactly one message,
 * that its payload is a {@link FetchSnapshotResponseData}, and that the payload's error
 * code maps to {@code responseError}.
 *
 * @param responseError the error expected on the response
 */
void assertSentFetchSnapshotResponse(Errors responseError) {
    List<RaftResponse.Outbound> drained = drainSentResponses(ApiKeys.FETCH_SNAPSHOT);
    assertEquals(1, drained.size());

    RaftMessage onlyMessage = drained.get(0);
    assertTrue(onlyMessage.data() instanceof FetchSnapshotResponseData);

    assertEquals(
        responseError,
        Errors.forCode(((FetchSnapshotResponseData) onlyMessage.data()).errorCode())
    );
}
Optional<FetchSnapshotResponseData.PartitionSnapshot> assertSentFetchSnapshotResponse(TopicPartition topicPartition) {
List<RaftResponse.Outbound> sentMessages = drainSentResponses(ApiKeys.FETCH_SNAPSHOT);
assertEquals(1, sentMessages.size());
@ -787,6 +832,30 @@ public final class RaftClientTestContext {
);
}
/**
 * Builds an EndQuorumEpoch request for the metadata partition carrying the given cluster id.
 *
 * @param clusterId           cluster id to place in the request
 * @param epoch               epoch to place in the request
 * @param leaderId            leader id to place in the request
 * @param preferredSuccessors preferred successor ids to place in the request
 * @return the request data produced by {@code EndQuorumEpochRequest.singletonRequest}
 */
EndQuorumEpochRequestData endEpochRequest(
    String clusterId,
    int epoch,
    int leaderId,
    List<Integer> preferredSuccessors
) {
    EndQuorumEpochRequestData request = EndQuorumEpochRequest.singletonRequest(
        metadataPartition, clusterId, epoch, leaderId, preferredSuccessors);
    return request;
}
/**
 * Builds a BeginQuorumEpoch request for the metadata partition carrying the given cluster id.
 *
 * @param clusterId cluster id to place in the request
 * @param epoch     epoch to place in the request
 * @param leaderId  leader id to place in the request
 * @return the request data produced by {@code BeginQuorumEpochRequest.singletonRequest}
 */
BeginQuorumEpochRequestData beginEpochRequest(String clusterId, int epoch, int leaderId) {
    BeginQuorumEpochRequestData request =
        BeginQuorumEpochRequest.singletonRequest(metadataPartition, clusterId, epoch, leaderId);
    return request;
}
BeginQuorumEpochRequestData beginEpochRequest(int epoch, int leaderId) {
return BeginQuorumEpochRequest.singletonRequest(
metadataPartition,
@ -808,6 +877,24 @@ public final class RaftClientTestContext {
/**
 * Builds a Vote request for the metadata partition that carries this context's own
 * cluster id (its string form).
 *
 * @param epoch           epoch to place in the request
 * @param candidateId     candidate id to place in the request
 * @param lastEpoch       last epoch to place in the request
 * @param lastEpochOffset last epoch offset to place in the request
 * @return the request data produced by {@code VoteRequest.singletonRequest}
 */
VoteRequestData voteRequest(int epoch, int candidateId, int lastEpoch, long lastEpochOffset) {
    final String ownClusterId = clusterId.toString();
    VoteRequestData request = VoteRequest.singletonRequest(
        metadataPartition, ownClusterId, epoch, candidateId, lastEpoch, lastEpochOffset);
    return request;
}
VoteRequestData voteRequest(
String clusterId,
int epoch,
int candidateId,
int lastEpoch,
long lastEpochOffset
) {
return VoteRequest.singletonRequest(
metadataPartition,
clusterId,
epoch,
candidateId,
lastEpoch,