mirror of https://github.com/apache/kafka.git
KAFKA-14154; KRaft controller should return NOT_CONTROLLER if request epoch is ahead (#12514)
Similar to https://github.com/apache/kafka/pull/12506. For the KRaft controller, we should return NOT_CONTROLLER if the leader epoch or partition epoch in the request is ahead of the controller's local value, since that likely means this node is no longer the active controller.

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
This commit is contained in:
parent
75d89931e0
commit
5990471b8c
|
@ -125,6 +125,7 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
|
|||
import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION;
|
||||
import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED;
|
||||
import static org.apache.kafka.common.protocol.Errors.NONE;
|
||||
import static org.apache.kafka.common.protocol.Errors.NOT_CONTROLLER;
|
||||
import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
|
||||
import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
|
||||
import static org.apache.kafka.common.protocol.Errors.TOPIC_AUTHORIZATION_FAILED;
|
||||
|
@ -1071,7 +1072,23 @@ public class ReplicationControlManager {
|
|||
|
||||
return UNKNOWN_TOPIC_OR_PARTITION;
|
||||
}
|
||||
if (partitionData.leaderEpoch() != partition.leaderEpoch) {
|
||||
|
||||
// If the partition leader has a higher leader/partition epoch, then it is likely
|
||||
// that this node is no longer the active controller. We return NOT_CONTROLLER in
|
||||
// this case to give the leader an opportunity to find the new controller.
|
||||
if (partitionData.leaderEpoch() > partition.leaderEpoch) {
|
||||
log.debug("Rejecting AlterPartition request from node {} for {}-{} because " +
|
||||
"the current leader epoch is {}, which is greater than the local value {}.",
|
||||
brokerId, topic.name, partitionId, partition.leaderEpoch, partitionData.leaderEpoch());
|
||||
return NOT_CONTROLLER;
|
||||
}
|
||||
if (partitionData.partitionEpoch() > partition.partitionEpoch) {
|
||||
log.debug("Rejecting AlterPartition request from node {} for {}-{} because " +
|
||||
"the current partition epoch is {}, which is greater than the local value {}.",
|
||||
brokerId, topic.name, partitionId, partition.partitionEpoch, partitionData.partitionEpoch());
|
||||
return NOT_CONTROLLER;
|
||||
}
|
||||
if (partitionData.leaderEpoch() < partition.leaderEpoch) {
|
||||
log.debug("Rejecting AlterPartition request from node {} for {}-{} because " +
|
||||
"the current leader epoch is {}, not {}.", brokerId, topic.name,
|
||||
partitionId, partition.leaderEpoch, partitionData.leaderEpoch());
|
||||
|
@ -1085,7 +1102,7 @@ public class ReplicationControlManager {
|
|||
|
||||
return INVALID_REQUEST;
|
||||
}
|
||||
if (partitionData.partitionEpoch() != partition.partitionEpoch) {
|
||||
if (partitionData.partitionEpoch() < partition.partitionEpoch) {
|
||||
log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
|
||||
"the current partition epoch is {}, not {}.", brokerId,
|
||||
topic.name, partitionId, partition.partitionEpoch,
|
||||
|
|
|
@ -122,6 +122,7 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT
|
|||
import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
|
||||
import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED;
|
||||
import static org.apache.kafka.common.protocol.Errors.NONE;
|
||||
import static org.apache.kafka.common.protocol.Errors.NOT_CONTROLLER;
|
||||
import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
|
||||
import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
|
||||
import static org.apache.kafka.common.protocol.Errors.POLICY_VIOLATION;
|
||||
|
@ -960,7 +961,16 @@ public class ReplicationControlManagerTest {
|
|||
ControllerResult<AlterPartitionResponseData> invalidLeaderEpochResult = sendAlterPartition(
|
||||
replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
|
||||
topicIdPartition.topicId(), invalidLeaderEpochRequest);
|
||||
assertAlterPartitionResponse(invalidLeaderEpochResult, topicIdPartition, FENCED_LEADER_EPOCH);
|
||||
assertAlterPartitionResponse(invalidLeaderEpochResult, topicIdPartition, NOT_CONTROLLER);
|
||||
|
||||
// Invalid partition epoch
|
||||
PartitionData invalidPartitionEpochRequest = newAlterPartition(
|
||||
replicationControl, topicIdPartition, asList(0, 1), LeaderRecoveryState.RECOVERED);
|
||||
invalidPartitionEpochRequest.setPartitionEpoch(500);
|
||||
ControllerResult<AlterPartitionResponseData> invalidPartitionEpochResult = sendAlterPartition(
|
||||
replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
|
||||
topicIdPartition.topicId(), invalidPartitionEpochRequest);
|
||||
assertAlterPartitionResponse(invalidPartitionEpochResult, topicIdPartition, NOT_CONTROLLER);
|
||||
|
||||
// Invalid ISR (3 is not a valid replica)
|
||||
PartitionData invalidIsrRequest1 = newAlterPartition(
|
||||
|
|
Loading…
Reference in New Issue