From 8b49130b928553ad8ee24188b591c2d006d9a394 Mon Sep 17 00:00:00 2001 From: Hong-Yi Chen Date: Mon, 2 Jun 2025 01:18:15 +0800 Subject: [PATCH] KAFKA-19355 Remove interBrokerListenerName from ClusterControlManager (#19866) Following the removal of the ZK-to-KRaft migration code in commit 85bfdf4, controller-to-broker communication is now handled by the control-plane listener (`controller.listener.names`). The `interBrokerListenerName` parameter in `ClusterControlManager` is no longer referenced on the controller side and can be safely removed as dead code. Reviewers: Lan Ding , Ken Huang , Chia-Ping Tsai --- .../main/scala/kafka/server/ControllerServer.scala | 1 - .../kafka/controller/ClusterControlManager.java | 13 ------------- .../apache/kafka/controller/QuorumController.java | 8 -------- 3 files changed, 22 deletions(-) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index ff6821b1bb8..e8427fa7e53 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -252,7 +252,6 @@ class ControllerServer( setDelegationTokenExpiryTimeMs(delegationTokenManagerConfigs.delegationTokenExpiryTimeMs). setDelegationTokenExpiryCheckIntervalMs(delegationTokenManagerConfigs.delegationTokenExpiryCheckIntervalMs). setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs). - setInterBrokerListenerName(config.interBrokerListenerName.value()). setControllerPerformanceSamplePeriodMs(config.controllerPerformanceSamplePeriodMs). setControllerPerformanceAlwaysLogThresholdMs(config.controllerPerformanceAlwaysLogThresholdMs) } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index 64394b07586..14fce5ca0f3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -92,7 +92,6 @@ public class ClusterControlManager { private ReplicaPlacer replicaPlacer = null; private FeatureControlManager featureControl = null; private BrokerShutdownHandler brokerShutdownHandler = null; - private String interBrokerListenerName = "PLAINTEXT"; private QuorumControllerMetrics metrics = null; Builder setLogContext(LogContext logContext) { @@ -135,10 +134,6 @@ public class ClusterControlManager { return this; } - Builder setInterBrokerListenerName(String interBrokerListenerName) { - this.interBrokerListenerName = interBrokerListenerName; - return this; - } Builder setMetrics(QuorumControllerMetrics metrics) { this.metrics = metrics; @@ -175,7 +170,6 @@ public class ClusterControlManager { replicaPlacer, featureControl, brokerShutdownHandler, - interBrokerListenerName, metrics ); } @@ -265,11 +259,6 @@ public class ClusterControlManager { private final BrokerShutdownHandler brokerShutdownHandler; - /** - * The statically configured inter-broker listener name. - */ - private final String interBrokerListenerName; - /** * Maps controller IDs to controller registrations. */ @@ -294,7 +283,6 @@ public class ClusterControlManager { ReplicaPlacer replicaPlacer, FeatureControlManager featureControl, BrokerShutdownHandler brokerShutdownHandler, - String interBrokerListenerName, QuorumControllerMetrics metrics ) { this.logContext = logContext; @@ -311,7 +299,6 @@ public class ClusterControlManager { this.controllerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0); this.directoryToBroker = new TimelineHashMap<>(snapshotRegistry, 0); this.brokerShutdownHandler = brokerShutdownHandler; - this.interBrokerListenerName = interBrokerListenerName; this.metrics = metrics; } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 365e9503dcb..7dee1e3cd3d 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -220,7 +220,6 @@ public final class QuorumController implements Controller { private long delegationTokenExpiryTimeMs; private long delegationTokenExpiryCheckIntervalMs = TimeUnit.MINUTES.toMillis(5); private long uncleanLeaderElectionCheckIntervalMs = TimeUnit.MINUTES.toMillis(5); - private String interBrokerListenerName = "PLAINTEXT"; public Builder(int nodeId, String clusterId) { this.nodeId = nodeId; @@ -381,10 +380,6 @@ public final class QuorumController implements Controller { return this; } - public Builder setInterBrokerListenerName(String interBrokerListenerName) { - this.interBrokerListenerName = interBrokerListenerName; - return this; - } public QuorumController build() throws Exception { if (raftClient == null) { @@ -443,7 +438,6 @@ public final class QuorumController implements Controller { delegationTokenExpiryTimeMs, delegationTokenExpiryCheckIntervalMs, uncleanLeaderElectionCheckIntervalMs, - interBrokerListenerName, controllerPerformanceSamplePeriodMs, controllerPerformanceAlwaysLogThresholdMs ); @@ -1488,7 +1482,6 @@ public final class QuorumController implements Controller { long delegationTokenExpiryTimeMs, long delegationTokenExpiryCheckIntervalMs, long uncleanLeaderElectionCheckIntervalMs, - String interBrokerListenerName, long controllerPerformanceSamplePeriodMs, long controllerPerformanceAlwaysLogThresholdMs ) { @@ -1530,7 +1523,6 @@ public final class QuorumController implements Controller { setReplicaPlacer(replicaPlacer). setFeatureControlManager(featureControl). setBrokerShutdownHandler(this::handleBrokerShutdown). - setInterBrokerListenerName(interBrokerListenerName). setMetrics(controllerMetrics). build(); this.configurationControl = new ConfigurationControlManager.Builder().