diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala index b671c700ac6..86395f015c4 100644 --- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala @@ -78,9 +78,11 @@ class MetadataCacheControllerNodeProvider( } object RaftControllerNodeProvider { - def apply(raftManager: RaftManager[ApiMessageAndVersion], - config: KafkaConfig, - controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = { + def apply( + raftManager: RaftManager[ApiMessageAndVersion], + config: KafkaConfig, + controllerQuorumVoterNodes: Seq[Node] + ): RaftControllerNodeProvider = { val controllerListenerName = new ListenerName(config.controllerListenerNames.head) val controllerSecurityProtocol = config.effectiveListenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value())) val controllerSaslMechanism = config.saslMechanismControllerProtocol @@ -98,12 +100,13 @@ object RaftControllerNodeProvider { * Finds the controller node by checking the metadata log manager. * This provider is used when we are using a Raft-based metadata quorum. */ -class RaftControllerNodeProvider(val raftManager: RaftManager[ApiMessageAndVersion], - controllerQuorumVoterNodes: Seq[Node], - val listenerName: ListenerName, - val securityProtocol: SecurityProtocol, - val saslMechanism: String - ) extends ControllerNodeProvider with Logging { +class RaftControllerNodeProvider( + val raftManager: RaftManager[ApiMessageAndVersion], + controllerQuorumVoterNodes: Seq[Node], + val listenerName: ListenerName, + val securityProtocol: SecurityProtocol, + val saslMechanism: String +) extends ControllerNodeProvider with Logging { val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap override def get(): Option[Node] = { @@ -133,7 +136,6 @@ object BrokerToControllerChannelManager { } } - trait BrokerToControllerChannelManager { def start(): Unit def shutdown(): Unit @@ -144,7 +146,6 @@ trait BrokerToControllerChannelManager { ): Unit } - /** * This class manages the connection between a broker and the controller. It runs a single * [[BrokerToControllerRequestThread]] which uses the broker's metadata cache as its own metadata to find @@ -250,13 +251,14 @@ class BrokerToControllerChannelManagerImpl( )) } - def controllerApiVersions(): Option[NodeApiVersions] = - requestThread.activeControllerAddress().flatMap( - activeController => if (activeController.id() == config.brokerId) + def controllerApiVersions(): Option[NodeApiVersions] = { + requestThread.activeControllerAddress().flatMap { activeController => + if (activeController.id == config.brokerId) Some(currentNodeApiVersions) else - Option(apiVersions.get(activeController.idString())) - ) + Option(apiVersions.get(activeController.idString)) + } + } } abstract class ControllerRequestCompletionHandler extends RequestCompletionHandler { @@ -351,10 +353,10 @@ class BrokerToControllerRequestThread( requestQueue.putFirst(queueItem) } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) { // just close the controller connection and wait for metadata cache update in doWork - activeControllerAddress().foreach { controllerAddress => { + activeControllerAddress().foreach { controllerAddress => networkClient.disconnect(controllerAddress.idString) updateControllerAddress(null) - }} + } requestQueue.putFirst(queueItem) } else {