KAFKA-10350: add forwarding manager implementation with metrics (#9580)

add forwarding manager implementation with metrics

Reviewers: David Arthur <mumrah@gmail.com>
This commit is contained in:
Boyang Chen 2020-11-11 23:21:10 -08:00 committed by GitHub
parent 156584915a
commit bb34c5c8cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 95 additions and 53 deletions

View File

@ -279,6 +279,7 @@ object RequestChannel extends Logging {
.append(",principal:").append(session.principal)
.append(",listener:").append(context.listenerName.value)
.append(",clientInformation:").append(context.clientInformation)
.append(",forwarded:").append(isForwarded)
if (temporaryMemoryBytes > 0)
builder.append(",temporaryMemoryBytes:").append(temporaryMemoryBytes)
if (messageConversionsTimeMs > 0)

View File

@ -20,34 +20,28 @@ package kafka.server
import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
import kafka.network.RequestChannel
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.common.Node
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse}
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.{LogContext, Time}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.compat.java8.OptionConverters._
trait BrokerToControllerChannelManager {
def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
callback: RequestCompletionHandler): Unit
def forwardRequest(request: RequestChannel.Request, responseCallback: AbstractResponse => Unit): Unit
def start(): Unit
def shutdown(): Unit
}
/**
* This class manages the connection between a broker and the controller. It runs a single
* {@link BrokerToControllerRequestThread} which uses the broker's metadata cache as its own metadata to find
@ -61,7 +55,7 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
config: KafkaConfig,
channelName: String,
threadNamePrefix: Option[String] = None) extends BrokerToControllerChannelManager with Logging {
private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]
protected val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]
private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ")
private val manualMetadataUpdater = new ManualMetadataUpdater()
private val requestThread = newRequestThread
@ -135,44 +129,6 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
requestQueue.put(BrokerToControllerQueueItem(request, callback))
requestThread.wakeup()
}
def forwardRequest(
request: RequestChannel.Request,
responseCallback: AbstractResponse => Unit
): Unit = {
val principalSerde = request.context.principalSerde.asScala.getOrElse(
throw new IllegalArgumentException(s"Cannot deserialize principal from request $request " +
"since there is no serde defined")
)
val serializedPrincipal = principalSerde.serialize(request.context.principal)
val forwardRequestBuffer = request.buffer.duplicate()
forwardRequestBuffer.flip()
val envelopeRequest = new EnvelopeRequest.Builder(
forwardRequestBuffer,
serializedPrincipal,
request.context.clientAddress.getAddress
)
def onClientResponse(clientResponse: ClientResponse): Unit = {
val envelopeResponse = clientResponse.responseBody.asInstanceOf[EnvelopeResponse]
val envelopeError = envelopeResponse.error()
val response = if (envelopeError != Errors.NONE) {
// An envelope error indicates broker misconfiguration (e.g. the principal serde
// might not be defined on the receiving broker). In this case, we do not return
// the error directly to the client since it would not be expected. Instead we
// return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
// on the broker.
debug(s"Forwarded request $request failed with an error in envelope response $envelopeError")
request.body[AbstractRequest].getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception())
} else {
AbstractResponse.deserializeBody(envelopeResponse.responseData, request.header)
}
responseCallback(response)
}
requestQueue.put(BrokerToControllerQueueItem(envelopeRequest, onClientResponse))
requestThread.wakeup()
}
}
case class BrokerToControllerQueueItem(request: AbstractRequest.Builder[_ <: AbstractRequest],

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.metrics.KafkaMetricsGroup
import kafka.network.RequestChannel
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse}
import org.apache.kafka.common.utils.Time
import scala.compat.java8.OptionConverters._
class ForwardingManager(metadataCache: kafka.server.MetadataCache,
time: Time,
metrics: Metrics,
config: KafkaConfig,
threadNamePrefix: Option[String] = None) extends
BrokerToControllerChannelManagerImpl(metadataCache, time, metrics,
config, "forwardingChannel", threadNamePrefix) with KafkaMetricsGroup {
private val forwardingMetricName = "NumRequestsForwardingToControllerPerSec"
def forwardRequest(request: RequestChannel.Request,
responseCallback: AbstractResponse => Unit): Unit = {
val principalSerde = request.context.principalSerde.asScala.getOrElse(
throw new IllegalArgumentException(s"Cannot deserialize principal from request $request " +
"since there is no serde defined")
)
val serializedPrincipal = principalSerde.serialize(request.context.principal)
val forwardRequestBuffer = request.buffer.duplicate()
forwardRequestBuffer.flip()
val envelopeRequest = new EnvelopeRequest.Builder(
forwardRequestBuffer,
serializedPrincipal,
request.context.clientAddress.getAddress
)
def onClientResponse(clientResponse: ClientResponse): Unit = {
val envelopeResponse = clientResponse.responseBody.asInstanceOf[EnvelopeResponse]
val envelopeError = envelopeResponse.error()
val response = if (envelopeError != Errors.NONE) {
// An envelope error indicates broker misconfiguration (e.g. the principal serde
// might not be defined on the receiving broker). In this case, we do not return
// the error directly to the client since it would not be expected. Instead we
// return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
// on the broker.
debug(s"Forwarded request $request failed with an error in envelope response $envelopeError")
request.body[AbstractRequest].getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception())
} else {
AbstractResponse.deserializeBody(envelopeResponse.responseData, request.header)
}
responseCallback(response)
}
sendRequest(envelopeRequest, onClientResponse)
}
override def start(): Unit = {
super.start()
newGauge(forwardingMetricName, () => requestQueue.size())
}
override def shutdown(): Unit = {
removeMetric(forwardingMetricName)
super.shutdown()
}
}

View File

@ -99,7 +99,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val groupCoordinator: GroupCoordinator,
val txnCoordinator: TransactionCoordinator,
val controller: KafkaController,
val forwardingManager: BrokerToControllerChannelManager,
val forwardingManager: ForwardingManager,
val zkClient: KafkaZkClient,
val brokerId: Int,
val config: KafkaConfig,

View File

@ -168,7 +168,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
var kafkaController: KafkaController = null
var forwardingManager: BrokerToControllerChannelManager = null
var forwardingManager: ForwardingManager = null
var alterIsrChannelManager: BrokerToControllerChannelManager = null
@ -331,7 +331,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
if (config.metadataQuorumEnabled) {
/* start forwarding manager */
forwardingManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, "forwardingChannel", threadNamePrefix)
forwardingManager = new ForwardingManager(metadataCache, time, metrics, config, threadNamePrefix)
forwardingManager.start()
}

