MINOR: Replace lambda expressions with method references for ReplicationControlManager (#16547)

Reviewers: Xuan-Zhang Gong <gongxuanzhangmelt@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Logan Zhu 2024-07-24 19:44:54 +08:00 committed by GitHub
parent baedfc7e04
commit 3589f45656
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 4 additions and 8 deletions

View File

@ -1216,7 +1216,7 @@ public class ReplicationControlManager {
} }
int[] newIsr = partitionData.newIsrWithEpochs().stream() int[] newIsr = partitionData.newIsrWithEpochs().stream()
.mapToInt(brokerState -> brokerState.brokerId()).toArray(); .mapToInt(BrokerState::brokerId).toArray();
if (!Replicas.validateIsr(partition.replicas, newIsr)) { if (!Replicas.validateIsr(partition.replicas, newIsr)) {
log.error("Rejecting AlterPartition request from node {} for {}-{} because " + log.error("Rejecting AlterPartition request from node {} for {}-{} because " +
@ -1560,6 +1560,7 @@ public class ReplicationControlManager {
if (states.current() != states.next()) { if (states.current() != states.next()) {
switch (states.next()) { switch (states.next()) {
case FENCED: case FENCED:
case SHUTDOWN_NOW:
handleBrokerFenced(brokerId, records); handleBrokerFenced(brokerId, records);
break; break;
case UNFENCED: case UNFENCED:
@ -1568,9 +1569,6 @@ public class ReplicationControlManager {
case CONTROLLED_SHUTDOWN: case CONTROLLED_SHUTDOWN:
handleBrokerInControlledShutdown(brokerId, brokerEpoch, records); handleBrokerInControlledShutdown(brokerId, brokerEpoch, records);
break; break;
case SHUTDOWN_NOW:
handleBrokerFenced(brokerId, records);
break;
} }
} }
heartbeatManager.touch(brokerId, heartbeatManager.touch(brokerId,
@ -2078,7 +2076,7 @@ public class ReplicationControlManager {
tp.partitionId(), tp.partitionId(),
new LeaderAcceptor(clusterControl, part), new LeaderAcceptor(clusterControl, part),
featureControl.metadataVersion(), featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topics.get(tp.topicId()).name.toString()) getTopicEffectiveMinIsr(topics.get(tp.topicId()).name)
); );
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed()); builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
builder.setEligibleLeaderReplicasEnabled(isElrEnabled()); builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
@ -2202,9 +2200,7 @@ public class ReplicationControlManager {
for (int partitionId : partitionIds) { for (int partitionId : partitionIds) {
Optional<OngoingPartitionReassignment> ongoing = Optional<OngoingPartitionReassignment> ongoing =
getOngoingPartitionReassignment(topicInfo, partitionId); getOngoingPartitionReassignment(topicInfo, partitionId);
if (ongoing.isPresent()) { ongoing.ifPresent(ongoingPartitionReassignment -> ongoingTopic.partitions().add(ongoingPartitionReassignment));
ongoingTopic.partitions().add(ongoing.get());
}
} }
if (!ongoingTopic.partitions().isEmpty()) { if (!ongoingTopic.partitions().isEmpty()) {
response.topics().add(ongoingTopic); response.topics().add(ongoingTopic);