From 3589f45656c4fad3e018176dd9f1c7dcb5b03024 Mon Sep 17 00:00:00 2001 From: Logan Zhu Date: Wed, 24 Jul 2024 19:44:54 +0800 Subject: [PATCH] MINOR: Replace lambda expressions with method references for ReplicationControlManager (#16547) Reviewers: Xuan-Zhang Gong , Chia-Ping Tsai --- .../kafka/controller/ReplicationControlManager.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 31a2b725122..81abfbeb537 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -1216,7 +1216,7 @@ public class ReplicationControlManager { } int[] newIsr = partitionData.newIsrWithEpochs().stream() - .mapToInt(brokerState -> brokerState.brokerId()).toArray(); + .mapToInt(BrokerState::brokerId).toArray(); if (!Replicas.validateIsr(partition.replicas, newIsr)) { log.error("Rejecting AlterPartition request from node {} for {}-{} because " + @@ -1560,6 +1560,7 @@ public class ReplicationControlManager { if (states.current() != states.next()) { switch (states.next()) { case FENCED: + case SHUTDOWN_NOW: handleBrokerFenced(brokerId, records); break; case UNFENCED: @@ -1568,9 +1569,6 @@ public class ReplicationControlManager { case CONTROLLED_SHUTDOWN: handleBrokerInControlledShutdown(brokerId, brokerEpoch, records); break; - case SHUTDOWN_NOW: - handleBrokerFenced(brokerId, records); - break; } } heartbeatManager.touch(brokerId, @@ -2078,7 +2076,7 @@ public class ReplicationControlManager { tp.partitionId(), new LeaderAcceptor(clusterControl, part), featureControl.metadataVersion(), - getTopicEffectiveMinIsr(topics.get(tp.topicId()).name.toString()) + getTopicEffectiveMinIsr(topics.get(tp.topicId()).name) ); builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed()); builder.setEligibleLeaderReplicasEnabled(isElrEnabled()); @@ -2202,9 +2200,7 @@ public class ReplicationControlManager { for (int partitionId : partitionIds) { Optional ongoing = getOngoingPartitionReassignment(topicInfo, partitionId); - if (ongoing.isPresent()) { - ongoingTopic.partitions().add(ongoing.get()); - } + ongoing.ifPresent(ongoingPartitionReassignment -> ongoingTopic.partitions().add(ongoingPartitionReassignment)); } if (!ongoingTopic.partitions().isEmpty()) { response.topics().add(ongoingTopic);