View File

@ -84,7 +84,7 @@ class KafkaApisTest {
private val adminManager: AdminManager = EasyMock.createNiceMock(classOf[AdminManager])
private val txnCoordinator: TransactionCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
private val controller: KafkaController = EasyMock.createNiceMock(classOf[KafkaController])
private val forwardingManager: BrokerToControllerChannelManager = EasyMock.createNiceMock(classOf[BrokerToControllerChannelManager])
private val forwardingManager: ForwardingManager = EasyMock.createNiceMock(classOf[ForwardingManager])
private val hostAddress: Array[Byte] = InetAddress.getByName("192.168.1.1").getAddress
private val kafkaPrincipalSerde: Option[KafkaPrincipalSerde] = Option(new KafkaPrincipalSerde {
override def serialize(principal: KafkaPrincipal): Array[Byte] = null

View File

@ -23,13 +23,13 @@ import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.server.AdminManager;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerToControllerChannelManager;
import kafka.server.BrokerTopicStats;
import kafka.server.ClientQuotaManager;
import kafka.server.ClientRequestQuotaManager;
import kafka.server.ControllerMutationQuotaManager;
import kafka.server.FetchManager;
import kafka.server.FinalizedFeatureCache;
import kafka.server.ForwardingManager;
import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
@ -98,7 +98,7 @@ public class MetadataRequestBenchmark {
private AdminManager adminManager = Mockito.mock(AdminManager.class);
private TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class);
private KafkaController kafkaController = Mockito.mock(KafkaController.class);
private BrokerToControllerChannelManager brokerToControllerChannelManager = Mockito.mock(BrokerToControllerChannelManager.class);
private ForwardingManager forwardingManager = Mockito.mock(ForwardingManager.class);
private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
private Metrics metrics = new Metrics();
private int brokerId = 1;
@ -175,7 +175,7 @@ public class MetadataRequestBenchmark {
groupCoordinator,
transactionCoordinator,
kafkaController,
brokerToControllerChannelManager,
forwardingManager,
kafkaZkClient,
brokerId,
new KafkaConfig(kafkaProps),