mirror of https://github.com/apache/kafka.git
MINOR: Add ineligible replica reason to log message (#12328)
It's useful if the message about ineligible replicas explains the reason the replica is ineligible. Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
parent
3072b3d23e
commit
ead6645123
|
@ -528,7 +528,7 @@ public class ClusterControlManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if the broker is in fenced state; Returns false if it is
|
* Returns true if the broker is unfenced; Returns false if it is
|
||||||
* not or if it does not exist.
|
* not or if it does not exist.
|
||||||
*/
|
*/
|
||||||
public boolean unfenced(int brokerId) {
|
public boolean unfenced(int brokerId) {
|
||||||
|
@ -537,6 +537,16 @@ public class ClusterControlManager {
|
||||||
return !registration.fenced();
|
return !registration.fenced();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a broker registration if it exists.
|
||||||
|
*
|
||||||
|
* @param brokerId The brokerId to get the registration for
|
||||||
|
* @return The current registration or null if the broker is not registered
|
||||||
|
*/
|
||||||
|
public BrokerRegistration registration(int brokerId) {
|
||||||
|
return brokerRegistrations.get(brokerId);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if the broker is in controlled shutdown state; Returns false
|
* Returns true if the broker is in controlled shutdown state; Returns false
|
||||||
* if it is not or if it does not exist.
|
* if it is not or if it does not exist.
|
||||||
|
|
|
@ -427,7 +427,7 @@ public final class QuorumController implements Controller {
|
||||||
return exception;
|
return exception;
|
||||||
}
|
}
|
||||||
log.warn("{}: failed with unknown server exception {} at epoch {} in {} us. " +
|
log.warn("{}: failed with unknown server exception {} at epoch {} in {} us. " +
|
||||||
"Reverting to last committed offset {}.",
|
"Renouncing leadership and reverting to the last committed offset {}.",
|
||||||
name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
|
name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
|
||||||
lastCommittedOffset, exception);
|
lastCommittedOffset, exception);
|
||||||
raftClient.resign(curClaimEpoch);
|
raftClient.resign(curClaimEpoch);
|
||||||
|
|
|
@ -956,7 +956,6 @@ public class ReplicationControlManager {
|
||||||
topic,
|
topic,
|
||||||
partitionId,
|
partitionId,
|
||||||
partition,
|
partition,
|
||||||
clusterControl::active,
|
|
||||||
context.requestHeader().requestApiVersion(),
|
context.requestHeader().requestApiVersion(),
|
||||||
partitionData);
|
partitionData);
|
||||||
|
|
||||||
|
@ -1048,7 +1047,6 @@ public class ReplicationControlManager {
|
||||||
* @param topic current topic information store by the replication manager
|
* @param topic current topic information store by the replication manager
|
||||||
* @param partitionId partition id being altered
|
* @param partitionId partition id being altered
|
||||||
* @param partition current partition registration for the partition being altered
|
* @param partition current partition registration for the partition being altered
|
||||||
* @param isEligibleReplica function telling if the replica is acceptable to join the ISR
|
|
||||||
* @param partitionData partition data from the alter partition request
|
* @param partitionData partition data from the alter partition request
|
||||||
*
|
*
|
||||||
* @return Errors.NONE for valid alter partition data; otherwise the validation error
|
* @return Errors.NONE for valid alter partition data; otherwise the validation error
|
||||||
|
@ -1058,7 +1056,6 @@ public class ReplicationControlManager {
|
||||||
TopicControlInfo topic,
|
TopicControlInfo topic,
|
||||||
int partitionId,
|
int partitionId,
|
||||||
PartitionRegistration partition,
|
PartitionRegistration partition,
|
||||||
Function<Integer, Boolean> isEligibleReplica,
|
|
||||||
short requestApiVersion,
|
short requestApiVersion,
|
||||||
AlterPartitionRequestData.PartitionData partitionData
|
AlterPartitionRequestData.PartitionData partitionData
|
||||||
) {
|
) {
|
||||||
|
@ -1125,9 +1122,7 @@ public class ReplicationControlManager {
|
||||||
return INVALID_REQUEST;
|
return INVALID_REQUEST;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<Integer> ineligibleReplicas = partitionData.newIsr().stream()
|
List<IneligibleReplica> ineligibleReplicas = ineligibleReplicasForIsr(newIsr);
|
||||||
.filter(replica -> !isEligibleReplica.apply(replica))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
if (!ineligibleReplicas.isEmpty()) {
|
if (!ineligibleReplicas.isEmpty()) {
|
||||||
log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
|
log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
|
||||||
"it specified ineligible replicas {} in the new ISR {}.",
|
"it specified ineligible replicas {} in the new ISR {}.",
|
||||||
|
@ -1143,6 +1138,21 @@ public class ReplicationControlManager {
|
||||||
return Errors.NONE;
|
return Errors.NONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<IneligibleReplica> ineligibleReplicasForIsr(int[] replicas) {
|
||||||
|
List<IneligibleReplica> ineligibleReplicas = new ArrayList<>(0);
|
||||||
|
for (Integer replicaId : replicas) {
|
||||||
|
BrokerRegistration registration = clusterControl.registration(replicaId);
|
||||||
|
if (registration == null) {
|
||||||
|
ineligibleReplicas.add(new IneligibleReplica(replicaId, "not registered"));
|
||||||
|
} else if (registration.inControlledShutdown()) {
|
||||||
|
ineligibleReplicas.add(new IneligibleReplica(replicaId, "shutting down"));
|
||||||
|
} else if (registration.fenced()) {
|
||||||
|
ineligibleReplicas.add(new IneligibleReplica(replicaId, "fenced"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ineligibleReplicas;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generate the appropriate records to handle a broker being fenced.
|
* Generate the appropriate records to handle a broker being fenced.
|
||||||
*
|
*
|
||||||
|
@ -1906,4 +1916,19 @@ public class ReplicationControlManager {
|
||||||
ReplicationControlIterator iterator(long epoch) {
|
ReplicationControlIterator iterator(long epoch) {
|
||||||
return new ReplicationControlIterator(epoch);
|
return new ReplicationControlIterator(epoch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final class IneligibleReplica {
|
||||||
|
private final int replicaId;
|
||||||
|
private final String reason;
|
||||||
|
|
||||||
|
private IneligibleReplica(int replicaId, String reason) {
|
||||||
|
this.replicaId = replicaId;
|
||||||
|
this.reason = reason;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return replicaId + " (" + reason + ")";
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue