KAFKA-15360: Include dirs in BrokerRegistration #14392

BrokerLifecycleManager should include the offline log directories in the BrokerHeartbeatRequests it
sends. Also, when handling BrokerHeartbeatResponses, do so by enqueuing a BrokerLifecycleManager
event rather than trying to do the handling directly in the callback.
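
For reviewers skimming the diff, the shape of the change is: the request-completion callback no longer mutates manager state directly; it only enqueues an event, and the event queue thread applies the state change (for example, removing directories from offlineDirsPending once a heartbeat that carried them succeeds). A rough, self-contained sketch of that handoff is below; the names (HandoffSketch, HeartbeatResponseEvent, pendingOfflineDirs) and the plain LinkedBlockingQueue are illustrative stand-ins, not the actual BrokerLifecycleManager or KafkaEventQueue code.

    import java.util.concurrent.LinkedBlockingQueue

    // Illustrative sketch only: hypothetical names, not Kafka's API. The callback
    // thread never touches state; it hands the response to a single owner thread.
    object HandoffSketch {
      final case class HeartbeatResponseEvent(isFenced: Boolean, dirsInFlight: Set[String])

      private val eventQueue = new LinkedBlockingQueue[HeartbeatResponseEvent]()

      // State owned exclusively by the event-queue thread, mirroring offlineDirsPending.
      private var pendingOfflineDirs = Set("dirA", "dirB")

      // Invoked from the request-completion callback (a network thread): enqueue only.
      def onHeartbeatResponse(isFenced: Boolean, dirsInFlight: Set[String]): Unit =
        eventQueue.put(HeartbeatResponseEvent(isFenced, dirsInFlight))

      def main(args: Array[String]): Unit = {
        val eventThread = new Thread(() => {
          try {
            while (true) {
              val event = eventQueue.take()
              // Directories acknowledged by a successful heartbeat no longer need resending.
              pendingOfflineDirs = pendingOfflineDirs.diff(event.dirsInFlight)
              println(s"fenced=${event.isFenced}, still pending: $pendingOfflineDirs")
            }
          } catch { case _: InterruptedException => () }
        })
        eventThread.start()
        onHeartbeatResponse(isFenced = false, dirsInFlight = Set("dirA"))
        Thread.sleep(200)
        eventThread.interrupt()
      }
    }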

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Proven Provenzano <pprovenzano@confluent.io>
Authored by Igor Soarez on 2023-09-11 22:41:06 +01:00; committed by Colin P. McCabe
parent fa472d26a5
commit eaa6b8abdd
3 changed files with 202 additions and 72 deletions
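
One note on the PR title before the per-file diffs: the registration request now carries the broker's log directory IDs as well, via the setLogDirs call added below. A minimal sketch of such a payload, using only BrokerRegistrationRequestData setters that appear in this diff (the RegistrationPayloadSketch wrapper and the values are made up for illustration):

    import java.util.Arrays
    import org.apache.kafka.common.Uuid
    import org.apache.kafka.common.message.BrokerRegistrationRequestData

    // Sketch only: shows that the registration payload can now carry directory IDs.
    object RegistrationPayloadSketch {
      def main(args: Array[String]): Unit = {
        val data = new BrokerRegistrationRequestData()
          .setBrokerId(1)
          .setIncarnationId(Uuid.randomUuid())
          .setLogDirs(Arrays.asList(Uuid.randomUuid(), Uuid.randomUuid()))
        println(data)
      }
    }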

BrokerLifecycleManager.scala

@@ -23,7 +23,7 @@ import kafka.utils.Logging
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection
import org.apache.kafka.common.message.{BrokerHeartbeatRequestData, BrokerRegistrationRequestData}
import org.apache.kafka.common.message.{BrokerHeartbeatRequestData, BrokerHeartbeatResponseData, BrokerRegistrationRequestData, BrokerRegistrationResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
import org.apache.kafka.metadata.{BrokerState, VersionRange}
@@ -31,10 +31,9 @@ import org.apache.kafka.queue.EventQueue.DeadlineFunction
import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import java.util.OptionalLong
import java.util.{Comparator, OptionalLong}
import scala.jdk.CollectionConverters._
/**
* The broker lifecycle manager owns the broker state.
*
@@ -57,7 +56,8 @@ class BrokerLifecycleManager(
val config: KafkaConfig,
val time: Time,
val threadNamePrefix: String,
val isZkBroker: Boolean
val isZkBroker: Boolean,
val logDirs: Set[Uuid] = Set.empty[Uuid]
) extends Logging {
private def logPrefix(): String = {
@@ -98,7 +98,7 @@ class BrokerLifecycleManager(
/**
* The number of times we've tried and failed to communicate. This variable can only be
* read or written from the event queue thread.
* read or written from the BrokerToControllerRequestThread.
*/
private var failedAttempts = 0L
@@ -147,6 +147,12 @@
*/
private var readyToUnfence = false
/**
* Offline directories pending to be communicated to the controller.
* This variable can only be read or written from the event queue thread.
*/
private var offlineDirsPending = Set[Uuid]()
/**
* True if we sent an event to the active controller requesting controlled
* shutdown. This variable can only be read or written from the event queue thread.
@@ -229,6 +235,14 @@
initialUnfenceFuture
}
/**
* Propagate directory failures to the controller.
* @param directory The ID for the directory that failed.
*/
def propagateDirectoryFailure(directory: Uuid): Unit = {
eventQueue.append(new OfflineDirEvent(directory))
}
def brokerEpoch: Long = _brokerEpoch
def state: BrokerState = _state
@@ -283,6 +297,19 @@
}
}
private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
override def run(): Unit = {
if (offlineDirsPending.isEmpty) {
offlineDirsPending = Set(dir)
} else {
offlineDirsPending = offlineDirsPending.incl(dir)
}
if (registered) {
scheduleNextCommunicationImmediately()
}
}
}
private class StartupEvent(highestMetadataOffsetProvider: () => Long,
channelManager: NodeToControllerChannelManager,
clusterId: String,
@@ -316,6 +343,11 @@
setMinSupportedVersion(range.min()).
setMaxSupportedVersion(range.max()))
}
val sortedLogDirs = new util.ArrayList[Uuid]
logDirs.foreach(sortedLogDirs.add(_))
sortedLogDirs.sort(new Comparator[Uuid]() {
override def compare(a: Uuid, b: Uuid): Int = a.compareTo(b)
})
val data = new BrokerRegistrationRequestData().
setBrokerId(nodeId).
setIsMigratingZkBroker(isZkBroker).
@@ -324,7 +356,8 @@
setIncarnationId(incarnationId).
setListeners(_advertisedListeners).
setRack(rack.orNull).
setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L))
setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L)).
setLogDirs(sortedLogDirs)
if (isDebugEnabled) {
debug(s"Sending broker registration $data")
}
@@ -353,12 +386,10 @@
val message = response.responseBody().asInstanceOf[BrokerRegistrationResponse]
val errorCode = Errors.forCode(message.data().errorCode())
if (errorCode == Errors.NONE) {
failedAttempts = 0
_brokerEpoch = message.data().brokerEpoch()
registered = true
initialRegistrationSucceeded = true
info(s"Successfully registered broker $nodeId with broker epoch ${_brokerEpoch}")
scheduleNextCommunicationImmediately() // Immediately send a heartbeat
// this response handler is not invoked from the event handler thread,
// and processing a successful registration response requires updating
// state, so to continue we need to schedule an event
eventQueue.prepend(new BrokerRegistrationResponseEvent(message.data()))
} else {
info(s"Unable to register broker $nodeId because the controller returned " +
s"error $errorCode")
@@ -373,6 +404,17 @@
}
}
private class BrokerRegistrationResponseEvent(response: BrokerRegistrationResponseData) extends EventQueue.Event {
override def run(): Unit = {
failedAttempts = 0
_brokerEpoch = response.brokerEpoch()
registered = true
initialRegistrationSucceeded = true
info(s"Successfully registered broker $nodeId with broker epoch ${_brokerEpoch}")
scheduleNextCommunicationImmediately() // Immediately send a heartbeat
}
}
private def sendBrokerHeartbeat(): Unit = {
val metadataOffset = _highestMetadataOffsetProvider()
val data = new BrokerHeartbeatRequestData().
@@ -380,15 +422,16 @@
setBrokerId(nodeId).
setCurrentMetadataOffset(metadataOffset).
setWantFence(!readyToUnfence).
setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN)
setWantShutDown(_state == BrokerState.PENDING_CONTROLLED_SHUTDOWN).
setOfflineLogDirs(offlineDirsPending.toSeq.asJava)
if (isTraceEnabled) {
trace(s"Sending broker heartbeat $data")
}
_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data),
new BrokerHeartbeatResponseHandler())
val handler = new BrokerHeartbeatResponseHandler(offlineDirsPending)
_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), handler)
}
private class BrokerHeartbeatResponseHandler extends ControllerRequestCompletionHandler {
private class BrokerHeartbeatResponseHandler(dirsInFlight: Set[Uuid]) extends ControllerRequestCompletionHandler {
override def onComplete(response: ClientResponse): Unit = {
if (response.authenticationException() != null) {
error(s"Unable to send broker heartbeat for $nodeId because of an " +
@@ -409,55 +452,10 @@
val message = response.responseBody().asInstanceOf[BrokerHeartbeatResponse]
val errorCode = Errors.forCode(message.data().errorCode())
if (errorCode == Errors.NONE) {
failedAttempts = 0
_state match {
case BrokerState.STARTING =>
if (message.data().isCaughtUp) {
info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.")
_state = BrokerState.RECOVERY
initialCatchUpFuture.complete(null)
} else {
debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.")
}
// Schedule the heartbeat after only 10 ms so that in the case where
// there is no recovery work to be done, we start up a bit quicker.
scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
case BrokerState.RECOVERY =>
if (!message.data().isFenced) {
info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.")
initialUnfenceFuture.complete(null)
_state = BrokerState.RUNNING
} else {
info(s"The broker is in RECOVERY.")
}
scheduleNextCommunicationAfterSuccess()
case BrokerState.RUNNING =>
debug(s"The broker is RUNNING. Processing heartbeat response.")
scheduleNextCommunicationAfterSuccess()
case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
if (!message.data().shouldShutDown()) {
info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, still waiting " +
"for the active controller.")
if (!gotControlledShutdownResponse) {
// If this is the first pending controlled shutdown response we got,
// schedule our next heartbeat a little bit sooner than we usually would.
// In the case where controlled shutdown completes quickly, this will
// speed things up a little bit.
scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS))
} else {
scheduleNextCommunicationAfterSuccess()
}
} else {
info(s"The controller has asked us to exit controlled shutdown.")
beginShutdown()
}
gotControlledShutdownResponse = true
case BrokerState.SHUTTING_DOWN =>
info(s"The broker is SHUTTING_DOWN. Ignoring heartbeat response.")
case _ =>
error(s"Unexpected broker state ${_state}")
scheduleNextCommunicationAfterSuccess()
}
// this response handler is not invoked from the event handler thread,
// and processing a successful heartbeat response requires updating
// state, so to continue we need to schedule an event
eventQueue.prepend(new BrokerHeartbeatResponseEvent(message.data(), dirsInFlight))
} else {
warn(s"Broker $nodeId sent a heartbeat request but received error $errorCode.")
scheduleNextCommunicationAfterFailure()
@@ -471,6 +469,61 @@
}
}
private class BrokerHeartbeatResponseEvent(response: BrokerHeartbeatResponseData, dirsInFlight: Set[Uuid]) extends EventQueue.Event {
override def run(): Unit = {
failedAttempts = 0
offlineDirsPending = offlineDirsPending.diff(dirsInFlight)
_state match {
case BrokerState.STARTING =>
if (response.isCaughtUp) {
info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.")
_state = BrokerState.RECOVERY
initialCatchUpFuture.complete(null)
} else {
debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.")
}
// Schedule the heartbeat after only 10 ms so that in the case where
// there is no recovery work to be done, we start up a bit quicker.
scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
case BrokerState.RECOVERY =>
if (!response.isFenced) {
info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.")
initialUnfenceFuture.complete(null)
_state = BrokerState.RUNNING
} else {
info(s"The broker is in RECOVERY.")
}
scheduleNextCommunicationAfterSuccess()
case BrokerState.RUNNING =>
debug(s"The broker is RUNNING. Processing heartbeat response.")
scheduleNextCommunicationAfterSuccess()
case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
if (!response.shouldShutDown()) {
info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, still waiting " +
"for the active controller.")
if (!gotControlledShutdownResponse) {
// If this is the first pending controlled shutdown response we got,
// schedule our next heartbeat a little bit sooner than we usually would.
// In the case where controlled shutdown completes quickly, this will
// speed things up a little bit.
scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS))
} else {
scheduleNextCommunicationAfterSuccess()
}
} else {
info(s"The controller has asked us to exit controlled shutdown.")
beginShutdown()
}
gotControlledShutdownResponse = true
case BrokerState.SHUTTING_DOWN =>
info(s"The broker is SHUTTING_DOWN. Ignoring heartbeat response.")
case _ =>
error(s"Unexpected broker state ${_state}")
scheduleNextCommunicationAfterSuccess()
}
}
}
private def scheduleNextCommunicationImmediately(): Unit = scheduleNextCommunication(0)
private def scheduleNextCommunicationAfterFailure(): Unit = {

BrokerServer.scala

@@ -176,11 +176,6 @@ class BrokerServer(
config.dynamicConfig.initialize(zkClientOpt = None)
lifecycleManager = new BrokerLifecycleManager(config,
time,
s"broker-${config.nodeId}-",
isZkBroker = false)
/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
kafkaScheduler.startup()
@@ -207,6 +202,12 @@
remoteLogManagerOpt = createRemoteLogManager()
lifecycleManager = new BrokerLifecycleManager(config,
time,
s"broker-${config.nodeId}-",
isZkBroker = false,
logDirs = logManager.directoryIds.values.toSet)
// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)

BrokerLifecycleManagerTest.scala

@@ -20,13 +20,16 @@ package kafka.server
import java.util.{Collections, OptionalLong, Properties}
import kafka.utils.TestUtils
import org.apache.kafka.common.Node
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.message.{BrokerHeartbeatResponseData, BrokerRegistrationResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Test, Timeout}
import java.util.concurrent.{CompletableFuture, Future}
import scala.jdk.CollectionConverters._
@Timeout(value = 12)
class BrokerLifecycleManagerTest {
@@ -38,6 +41,7 @@ class BrokerLifecycleManagerTest {
properties.setProperty(KafkaConfig.QuorumVotersProp, s"2@localhost:9093")
properties.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, "300000")
properties.setProperty(KafkaConfig.BrokerHeartbeatIntervalMsProp, "100")
properties
}
@@ -182,4 +186,76 @@
manager.controlledShutdownFuture.get()
manager.close()
}
def prepareResponse[T<:AbstractRequest](ctx: RegistrationTestContext, response: AbstractResponse): Future[T] = {
val result = new CompletableFuture[T]()
ctx.mockClient.prepareResponseFrom(
(body: AbstractRequest) => result.complete(body.asInstanceOf[T]),
response,
ctx.controllerNodeProvider.node.get
)
result
}
def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
while (!future.isDone || context.mockClient.hasInFlightRequests) {
context.poll()
manager.eventQueue.wakeup()
context.time.sleep(100)
}
future.get
}
@Test
def testOfflineDirsSentUntilHeartbeatSuccess(): Unit = {
val ctx = new RegistrationTestContext(configProperties)
val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", isZkBroker = false)
val controllerNode = new Node(3000, "localhost", 8021)
ctx.controllerNodeProvider.node.set(controllerNode)
val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000)))
val hb1 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()
.setErrorCode(Errors.NOT_CONTROLLER.code())))
val hb2 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
val hb3 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
val offlineDirs = Set(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"), Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
offlineDirs.foreach(manager.propagateDirectoryFailure)
// start the manager late to prevent a race, and force expectations on the first heartbeat
manager.start(() => ctx.highestMetadataOffset.get(),
ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
Collections.emptyMap(), OptionalLong.empty())
poll(ctx, manager, registration)
val dirs1 = poll(ctx, manager, hb1).data().offlineLogDirs()
val dirs2 = poll(ctx, manager, hb2).data().offlineLogDirs()
val dirs3 = poll(ctx, manager, hb3).data().offlineLogDirs()
assertEquals(offlineDirs, dirs1.asScala.toSet)
assertEquals(offlineDirs, dirs2.asScala.toSet)
assertEquals(Set.empty, dirs3.asScala.toSet)
manager.close()
}
@Test
def testRegistrationIncludesDirs(): Unit = {
val logDirs = Set("ad5FLIeCTnaQdai5vOjeng", "ybdzUKmYSLK6oiIpI6CPlw").map(Uuid.fromString)
val ctx = new RegistrationTestContext(configProperties)
val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "registration-includes-dirs-",
isZkBroker = false, logDirs)
val controllerNode = new Node(3000, "localhost", 8021)
ctx.controllerNodeProvider.node.set(controllerNode)
val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000)))
manager.start(() => ctx.highestMetadataOffset.get(),
ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
Collections.emptyMap(), OptionalLong.empty())
val request = poll(ctx, manager, registration).asInstanceOf[BrokerRegistrationRequest]
assertEquals(logDirs, request.data.logDirs().asScala.toSet)
manager.close()
}
}