KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller. (#12998)

This patch introduces a preliminary state machine that can be used by the KRaft
controller to drive online migration from Zk to KRaft.

MigrationState -- Defines the states the controller can be in while migrating from
Zk to KRaft.
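
The concrete states are defined by the new MigrationState type, which this excerpt does not reproduce. A minimal, purely hypothetical sketch of the shape of such an enum (the state names and comments here are illustrative assumptions, not taken from the patch):

// Hypothetical sketch only; the real state names are defined by the
// MigrationState introduced in this patch and are not shown in this diff.
public enum MigrationState {
    UNINITIALIZED,               // driver created, nothing loaded yet
    INACTIVE,                    // this node is not the active KRaft controller
    WAIT_FOR_CONTROLLER_QUORUM,  // waiting to become the active KRaft controller
    WAIT_FOR_BROKERS,            // waiting for ZK brokers to register in migration mode
    BECOME_CONTROLLER,           // claim the /controller znode in ZooKeeper
    ZK_MIGRATION,                // copy existing ZK metadata into the KRaft log
    DUAL_WRITE                   // mirror KRaft metadata changes back to ZooKeeper
}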

KRaftMigrationDriver -- Defines the state transitions and the events that handle
actions like controller change, metadata change, and broker change, and provides
interfaces through which it claims Zk controllership, performs Zk writes, and
sends RPCs to ZkBrokers.
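
The driver class itself is not reproduced in this excerpt; ControllerServer (further down in this diff) constructs it with a node id, the quorum controller's ZkRecordConsumer, a MigrationClient, a LegacyPropagator, a metadata-publisher installer, and a fault handler, then calls start() and later close(). A minimal sketch of that single-threaded, event-driven shape, with all internals assumed rather than taken from the patch:

// Illustrative stand-in, not the real KRaftMigrationDriver. It only shows the
// event-queue shape implied by the description above; MigrationState refers to
// the enum sketched earlier.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MigrationDriverSketch implements AutoCloseable {
    private volatile MigrationState state = MigrationState.UNINITIALIZED;
    private final BlockingQueue<Runnable> eventQueue = new LinkedBlockingQueue<>();
    private final Thread eventThread = new Thread(this::processEvents, "kraft-migration-driver");
    private volatile boolean running = true;

    public void start() {
        eventThread.start();
    }

    // Controller changes, metadata changes, and broker changes are all expressed
    // as events and handled one at a time on the driver thread.
    public void enqueue(Runnable event) {
        eventQueue.offer(event);
    }

    private void processEvents() {
        while (running) {
            try {
                // Each event may advance `state` and trigger ZK writes or RPCs to brokers.
                eventQueue.take().run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override
    public void close() throws InterruptedException {
        running = false;
        eventThread.interrupt();
        eventThread.join();
    }
}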

MigrationClient -- Interface that defines the functions used to claim and
relinquish Zk controllership, and to read from and write to Zk.
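
The interface source is not included in this excerpt; the sketch below reconstructs only the methods that ZkMigrationClient is shown overriding later in this diff, so the real interface may declare more methods or slightly different signatures:

package org.apache.kafka.metadata.migration;

import java.util.Map;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.metadata.PartitionRegistration;

// Partial reconstruction inferred from the ZkMigrationClient overrides in this diff.
public interface MigrationClient {
    // Reads, or initializes, the migration recovery state stored in ZooKeeper.
    ZkMigrationLeadershipState getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState initialState);

    // Claims the ZK controller znode on behalf of the active KRaft controller.
    ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state);

    // Relinquishes the ZK controllership claimed above.
    ZkMigrationLeadershipState releaseControllerLeadership(ZkMigrationLeadershipState state);

    // Creates (idempotently) a topic's znodes in ZooKeeper.
    ZkMigrationLeadershipState createTopic(
        String topicName,
        Uuid topicId,
        Map<Integer, PartitionRegistration> topicPartitions,
        ZkMigrationLeadershipState state
    );

    // A stub in this patch: will eventually mirror a KRaft metadata delta back to ZooKeeper.
    ZkMigrationLeadershipState writeMetadataDeltaToZookeeper(
        MetadataDelta delta,
        MetadataImage image,
        ZkMigrationLeadershipState state
    );
}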

Co-authored-by: David Arthur <mumrah@gmail.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Akhilesh C 2023-01-09 10:44:11 -08:00 committed by Colin P. McCabe
parent 373d6ac470
commit 79c19da68d
52 changed files with 1388 additions and 185 deletions

View File

@ -89,6 +89,7 @@
<allow pkg="kafka.network"/>
<allow pkg="kafka.api"/>
<allow pkg="kafka.server"/>
<allow pkg="kafka.zk" />
<allow pkg="org.apache.kafka.clients.admin"/>
<allow pkg="integration.kafka.server" class="IntegrationTestHelper"/>
<subpackage name="annotation">

View File

@ -279,6 +279,8 @@
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.test" />
<subpackage name="authorizer">

View File

@ -36,7 +36,7 @@ public class BrokerRegistrationRequest extends AbstractRequest {
@Override
public short oldestAllowedVersion() {
if (data.isMigratingZkBroker()) {
if (data.migratingZkBrokerEpoch() != -1) {
return (short) 1;
} else {
return (short) 0;

View File

@ -13,6 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Version 1 adds Zk broker epoch to the request if the broker is migrating from Zk mode to KRaft mode.
{
"apiKey":62,
"type": "request",
@ -51,7 +52,7 @@
},
{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The rack which this broker is in." },
{ "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
"about": "Set by a ZK broker if the required configurations for ZK migration are present." }
{ "name": "MigratingZkBrokerEpoch", "type": "int64", "versions": "1+", "default": "-1",
"about": "If the required configurations for ZK migration are present, this value is set to the ZK broker epoch" }
]
}

View File

@ -13,6 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Version 1 adds Zk broker epoch to the request if the broker is migrating from Zk mode to KRaft mode.
{
"apiKey": 62,
"type": "response",

View File

@ -68,13 +68,13 @@ sealed class MigrationControllerChannelContext(
}
override val liveBrokerIdAndEpochs: collection.Map[Int, Long] = {
image.cluster().zkBrokers().asScala.map {
image.cluster().brokers().asScala.map {
case (brokerId, broker) => brokerId.intValue() -> broker.epoch()
}
}
override val liveOrShuttingDownBrokers: collection.Set[Broker] = {
image.cluster().zkBrokers().asScala.values.map { registration =>
image.cluster().brokers().asScala.values.map { registration =>
Broker.fromBrokerRegistration(registration)
}.toSet
}

View File

@ -32,10 +32,10 @@ import scala.jdk.CollectionConverters._
class MigrationPropagator(
nodeId: Int,
config: KafkaConfig,
metadataVersionProvider: () => MetadataVersion,
config: KafkaConfig
) extends LegacyPropagator {
@volatile private var _image = MetadataImage.EMPTY
@volatile private var metadataVersion = MetadataVersion.IBP_3_4_IV0
val stateChangeLogger = new StateChangeLogger(nodeId, inControllerContext = true, None)
val channelManager = new ControllerChannelManager(
() => _image.highestOffsetAndEpoch().epoch(),
@ -48,7 +48,7 @@ class MigrationPropagator(
val requestBatch = new MigrationPropagatorBatch(
config,
metadataProvider,
metadataVersionProvider,
() => metadataVersion,
channelManager,
stateChangeLogger
)
@ -201,4 +201,8 @@ class MigrationPropagator(
override def clear(): Unit = {
requestBatch.clear()
}
override def setMetadataVersion(newMetadataVersion: MetadataVersion): Unit = {
metadataVersion = newMetadataVersion
}
}

View File

@ -55,7 +55,8 @@ class BrokerLifecycleManager(
val config: KafkaConfig,
val time: Time,
val threadNamePrefix: Option[String],
val isZkBroker: Boolean = false
val isZkBroker: Boolean,
val zkBrokerEpochSupplier: () => Long
) extends Logging {
val logContext = new LogContext(s"[BrokerLifecycleManager id=${config.nodeId}] ")
@ -271,7 +272,7 @@ class BrokerLifecycleManager(
_advertisedListeners = advertisedListeners.duplicate()
_supportedFeatures = new util.HashMap[String, VersionRange](supportedFeatures)
if (!isZkBroker) {
// ZK brokers don't block on registration during startup
// Only KRaft brokers block on registration during startup
eventQueue.scheduleDeferred("initialRegistrationTimeout",
new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
new RegistrationTimeoutEvent())
@ -290,9 +291,20 @@ class BrokerLifecycleManager(
setMinSupportedVersion(range.min()).
setMaxSupportedVersion(range.max()))
}
val migrationZkBrokerEpoch: Long = {
if (isZkBroker) {
val zkBrokerEpoch: Long = Option(zkBrokerEpochSupplier).map(_.apply()).getOrElse(-1)
if (zkBrokerEpoch < 0) {
throw new IllegalStateException("Trying to sending BrokerRegistration in migration Zk " +
"broker without valid zk broker epoch")
}
zkBrokerEpoch
} else
-1
}
val data = new BrokerRegistrationRequestData().
setBrokerId(nodeId).
setIsMigratingZkBroker(isZkBroker).
setMigratingZkBrokerEpoch(migrationZkBrokerEpoch).
setClusterId(_clusterId).
setFeatures(features).
setIncarnationId(incarnationId).

View File

@ -103,10 +103,16 @@ class RawMetaProperties(val props: Properties = new Properties()) {
object MetaProperties {
def parse(properties: RawMetaProperties): MetaProperties = {
properties.requireVersion(expectedVersion = 1)
val clusterId = require(ClusterIdKey, properties.clusterId)
val nodeId = require(NodeIdKey, properties.nodeId)
new MetaProperties(clusterId, nodeId)
if (properties.version == 1) {
val nodeId = require(NodeIdKey, properties.nodeId)
new MetaProperties(clusterId, nodeId)
} else if (properties.version == 0) {
val brokerId = require(BrokerIdKey, properties.brokerId)
new MetaProperties(clusterId, brokerId)
} else {
throw new RuntimeException(s"Expected version 0 or 1, but got version ${properties.version}")
}
}
def require[T](key: String, value: Option[T]): T = {
@ -182,7 +188,8 @@ object BrokerMetadataCheckpoint extends Logging {
if (brokerMetadataMap.isEmpty) {
(new RawMetaProperties(), offlineDirs)
} else {
val numDistinctMetaProperties = brokerMetadataMap.values.toSet.size
val parsedProperties = brokerMetadataMap.values.map(props => MetaProperties.parse(new RawMetaProperties(props)))
val numDistinctMetaProperties = parsedProperties.toSet.size
if (numDistinctMetaProperties > 1) {
val builder = new StringBuilder

View File

@ -182,7 +182,9 @@ class BrokerServer(
lifecycleManager = new BrokerLifecycleManager(config,
time,
threadNamePrefix)
threadNamePrefix,
isZkBroker = false,
() => -1)
/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)

View File

@ -17,11 +17,9 @@
package kafka.server
import java.util.OptionalLong
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit}
import kafka.cluster.Broker.ServerInfo
import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
import kafka.migration.MigrationPropagator
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.security.CredentialProvider
@ -29,6 +27,7 @@ import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPoli
import kafka.server.KafkaRaftServer.BrokerRole
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{KafkaZkClient, ZkMigrationClient}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
@ -36,17 +35,40 @@ import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumFeatures}
import org.apache.kafka.metadata.KafkaConfigSchema
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
import scala.jdk.CollectionConverters._
import java.util.OptionalLong
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
case class ControllerMigrationSupport(
zkClient: KafkaZkClient,
migrationDriver: KRaftMigrationDriver,
brokersRpcClient: LegacyPropagator
) {
def shutdown(logging: Logging): Unit = {
if (zkClient != null) {
CoreUtils.swallow(zkClient.close(), logging)
}
if (brokersRpcClient != null) {
CoreUtils.swallow(brokersRpcClient.shutdown(), logging)
}
if (migrationDriver != null) {
CoreUtils.swallow(migrationDriver.close(), logging)
}
}
}
/**
* A Kafka controller that runs in KRaft (Kafka Raft) mode.
@ -81,6 +103,7 @@ class ControllerServer(
var quotaManagers: QuotaManagers = _
var controllerApis: ControllerApis = _
var controllerApisHandlerPool: KafkaRequestHandlerPool = _
var migrationSupport: Option[ControllerMigrationSupport] = None
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
lock.lock()
@ -112,7 +135,6 @@ class ControllerServer(
maybeChangeStatus(STARTING, STARTED)
this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix()
newGauge("ClusterId", () => clusterId)
newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)
@ -217,6 +239,26 @@ class ControllerServer(
doRemoteKraftSetup()
}
if (config.migrationEnabled) {
val zkClient = KafkaZkClient.createZkClient("KRaft Migration", time, config, KafkaServer.zkClientConfigFromKafkaConfig(config))
val migrationClient = new ZkMigrationClient(zkClient)
val propagator: LegacyPropagator = new MigrationPropagator(config.nodeId, config)
val migrationDriver = new KRaftMigrationDriver(
config.nodeId,
controller.asInstanceOf[QuorumController].zkRecordConsumer(),
migrationClient,
propagator,
publisher => sharedServer.loader.installPublishers(java.util.Collections.singletonList(publisher)),
sharedServer.faultHandlerFactory.build(
"zk migration",
fatal = false,
() => {}
)
)
migrationDriver.start()
migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))
}
quotaManagers = QuotaFactory.instantiate(config,
metrics,
time,
@ -267,6 +309,7 @@ class ControllerServer(
sharedServer.ensureNotRaftLeader()
if (socketServer != null)
CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
migrationSupport.foreach(_.shutdown(this))
if (controller != null)
controller.beginShutdown()
if (socketServer != null)

View File

@ -268,7 +268,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val leaderAndIsrRequest = request.body[LeaderAndIsrRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (isBrokerEpochStale(zkSupport, leaderAndIsrRequest.brokerEpoch)) {
if (zkSupport.isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch, leaderAndIsrRequest.isKRaftController)) {
// When the broker restarts very quickly, it is possible for this broker to receive request intended
// for its previous generation so the broker should skip the stale request.
info(s"Received LeaderAndIsr request with broker epoch ${leaderAndIsrRequest.brokerEpoch} " +
@ -289,7 +289,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// stop serving data to clients for the topic being deleted
val stopReplicaRequest = request.body[StopReplicaRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (isBrokerEpochStale(zkSupport, stopReplicaRequest.brokerEpoch)) {
if (zkSupport.isBrokerEpochStale(stopReplicaRequest.brokerEpoch, stopReplicaRequest.isKRaftController)) {
// When the broker restarts very quickly, it is possible for this broker to receive request intended
// for its previous generation so the broker should skip the stale request.
info(s"Received StopReplica request with broker epoch ${stopReplicaRequest.brokerEpoch} " +
@ -349,7 +349,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val updateMetadataRequest = request.body[UpdateMetadataRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
if (isBrokerEpochStale(zkSupport, updateMetadataRequest.brokerEpoch)) {
if (zkSupport.isBrokerEpochStale(updateMetadataRequest.brokerEpoch, updateMetadataRequest.isKRaftController)) {
// When the broker restarts very quickly, it is possible for this broker to receive request intended
// for its previous generation so the broker should skip the stale request.
info(s"Received UpdateMetadata request with broker epoch ${updateMetadataRequest.brokerEpoch} " +
@ -3179,7 +3179,7 @@ class KafkaApis(val requestChannel: RequestChannel,
describeClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else {
metadataSupport match {
case ZkSupport(adminManager, controller, zkClient, forwardingManager, metadataCache) =>
case ZkSupport(adminManager, controller, zkClient, forwardingManager, metadataCache, _) =>
val result = adminManager.describeClientQuotas(describeClientQuotasRequest.filter)
val entriesData = result.iterator.map { case (quotaEntity, quotaValues) =>
@ -3531,17 +3531,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
}
private def isBrokerEpochStale(zkSupport: ZkSupport, brokerEpochInRequest: Long): Boolean = {
// Broker epoch in LeaderAndIsr/UpdateMetadata/StopReplica request is unknown
// if the controller hasn't been upgraded to use KIP-380
if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false
else {
// brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified
// about the new broker epoch and sends a control request with this epoch before the broker learns about it
brokerEpochInRequest < zkSupport.controller.brokerEpoch
}
}
}
object KafkaApis {

View File

@ -2126,6 +2126,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
throw new ConfigException(s"Missing configuration `${KafkaConfig.NodeIdProp}` which is required " +
s"when `process.roles` is defined (i.e. when running in KRaft mode).")
}
if (migrationEnabled) {
if (zkConnect == null) {
throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value. " +
s"`${KafkaConfig.ZkConnectProp}` is required because `${KafkaConfig.MigrationEnabledProp} is set to true.")
}
}
}
require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
@ -2233,7 +2239,9 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
if (migrationEnabled) {
validateNonEmptyQuorumVotersForKRaft()
require(controllerListenerNames.nonEmpty,
s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZK migration mode: ${controllerListenerNames.asJava}")
s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
require(interBrokerProtocolVersion.isMigrationSupported, s"Cannot enable ZooKeeper migration without setting " +
s"'${KafkaConfig.InterBrokerProtocolVersionProp}' to 3.4 or higher")
} else {
// controller listener names must be empty when not in KRaft mode
require(controllerListenerNames.isEmpty,

View File

@ -187,6 +187,10 @@ class KafkaServer(
var lifecycleManager: BrokerLifecycleManager = _
@volatile var brokerEpochManager: ZkBrokerEpochManager = _
def brokerEpochSupplier(): Long = Option(brokerEpochManager).map(_.get()).getOrElse(-1)
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
@ -205,11 +209,6 @@ class KafkaServer(
if (canStartup) {
_brokerState = BrokerState.STARTING
lifecycleManager = new BrokerLifecycleManager(config,
time,
threadNamePrefix,
isZkBroker = true)
/* setup zookeeper */
initZkClient(time)
configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))
@ -341,7 +340,7 @@ class KafkaServer(
time = time,
metrics = metrics,
threadNamePrefix = threadNamePrefix,
brokerEpochSupplier = () => kafkaController.brokerEpoch
brokerEpochSupplier = brokerEpochSupplier
)
} else {
AlterPartitionManager(kafkaScheduler, time, zkClient)
@ -359,7 +358,22 @@ class KafkaServer(
val zkMetaProperties = ZkMetaProperties(clusterId, config.brokerId)
checkpointBrokerMetadata(zkMetaProperties)
/* start token manager */
tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
tokenManager.startup()
/* start kafka controller */
_kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
kafkaController.startup()
if (config.migrationEnabled) {
logger.info("Starting up additional components for ZooKeeper migration")
lifecycleManager = new BrokerLifecycleManager(config,
time,
threadNamePrefix,
isZkBroker = true,
() => kafkaController.brokerEpoch)
// If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
val kraftMetaProps = MetaProperties(zkMetaProperties.clusterId, zkMetaProperties.brokerId)
val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
@ -389,6 +403,7 @@ class KafkaServer(
val listener = new OffsetTrackingListener()
raftManager.register(listener)
raftManager.startup()
val networkListeners = new ListenerCollection()
config.effectiveAdvertisedListeners.foreach { ep =>
@ -412,17 +427,12 @@ class KafkaServer(
networkListeners,
ibpAsFeature
)
raftManager.startup()
logger.debug("Start RaftManager")
}
/* start token manager */
tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
tokenManager.startup()
/* start kafka controller */
_kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
kafkaController.startup()
// Used by ZK brokers during a KRaft migration. When talking to a KRaft controller, we need to use the epoch
// from BrokerLifecycleManager rather than ZK (via KafkaController)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, kafkaController, Option(lifecycleManager))
adminManager = new ZkAdminManager(config, metrics, metadataCache, zkClient)
@ -435,7 +445,7 @@ class KafkaServer(
val producerIdManager = if (config.interBrokerProtocolVersion.isAllocateProducerIdsSupported) {
ProducerIdManager.rpc(
config.brokerId,
brokerEpochSupplier = () => kafkaController.brokerEpoch,
brokerEpochSupplier = brokerEpochSupplier,
clientToControllerChannelManager,
config.requestTimeoutMs
)
@ -480,7 +490,7 @@ class KafkaServer(
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
/* start processing requests */
val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache)
val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache, brokerEpochManager)
def createKafkaApis(requestChannel: RequestChannel): KafkaApis = new KafkaApis(
requestChannel = requestChannel,
@ -631,7 +641,7 @@ class KafkaServer(
metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])) {
info("ZkBroker currently has a KRaft controller. Controlled shutdown will be handled " +
"through broker life cycle manager")
return false
return true
}
val metadataUpdater = new ManualMetadataUpdater()
val networkClient = {
@ -784,6 +794,15 @@ class KafkaServer(
// shutting down without waiting for the heartbeat to time out.
info("Notifying KRaft of controlled shutdown")
lifecycleManager.beginControlledShutdown()
try {
lifecycleManager.controlledShutdownFuture.get(5L, TimeUnit.MINUTES)
} catch {
case _: TimeoutException =>
error("Timed out waiting for the controller to approve controlled shutdown")
case e: Throwable =>
error("Got unexpected exception waiting for controlled shutdown future", e)
}
// TODO fix this ^
}
val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)

View File

@ -75,8 +75,10 @@ case class ZkSupport(adminManager: ZkAdminManager,
controller: KafkaController,
zkClient: KafkaZkClient,
forwardingManager: Option[ForwardingManager],
metadataCache: ZkMetadataCache) extends MetadataSupport {
metadataCache: ZkMetadataCache,
brokerEpochManager: ZkBrokerEpochManager) extends MetadataSupport {
override def requireZkOrThrow(createException: => Exception): ZkSupport = this
override def requireRaftOrThrow(createException: => Exception): RaftSupport = throw createException
override def ensureConsistentWith(config: KafkaConfig): Unit = {
@ -86,9 +88,14 @@ case class ZkSupport(adminManager: ZkAdminManager,
}
override def canForward(): Boolean = forwardingManager.isDefined && (!controller.isActive)
def isBrokerEpochStale(brokerEpochInRequest: Long, isKRaftControllerRequest: Boolean): Boolean = {
brokerEpochManager.isBrokerEpochStale(brokerEpochInRequest, isKRaftControllerRequest)
}
}
case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: KRaftMetadataCache)
case class RaftSupport(fwdMgr: ForwardingManager,
metadataCache: KRaftMetadataCache)
extends MetadataSupport {
override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.controller.KafkaController
import org.apache.kafka.common.requests.AbstractControlRequest
class ZkBrokerEpochManager(metadataCache: MetadataCache,
controller: KafkaController,
lifecycleManagerOpt: Option[BrokerLifecycleManager]) {
def get(): Long = {
lifecycleManagerOpt match {
case Some(lifecycleManager) => metadataCache.getControllerId match {
case Some(_: ZkCachedControllerId) => controller.brokerEpoch
case Some(_: KRaftCachedControllerId) => lifecycleManager.brokerEpoch
case None => controller.brokerEpoch
}
case None => controller.brokerEpoch
}
}
def isBrokerEpochStale(brokerEpochInRequest: Long, isKRaftControllerRequest: Boolean): Boolean = {
if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) {
false
} else if (isKRaftControllerRequest) {
if (lifecycleManagerOpt.isDefined) {
brokerEpochInRequest < lifecycleManagerOpt.get.brokerEpoch
} else {
throw new IllegalStateException("Expected BrokerLifecycleManager to be non-null.")
}
} else {
// brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified
// about the new broker epoch and sends a control request with this epoch before the broker learns about it
brokerEpochInRequest < controller.brokerEpoch
}
}
}

View File

@ -1952,7 +1952,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
def wrapMigrationRequest(request: Req, lastRequestInBatch: Boolean): MultiRequest = {
// Wrap a single request with the multi-op transactional request.
val checkOp = CheckOp(ControllerEpochZNode.path, migrationState.controllerZkVersion())
val checkOp = CheckOp(ControllerEpochZNode.path, migrationState.zkControllerEpochZkVersion())
val migrationOp = if (lastRequestInBatch) {
SetDataOp(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
} else {
@ -2037,7 +2037,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
}
}
migrationState.controllerZkVersion() match {
migrationState.zkControllerEpochZkVersion() match {
case ZkVersion.MatchAnyVersion => throw new IllegalArgumentException(
s"Expected a controller epoch zkVersion when making migration writes, not -1.")
case version if version >= 0 =>

View File

@ -1068,7 +1068,8 @@ object MigrationZNode {
metadataEpoch,
modifyTimeMs,
zkVersion,
ZkVersion.UnknownVersion))
ZkMigrationLeadershipState.EMPTY.zkControllerEpoch(),
ZkMigrationLeadershipState.EMPTY.zkControllerEpochZkVersion()))
}.getOrElse(throw new KafkaException(s"Failed to parse the migration json $jsonDataAsString"))
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -28,6 +28,7 @@ import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
import org.apache.kafka.common.metadata._
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.metadata.migration.{MigrationClient, ZkMigrationLeadershipState}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
@ -40,7 +41,10 @@ import java.util.function.Consumer
import scala.collection.Seq
import scala.jdk.CollectionConverters._
/**
* Migration client in KRaft controller responsible for handling communication to Zookeeper and
* the ZkBrokers present in the cluster.
*/
class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Logging {
override def getOrCreateMigrationRecoveryState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
@ -54,19 +58,20 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
override def claimControllerLeadership(state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
zkClient.tryRegisterKRaftControllerAsActiveController(state.kraftControllerId(), state.kraftControllerEpoch()) match {
case SuccessfulRegistrationResult(_, controllerEpochZkVersion) => state.withControllerZkVersion(controllerEpochZkVersion)
case FailedRegistrationResult() => state.withControllerZkVersion(-1)
case SuccessfulRegistrationResult(controllerEpoch, controllerEpochZkVersion) =>
state.withZkController(controllerEpoch, controllerEpochZkVersion)
case FailedRegistrationResult() => state.withUnknownZkController()
}
}
override def releaseControllerLeadership(state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
try {
zkClient.deleteController(state.controllerZkVersion())
state.withControllerZkVersion(-1)
zkClient.deleteController(state.zkControllerEpochZkVersion())
state.withUnknownZkController()
} catch {
case _: ControllerMovedException =>
// If the controller moved, no need to release
state.withControllerZkVersion(-1)
state.withUnknownZkController()
case t: Throwable =>
throw new RuntimeException("Could not release controller leadership due to underlying error", t)
}
@ -267,8 +272,18 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
}
val requests = Seq(createTopicZNode, createPartitionsZNode) ++ createPartitionZNodeReqs
val (migrationZkVersion, _) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
state.withMigrationZkVersion(migrationZkVersion)
val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
if (resultCodes(TopicZNode.path(topicName)).equals(Code.NODEEXISTS)) {
// topic already created, just return
state
} else if (resultCodes.forall { case (_, code) => code.equals(Code.OK) } ) {
// ok
state.withMigrationZkVersion(migrationZkVersion)
} else {
// not ok
throw new RuntimeException(s"Failed to create or update topic $topicName. ZK operation had results $resultCodes")
}
}
private def createTopicPartition(topicPartition: TopicPartition): CreateRequest = {
@ -314,8 +329,13 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
if (requests.isEmpty) {
state
} else {
val (migrationZkVersion, _) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state)
state.withMigrationZkVersion(migrationZkVersion)
val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state)
val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
if (resultCodes.forall { case (_, code) => code.equals(Code.OK) } ) {
state.withMigrationZkVersion(migrationZkVersion)
} else {
throw new RuntimeException(s"Failed to update partition states: $topicPartitions. ZK transaction had results $resultCodes")
}
}
}
@ -435,4 +455,10 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
state
}
}
override def writeMetadataDeltaToZookeeper(delta: MetadataDelta,
image: MetadataImage,
state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
state
}
}

View File

@ -25,6 +25,7 @@ import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
@ -66,11 +67,13 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
private final ClusterConfig clusterConfig;
private final AtomicReference<KafkaClusterTestKit> clusterReference;
private final AtomicReference<EmbeddedZookeeper> zkReference;
private final boolean isCoResident;
public RaftClusterInvocationContext(ClusterConfig clusterConfig, boolean isCoResident) {
this.clusterConfig = clusterConfig;
this.clusterReference = new AtomicReference<>();
this.zkReference = new AtomicReference<>();
this.isCoResident = isCoResident;
}
@ -84,7 +87,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public List<Extension> getAdditionalExtensions() {
RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, clusterConfig);
RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, zkReference, clusterConfig);
return Arrays.asList(
(BeforeTestExecutionCallback) context -> {
TestKitNodes nodes = new TestKitNodes.Builder().
@ -98,6 +101,11 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
});
KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
if (Boolean.parseBoolean(clusterConfig.serverProperties().getProperty("zookeeper.metadata.migration.enable", "false"))) {
zkReference.set(new EmbeddedZookeeper());
builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", zkReference.get().port()));
}
// Copy properties into the TestKit builder
clusterConfig.serverProperties().forEach((key, value) -> builder.setConfigProp(key.toString(), value.toString()));
// KAFKA-12512 need to pass security protocol and listener name here
@ -120,13 +128,15 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
public static class RaftClusterInstance implements ClusterInstance {
private final AtomicReference<KafkaClusterTestKit> clusterReference;
private final AtomicReference<EmbeddedZookeeper> zkReference;
private final ClusterConfig clusterConfig;
final AtomicBoolean started = new AtomicBoolean(false);
final AtomicBoolean stopped = new AtomicBoolean(false);
private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue<>();
RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, ClusterConfig clusterConfig) {
RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, AtomicReference<EmbeddedZookeeper> zkReference, ClusterConfig clusterConfig) {
this.clusterReference = clusterReference;
this.zkReference = zkReference;
this.clusterConfig = clusterConfig;
}
@ -247,6 +257,9 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
if (stopped.compareAndSet(false, true)) {
admins.forEach(admin -> Utils.closeQuietly(admin, "admin"));
Utils.closeQuietly(clusterReference.get(), "cluster");
if (zkReference.get() != null) {
Utils.closeQuietly(zkReference.get(), "zk");
}
}
}

View File

@ -339,6 +339,5 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
private Stream<KafkaServer> servers() {
return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream();
}
}
}

View File

@ -20,6 +20,7 @@ package kafka.server
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.Uuid
import org.apache.kafka.raft.RaftConfig
@ -55,6 +56,7 @@ class KafkaServerKRaftRegistrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@ -79,6 +81,7 @@ class KafkaServerKRaftRegistrationTest {
case t: Throwable => fail("Had some other error waiting for brokers", t)
}
} finally {
zkCluster.stop()
kraftCluster.close()
}
}

View File

@ -59,7 +59,7 @@ class BrokerLifecycleManagerTest {
def listenerName: ListenerName = new ListenerName("PLAINTEXT")
def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT;
def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM
@ -98,14 +98,14 @@ class BrokerLifecycleManagerTest {
@Test
def testCreateAndClose(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
val manager = new BrokerLifecycleManager(context.config, context.time, None)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
manager.close()
}
@Test
def testCreateStartAndClose(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
val manager = new BrokerLifecycleManager(context.config, context.time, None)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
assertEquals(BrokerState.NOT_RUNNING, manager.state)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId, context.advertisedListeners,
@ -120,7 +120,7 @@ class BrokerLifecycleManagerTest {
@Test
def testSuccessfulRegistration(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
val manager = new BrokerLifecycleManager(context.config, context.time, None)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@ -140,7 +140,7 @@ class BrokerLifecycleManagerTest {
def testRegistrationTimeout(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
val controllerNode = new Node(3000, "localhost", 8021)
val manager = new BrokerLifecycleManager(context.config, context.time, None)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
context.controllerNodeProvider.node.set(controllerNode)
def newDuplicateRegistrationResponse(): Unit = {
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@ -181,7 +181,7 @@ class BrokerLifecycleManagerTest {
@Test
def testControlledShutdown(): Unit = {
val context = new BrokerLifecycleManagerTestContext(configProperties)
val manager = new BrokerLifecycleManager(context.config, context.time, None)
val manager = new BrokerLifecycleManager(context.config, context.time, None, isZkBroker = false, () => -1)
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(

View File

@ -92,7 +92,7 @@ class BrokerRegistrationRequestTest {
channelManager: BrokerToControllerChannelManager,
clusterId: String,
brokerId: Int,
zk: Boolean,
zkEpoch: Option[Long],
ibpToSend: Option[(MetadataVersion, MetadataVersion)]
): Errors = {
val features = new BrokerRegistrationRequestData.FeatureCollection()
@ -110,7 +110,7 @@ class BrokerRegistrationRequestTest {
.setBrokerId(brokerId)
.setClusterId(clusterId)
.setIncarnationId(Uuid.randomUuid())
.setIsMigratingZkBroker(zk)
.setMigratingZkBrokerEpoch(zkEpoch.getOrElse(-1L))
.setFeatures(features)
Errors.forCode(sendAndRecieve(channelManager, req).errorCode())
@ -126,19 +126,19 @@ class BrokerRegistrationRequestTest {
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV0))))
registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV0))))
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
registerBroker(channelManager, clusterId, 100, true, None))
registerBroker(channelManager, clusterId, 100, Some(1), None))
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
assertEquals(
Errors.NONE,
registerBroker(channelManager, clusterId, 100, false, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
registerBroker(channelManager, clusterId, 100, None, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
} finally {
channelManager.shutdown()
}
@ -154,19 +154,19 @@ class BrokerRegistrationRequestTest {
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV0))))
registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV0))))
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
registerBroker(channelManager, clusterId, 100, true, None))
registerBroker(channelManager, clusterId, 100, Some(1), None))
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
assertEquals(
Errors.NONE,
registerBroker(channelManager, clusterId, 100, false, Some((MetadataVersion.IBP_3_3_IV3, MetadataVersion.IBP_3_4_IV0))))
registerBroker(channelManager, clusterId, 100, None, Some((MetadataVersion.IBP_3_3_IV3, MetadataVersion.IBP_3_4_IV0))))
} finally {
channelManager.shutdown()
}
@ -182,19 +182,19 @@ class BrokerRegistrationRequestTest {
assertEquals(
Errors.NONE,
registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
assertEquals(
Errors.UNSUPPORTED_VERSION,
registerBroker(channelManager, clusterId, 100, true, None))
registerBroker(channelManager, clusterId, 100, Some(1), None))
assertEquals(
Errors.UNSUPPORTED_VERSION,
registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_3_IV3, MetadataVersion.IBP_3_3_IV3))))
registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_3_IV3, MetadataVersion.IBP_3_3_IV3))))
assertEquals(
Errors.NONE,
registerBroker(channelManager, clusterId, 100, false, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
registerBroker(channelManager, clusterId, 100, None, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
} finally {
channelManager.shutdown()
}

View File

@ -116,6 +116,7 @@ class KafkaApisTest {
private val brokerId = 1
// KRaft tests should override this with a KRaftMetadataCache
private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latest())
private val brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
private val clientQuotaManager: ClientQuotaManager = mock(classOf[ClientQuotaManager])
private val clientRequestQuotaManager: ClientRequestQuotaManager = mock(classOf[ClientRequestQuotaManager])
private val clientControllerQuotaManager: ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager])
@ -171,7 +172,7 @@ class KafkaApisTest {
} else {
metadataCache match {
case zkMetadataCache: ZkMetadataCache =>
ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt, zkMetadataCache)
ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt, zkMetadataCache, brokerEpochManager)
case _ => throw new IllegalStateException("Test must set an instance of ZkMetadataCache")
}
}
@ -185,8 +186,8 @@ class KafkaApisTest {
val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures())
new KafkaApis(
metadataSupport = metadataSupport,
requestChannel = requestChannel,
metadataSupport = metadataSupport,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
newGroupCoordinator = newGroupCoordinator,

View File

@ -20,10 +20,12 @@ package kafka.zk
import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.NIOServerCnxnFactory
import kafka.utils.{CoreUtils, Logging, TestUtils}
import java.net.InetSocketAddress
import java.net.InetSocketAddress
import org.apache.kafka.common.utils.Utils
import java.io.Closeable
/**
* ZooKeeperServer wrapper that starts the server with temporary directories during construction and deletes
* the directories when `shutdown()` is called.
@ -34,7 +36,7 @@ import org.apache.kafka.common.utils.Utils
// This should be named EmbeddedZooKeeper for consistency with other classes, but since this is widely used by other
// projects (even though it's internal), we keep the name as it is until we have a publicly supported test library for
// others to use.
class EmbeddedZookeeper() extends Logging {
class EmbeddedZookeeper() extends Closeable with Logging {
val snapshotDir = TestUtils.tempDir()
val logDir = TestUtils.tempDir()
@ -65,5 +67,6 @@ class EmbeddedZookeeper() extends Logging {
Utils.delete(logDir)
Utils.delete(snapshotDir)
}
override def close(): Unit = shutdown()
}

View File

@ -1441,8 +1441,8 @@ class KafkaZkClientTest extends QuorumTestHarness {
@Test
def testFailToUpdateMigrationZNode(): Unit = {
val (_, stat) = zkClient.getControllerEpoch.get
var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, stat.getVersion)
val (controllerEpoch, stat) = zkClient.getControllerEpoch.get
var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion)
migrationState = zkClient.getOrCreateMigrationState(migrationState)
assertEquals(0, migrationState.migrationZkVersion())
@ -1454,7 +1454,7 @@ class KafkaZkClientTest extends QuorumTestHarness {
CreateRequest("/foo", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
)
migrationState = migrationState.withControllerZkVersion(stat.getVersion)
migrationState = migrationState.withZkController(controllerEpoch, stat.getVersion)
zkClient.retryMigrationRequestsUntilConnected(requests_bad, migrationState) match {
case (zkVersion: Int, requests: Seq[AsyncRequest#Response]) =>
assertEquals(0, zkVersion)
@ -1476,7 +1476,7 @@ class KafkaZkClientTest extends QuorumTestHarness {
CreateRequest("/foo/bar/eggs", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
)
migrationState = migrationState.withControllerZkVersion(stat.getVersion)
migrationState = migrationState.withZkController(controllerEpoch, stat.getVersion)
zkClient.retryMigrationRequestsUntilConnected(requests_good, migrationState) match {
case (zkVersion: Int, requests: Seq[AsyncRequest#Response]) =>
assertEquals(1, zkVersion)

View File

@ -60,8 +60,8 @@ class ZkMigrationClientTest extends QuorumTestHarness {
}
private def initialMigrationState: ZkMigrationLeadershipState = {
val (_, stat) = zkClient.getControllerEpoch.get
new ZkMigrationLeadershipState(3000, InitialControllerEpoch, 100, InitialKRaftEpoch, Time.SYSTEM.milliseconds(), -1, stat.getVersion)
val (epoch, stat) = zkClient.getControllerEpoch.get
new ZkMigrationLeadershipState(3000, InitialControllerEpoch, 100, InitialKRaftEpoch, Time.SYSTEM.milliseconds(), -1, epoch, stat.getVersion)
}
@Test
@ -143,6 +143,22 @@ class ZkMigrationClientTest extends QuorumTestHarness {
assertEquals(List(1, 2, 3), partition1.isr)
}
@Test
def testIdempotentCreateTopics(): Unit = {
assertEquals(0, migrationState.migrationZkVersion())
val partitions = Map(
0 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1),
1 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
val topicId = Uuid.randomUuid()
migrationState = migrationClient.createTopic("test", topicId, partitions, migrationState)
assertEquals(1, migrationState.migrationZkVersion())
migrationState = migrationClient.createTopic("test", topicId, partitions, migrationState)
assertEquals(1, migrationState.migrationZkVersion())
}
// Write Client Quotas using ZkMigrationClient and read them back using AdminZkClient
private def writeClientQuotaAndVerify(migrationClient: ZkMigrationClient,
adminZkClient: AdminZkClient,
@ -217,20 +233,22 @@ class ZkMigrationClientTest extends QuorumTestHarness {
def testClaimAbsentController(): Unit = {
assertEquals(0, migrationState.migrationZkVersion())
migrationState = migrationClient.claimControllerLeadership(migrationState)
assertEquals(1, migrationState.controllerZkVersion())
assertEquals(1, migrationState.zkControllerEpochZkVersion())
}
@Test
def testExistingKRaftControllerClaim(): Unit = {
assertEquals(0, migrationState.migrationZkVersion())
migrationState = migrationClient.claimControllerLeadership(migrationState)
assertEquals(1, migrationState.controllerZkVersion())
assertEquals(1, migrationState.zkControllerEpochZkVersion())
// We don't require a KRaft controller to release the controller in ZK before another KRaft controller
// can claim it. This is because KRaft leadership comes from Raft and we are just synchronizing it to ZK.
var otherNodeState = new ZkMigrationLeadershipState(3001, 43, 100, 42, Time.SYSTEM.milliseconds(), -1, -1)
var otherNodeState = ZkMigrationLeadershipState.EMPTY
.withNewKRaftController(3001, 43)
.withKRaftMetadataOffsetAndEpoch(100, 42);
otherNodeState = migrationClient.claimControllerLeadership(otherNodeState)
assertEquals(2, otherNodeState.controllerZkVersion())
assertEquals(2, otherNodeState.zkControllerEpochZkVersion())
assertEquals(3001, otherNodeState.kraftControllerId())
assertEquals(43, otherNodeState.kraftControllerEpoch())
}
@ -241,7 +259,7 @@ class ZkMigrationClientTest extends QuorumTestHarness {
migrationState = migrationState.withNewKRaftController(3001, InitialControllerEpoch)
migrationState = migrationClient.claimControllerLeadership(migrationState)
assertEquals(1, migrationState.controllerZkVersion())
assertEquals(1, migrationState.zkControllerEpochZkVersion())
migrationState = migrationState.withNewKRaftController(3001, InitialControllerEpoch - 1)
val t1 = assertThrows(classOf[ControllerMovedException], () => migrationClient.claimControllerLeadership(migrationState))
@ -266,7 +284,7 @@ class ZkMigrationClientTest extends QuorumTestHarness {
assertEquals(zkVersion, 1)
migrationState = migrationClient.claimControllerLeadership(migrationState)
assertEquals(2, migrationState.controllerZkVersion())
assertEquals(2, migrationState.zkControllerEpochZkVersion())
zkClient.getControllerEpoch match {
case Some((zkEpoch, stat)) =>
assertEquals(3, zkEpoch)

View File

@ -23,6 +23,7 @@ import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.network.RequestConvertToJson;
import kafka.server.AutoTopicCreationManager;
import kafka.server.ZkBrokerEpochManager;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerTopicStats;
import kafka.server.ClientQuotaManager;
@ -180,7 +181,8 @@ public class MetadataRequestBenchmark {
KafkaConfig config = new KafkaConfig(kafkaProps);
return new KafkaApisBuilder().
setRequestChannel(requestChannel).
setMetadataSupport(new ZkSupport(adminManager, kafkaController, kafkaZkClient, Option.empty(), metadataCache)).
setMetadataSupport(new ZkSupport(adminManager, kafkaController, kafkaZkClient,
Option.empty(), metadataCache, new ZkBrokerEpochManager(metadataCache, kafkaController, Option.empty()))).
setReplicaManager(replicaManager).
setGroupCoordinator(groupCoordinator).
setTxnCoordinator(transactionCoordinator).

View File

@ -336,13 +336,13 @@ public class ClusterControlManager {
}
}
if (request.isMigratingZkBroker() && !zkRegistrationAllowed()) {
if (request.migratingZkBrokerEpoch() != -1 && !zkRegistrationAllowed()) {
throw new BrokerIdNotRegisteredException("Controller does not support registering ZK brokers.");
}
RegisterBrokerRecord record = new RegisterBrokerRecord().
setBrokerId(brokerId).
setIsMigratingZkBroker(request.isMigratingZkBroker()).
setMigratingZkBrokerEpoch(request.migratingZkBrokerEpoch()).
setIncarnationId(request.incarnationId()).
setBrokerEpoch(brokerEpoch).
setRack(request.rack());
@ -420,12 +420,13 @@ public class ClusterControlManager {
feature.minSupportedVersion(), feature.maxSupportedVersion()));
}
// Update broker registrations.
BrokerRegistration prevRegistration = brokerRegistrations.put(brokerId,
new BrokerRegistration(brokerId, record.brokerEpoch(),
record.incarnationId(), listeners, features,
Optional.ofNullable(record.rack()), record.fenced(),
record.inControlledShutdown(), record.isMigratingZkBroker()));
record.inControlledShutdown(), BrokerRegistration.zkBrokerEpoch(record.migratingZkBrokerEpoch())));
if (heartbeatManager != null) {
if (prevRegistration != null) heartbeatManager.remove(brokerId);
heartbeatManager.register(brokerId, record.fenced());

View File

@ -145,6 +145,7 @@ final class ControllerMetricsManager {
case ACCESS_CONTROL_ENTRY_RECORD:
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
case NO_OP_RECORD:
case ZK_MIGRATION_STATE_RECORD:
// These record types do not affect metrics
break;
default:

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineObject;
public class MigrationControlManager {
private final TimelineObject<ZkMigrationState> zkMigrationState;
MigrationControlManager(SnapshotRegistry snapshotRegistry) {
zkMigrationState = new TimelineObject<>(snapshotRegistry, ZkMigrationState.NONE);
}
ZkMigrationState zkMigrationState() {
return zkMigrationState.get();
}
void replay(ZkMigrationStateRecord record) {
zkMigrationState.set(ZkMigrationState.of(record.zkMigrationState()));
}
}

View File

@ -50,8 +50,8 @@ import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
@ -65,6 +65,7 @@ import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
@ -78,6 +79,8 @@ import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.metadata.migration.ZkRecordConsumer;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
@ -566,6 +569,10 @@ public final class QuorumController implements Controller {
return configurationControl;
}
public ZkRecordConsumer zkRecordConsumer() {
return zkRecordConsumer;
}
<T> CompletableFuture<T> appendReadEvent(
String name,
OptionalLong deadlineNs,
@ -819,6 +826,62 @@ public final class QuorumController implements Controller {
return event.future();
}
class MigrationRecordConsumer implements ZkRecordConsumer {
private volatile OffsetAndEpoch highestMigrationRecordOffset;
class MigrationWriteOperation implements ControllerWriteOperation<Void> {
private final List<ApiMessageAndVersion> batch;
MigrationWriteOperation(List<ApiMessageAndVersion> batch) {
this.batch = batch;
}
@Override
public ControllerResult<Void> generateRecordsAndResult() {
return ControllerResult.atomicOf(batch, null);
}
public void processBatchEndOffset(long offset) {
highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch);
}
}
@Override
public void beginMigration() {
// TODO use KIP-868 transaction
ControllerWriteEvent<Void> event = new ControllerWriteEvent<>("Begin ZK Migration",
new MigrationWriteOperation(
Collections.singletonList(
new ApiMessageAndVersion(
new ZkMigrationStateRecord().setZkMigrationState(ZkMigrationState.PRE_MIGRATION.value()),
ZkMigrationStateRecord.LOWEST_SUPPORTED_VERSION)
)));
queue.append(event);
}
@Override
public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
if (queue.size() > 100) { // TODO configure this
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. Controller queue is too large"));
return future;
}
ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>("ZK Migration Batch",
new MigrationWriteOperation(recordBatch));
queue.append(batchEvent);
return batchEvent.future;
}
@Override
public OffsetAndEpoch completeMigration() {
// TODO write migration record, use KIP-868 transaction
return highestMigrationRecordOffset;
}
@Override
public void abortMigration() {
// TODO use KIP-868 transaction
}
}
class QuorumMetaLogListener implements RaftClient.Listener<ApiMessageAndVersion> {
@Override
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
@ -1349,6 +1412,9 @@ public final class QuorumController implements Controller {
case NO_OP_RECORD:
// NoOpRecord is an empty record and doesn't need to be replayed
break;
case ZK_MIGRATION_STATE_RECORD:
// TODO handle this
break;
default:
throw new RuntimeException("Unhandled record type " + type);
}
@ -1571,6 +1637,8 @@ public final class QuorumController implements Controller {
*/
private final BootstrapMetadata bootstrapMetadata;
private final ZkRecordConsumer zkRecordConsumer;
/**
* The maximum number of records per batch to allow.
*/
@ -1672,6 +1740,7 @@ public final class QuorumController implements Controller {
this.metaLogListener = new QuorumMetaLogListener();
this.curClaimEpoch = -1;
this.needToCompleteAuthorizerLoad = authorizer.isPresent();
this.zkRecordConsumer = new MigrationRecordConsumer();
updateWriteOffset(-1);
resetToEmptyState();
@ -2029,11 +2098,6 @@ public final class QuorumController implements Controller {
return curClaimEpoch;
}
// Visible for testing
MetadataVersion metadataVersion() {
return featureControl.metadataVersion();
}
@Override
public void close() throws InterruptedException {
queue.close();

View File

@ -64,6 +64,7 @@ public final class ClusterDelta {
.stream()
.filter(Optional::isPresent)
.map(Optional::get)
.filter(registration -> registration.isMigratingZkBroker() && !registration.fenced())
.map(BrokerRegistration::id)
.collect(Collectors.toSet());
}

View File

@ -34,9 +34,15 @@ public final class ClusterImage {
public static final ClusterImage EMPTY = new ClusterImage(Collections.emptyMap());
private final Map<Integer, BrokerRegistration> brokers;
private final Map<Integer, BrokerRegistration> zkBrokers;
public ClusterImage(Map<Integer, BrokerRegistration> brokers) {
this.brokers = Collections.unmodifiableMap(brokers);
this.zkBrokers = Collections.unmodifiableMap(brokers
.entrySet()
.stream()
.filter(entry -> entry.getValue().isMigratingZkBroker())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
public boolean isEmpty() {
@ -52,7 +58,7 @@ public final class ClusterImage {
brokers
.entrySet()
.stream()
.filter(x -> x.getValue().isMigratingZkBroker())
.filter(x -> x.getValue().isMigratingZkBroker() && !x.getValue().fenced())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}

View File

@ -206,6 +206,9 @@ public final class MetadataDelta {
* updating the highest offset and epoch.
*/
break;
case ZK_MIGRATION_STATE_RECORD:
// TODO handle this
break;
default:
throw new RuntimeException("Unknown metadata record type " + type);
}

View File

@ -18,6 +18,7 @@
package org.apache.kafka.image.loader;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.raft.LeaderAndEpoch;
import java.util.Objects;
@ -31,6 +32,11 @@ public class LogDeltaManifest {
*/
private final MetadataProvenance provenance;
/**
* The current leader and epoch at the end of this delta.
*/
private final LeaderAndEpoch leaderAndEpoch;
/**
* The number of batches that were loaded.
*/
@ -48,11 +54,13 @@ public class LogDeltaManifest {
public LogDeltaManifest(
MetadataProvenance provenance,
LeaderAndEpoch leaderAndEpoch,
int numBatches,
long elapsedNs,
long numBytes
) {
this.provenance = provenance;
this.leaderAndEpoch = leaderAndEpoch;
this.numBatches = numBatches;
this.elapsedNs = elapsedNs;
this.numBytes = numBytes;
@ -63,6 +71,10 @@ public class LogDeltaManifest {
return provenance;
}
public LeaderAndEpoch leaderAndEpoch() {
return leaderAndEpoch;
}
public int numBatches() {
return numBatches;
}
@ -79,6 +91,7 @@ public class LogDeltaManifest {
public int hashCode() {
return Objects.hash(
provenance,
leaderAndEpoch,
numBatches,
elapsedNs,
numBytes);
@ -89,6 +102,7 @@ public class LogDeltaManifest {
if (o == null || !o.getClass().equals(this.getClass())) return false;
LogDeltaManifest other = (LogDeltaManifest) o;
return provenance.equals(other.provenance) &&
leaderAndEpoch == other.leaderAndEpoch &&
numBatches == other.numBatches &&
elapsedNs == other.elapsedNs &&
numBytes == other.numBytes;
@ -98,6 +112,7 @@ public class LogDeltaManifest {
public String toString() {
return "LogDeltaManifest(" +
"provenance=" + provenance +
", leaderAndEpoch=" + leaderAndEpoch +
", numBatches=" + numBatches +
", elapsedNs=" + elapsedNs +
", numBytes=" + numBytes +

View File

@ -359,6 +359,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
long elapsedNs = time.nanoseconds() - startNs;
metrics.updateBatchProcessingTime(elapsedNs);
return new LogDeltaManifest(provenance,
currentLeaderAndEpoch,
numBatches,
elapsedNs,
numBytes);

View File

@ -49,6 +49,14 @@ public class BrokerRegistration {
return listenersMap;
}
public static Optional<Long> zkBrokerEpoch(long value) {
if (value == -1) {
return Optional.empty();
} else {
return Optional.of(value);
}
}
private final int id;
private final long epoch;
private final Uuid incarnationId;
@ -57,7 +65,7 @@ public class BrokerRegistration {
private final Optional<String> rack;
private final boolean fenced;
private final boolean inControlledShutdown;
private final boolean isMigratingZkBroker;
private final Optional<Long> migratingZkBrokerEpoch;
// Visible for testing
public BrokerRegistration(int id,
@ -69,7 +77,7 @@ public class BrokerRegistration {
boolean fenced,
boolean inControlledShutdown) {
this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack,
fenced, inControlledShutdown, false);
fenced, inControlledShutdown, Optional.empty());
}
public BrokerRegistration(int id,
@ -80,9 +88,9 @@ public class BrokerRegistration {
Optional<String> rack,
boolean fenced,
boolean inControlledShutdown,
boolean isMigratingZkBroker) {
Optional<Long> migratingZkBrokerEpoch) {
this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack,
fenced, inControlledShutdown, isMigratingZkBroker);
fenced, inControlledShutdown, migratingZkBrokerEpoch);
}
// Visible for testing
@ -94,7 +102,7 @@ public class BrokerRegistration {
Optional<String> rack,
boolean fenced,
boolean inControlledShutdown) {
this(id, epoch, incarnationId, listeners, supportedFeatures, rack, fenced, inControlledShutdown, false);
this(id, epoch, incarnationId, listeners, supportedFeatures, rack, fenced, inControlledShutdown, Optional.empty());
}
public BrokerRegistration(int id,
@ -105,7 +113,7 @@ public class BrokerRegistration {
Optional<String> rack,
boolean fenced,
boolean inControlledShutdown,
boolean isMigratingZkBroker) {
Optional<Long> migratingZkBrokerEpoch) {
this.id = id;
this.epoch = epoch;
this.incarnationId = incarnationId;
@ -123,7 +131,7 @@ public class BrokerRegistration {
this.rack = rack;
this.fenced = fenced;
this.inControlledShutdown = inControlledShutdown;
this.isMigratingZkBroker = isMigratingZkBroker;
this.migratingZkBrokerEpoch = migratingZkBrokerEpoch;
}
public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
@ -147,7 +155,7 @@ public class BrokerRegistration {
Optional.ofNullable(record.rack()),
record.fenced(),
record.inControlledShutdown(),
record.isMigratingZkBroker());
zkBrokerEpoch(record.migratingZkBrokerEpoch()));
}
public int id() {
@ -191,7 +199,11 @@ public class BrokerRegistration {
}
public boolean isMigratingZkBroker() {
return isMigratingZkBroker;
return migratingZkBrokerEpoch.isPresent();
}
public Optional<Long> migratingZkBrokerEpoch() {
return migratingZkBrokerEpoch;
}
public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
@ -210,9 +222,9 @@ public class BrokerRegistration {
}
}
if (isMigratingZkBroker) {
if (migratingZkBrokerEpoch.isPresent()) {
if (options.metadataVersion().isMigrationSupported()) {
registrationRecord.setIsMigratingZkBroker(isMigratingZkBroker);
registrationRecord.setMigratingZkBrokerEpoch(migratingZkBrokerEpoch.get());
} else {
options.handleLoss("the isMigratingZkBroker state of one or more brokers");
}
@ -241,7 +253,7 @@ public class BrokerRegistration {
@Override
public int hashCode() {
return Objects.hash(id, epoch, incarnationId, listeners, supportedFeatures,
rack, fenced, inControlledShutdown, isMigratingZkBroker);
rack, fenced, inControlledShutdown, migratingZkBrokerEpoch);
}
@Override
@ -256,7 +268,7 @@ public class BrokerRegistration {
other.rack.equals(rack) &&
other.fenced == fenced &&
other.inControlledShutdown == inControlledShutdown &&
other.isMigratingZkBroker == isMigratingZkBroker;
other.migratingZkBrokerEpoch.equals(migratingZkBrokerEpoch);
}
@Override
@ -277,7 +289,7 @@ public class BrokerRegistration {
bld.append(", rack=").append(rack);
bld.append(", fenced=").append(fenced);
bld.append(", inControlledShutdown=").append(inControlledShutdown);
bld.append(", isMigratingZkBroker=").append(isMigratingZkBroker);
bld.append(", migratingZkBrokerEpoch=").append(migratingZkBrokerEpoch.orElse(-1L));
bld.append(")");
return bld.toString();
}
@ -301,7 +313,7 @@ public class BrokerRegistration {
rack,
newFenced,
newInControlledShutdownChange,
isMigratingZkBroker
migratingZkBrokerEpoch
);
}
}

View File

@ -0,0 +1,514 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.migration;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.image.loader.SnapshotManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.fault.FaultHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
* serialize events coming from various threads and listeners.
*/
public class KRaftMigrationDriver implements MetadataPublisher {
private final Time time;
private final Logger log;
private final int nodeId;
private final MigrationClient zkMigrationClient;
private final LegacyPropagator propagator;
private final ZkRecordConsumer zkRecordConsumer;
private final KafkaEventQueue eventQueue;
private final FaultHandler faultHandler;
/**
* A callback invoked once the migration state has been recovered from ZK. It is used to delay installing this
* MetadataPublisher into the MetadataLoader until that recovery has completed.
*/
private final Consumer<KRaftMigrationDriver> initialZkLoadHandler;
private volatile LeaderAndEpoch leaderAndEpoch;
private volatile MigrationState migrationState;
private volatile ZkMigrationLeadershipState migrationLeadershipState;
private volatile MetadataImage image;
public KRaftMigrationDriver(
int nodeId,
ZkRecordConsumer zkRecordConsumer,
MigrationClient zkMigrationClient,
LegacyPropagator propagator,
Consumer<KRaftMigrationDriver> initialZkLoadHandler,
FaultHandler faultHandler
) {
this.nodeId = nodeId;
this.zkRecordConsumer = zkRecordConsumer;
this.zkMigrationClient = zkMigrationClient;
this.propagator = propagator;
this.time = Time.SYSTEM;
this.log = LoggerFactory.getLogger(KRaftMigrationDriver.class);
this.migrationState = MigrationState.UNINITIALIZED;
this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, new LogContext("KRaftMigrationDriver"), "kraft-migration");
this.image = MetadataImage.EMPTY;
this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN;
this.initialZkLoadHandler = initialZkLoadHandler;
this.faultHandler = faultHandler;
}
public void start() {
eventQueue.prepend(new PollEvent());
}
public void shutdown() throws InterruptedException {
eventQueue.beginShutdown("KRaftMigrationDriver#shutdown");
log.debug("Shutting down KRaftMigrationDriver");
eventQueue.close();
}
private void initializeMigrationState() {
log.info("Recovering migration state");
apply("Recovery", zkMigrationClient::getOrCreateMigrationRecoveryState);
String maybeDone = migrationLeadershipState.zkMigrationComplete() ? "done" : "not done";
log.info("Recovered migration state {}. ZK migration is {}.", migrationLeadershipState, maybeDone);
initialZkLoadHandler.accept(this);
// Let's transition to INACTIVE state and wait for leadership events.
transitionTo(MigrationState.INACTIVE);
}
private boolean isControllerQuorumReadyForMigration() {
// TODO implement this
return true;
}
private boolean areZkBrokersReadyForMigration() {
if (image == MetadataImage.EMPTY) {
// TODO maybe add WAIT_FOR_INITIAL_METADATA_PUBLISH state to avoid this kind of check?
log.info("Waiting for initial metadata publish before checking if Zk brokers are registered.");
return false;
}
Set<Integer> kraftRegisteredZkBrokers = image.cluster().zkBrokers().keySet();
Set<Integer> zkRegisteredZkBrokers = zkMigrationClient.readBrokerIdsFromTopicAssignments();
zkRegisteredZkBrokers.removeAll(kraftRegisteredZkBrokers);
if (zkRegisteredZkBrokers.isEmpty()) {
return true;
} else {
log.info("Still waiting for ZK brokers {} to register with KRaft.", zkRegisteredZkBrokers);
return false;
}
}
private void apply(String name, Function<ZkMigrationLeadershipState, ZkMigrationLeadershipState> stateMutator) {
ZkMigrationLeadershipState beforeState = this.migrationLeadershipState;
ZkMigrationLeadershipState afterState = stateMutator.apply(beforeState);
log.trace("{} transitioned from {} to {}", name, beforeState, afterState);
this.migrationLeadershipState = afterState;
}
private boolean isValidStateChange(MigrationState newState) {
if (migrationState == newState)
return true;
switch (migrationState) {
case UNINITIALIZED:
case DUAL_WRITE:
return newState == MigrationState.INACTIVE;
case INACTIVE:
return newState == MigrationState.WAIT_FOR_CONTROLLER_QUORUM;
case WAIT_FOR_CONTROLLER_QUORUM:
return
newState == MigrationState.INACTIVE ||
newState == MigrationState.WAIT_FOR_BROKERS;
case WAIT_FOR_BROKERS:
return
newState == MigrationState.INACTIVE ||
newState == MigrationState.BECOME_CONTROLLER;
case BECOME_CONTROLLER:
return
newState == MigrationState.INACTIVE ||
newState == MigrationState.ZK_MIGRATION ||
newState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM;
case ZK_MIGRATION:
return
newState == MigrationState.INACTIVE ||
newState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM;
case KRAFT_CONTROLLER_TO_BROKER_COMM:
return
newState == MigrationState.INACTIVE ||
newState == MigrationState.DUAL_WRITE;
default:
log.error("Migration driver trying to transition from an unknown state {}", migrationState);
return false;
}
}
private void transitionTo(MigrationState newState) {
if (!isValidStateChange(newState)) {
log.error("Error transition in migration driver from {} to {}", migrationState, newState);
return;
}
if (newState != migrationState) {
log.debug("{} transitioning from {} to {} state", nodeId, migrationState, newState);
} else {
log.trace("{} transitioning from {} to {} state", nodeId, migrationState, newState);
}
switch (newState) {
case UNINITIALIZED:
// No state can transition to UNINITIALIZED.
throw new IllegalStateException("Illegal transition from " + migrationState + " to " + newState + " " +
"state in Zk to KRaft migration");
case INACTIVE:
// Any state can go to INACTIVE.
break;
}
migrationState = newState;
}
@Override
public String name() {
return "KRaftMigrationDriver";
}
@Override
public void publishSnapshot(MetadataDelta delta, MetadataImage newImage, SnapshotManifest manifest) {
eventQueue.append(new MetadataChangeEvent(delta, newImage, manifest.provenance(), true));
}
@Override
public void publishLogDelta(MetadataDelta delta, MetadataImage newImage, LogDeltaManifest manifest) {
if (!leaderAndEpoch.equals(manifest.leaderAndEpoch())) {
eventQueue.append(new KRaftLeaderEvent(manifest.leaderAndEpoch()));
}
eventQueue.append(new MetadataChangeEvent(delta, newImage, manifest.provenance(), false));
}
@Override
public void close() throws Exception {
eventQueue.close();
}
// Events handled by Migration Driver.
abstract class MigrationEvent implements EventQueue.Event {
@Override
public void handleException(Throwable e) {
KRaftMigrationDriver.this.faultHandler.handleFault("Error during ZK migration", e);
}
}
class PollEvent extends MigrationEvent {
@Override
public void run() throws Exception {
switch (migrationState) {
case UNINITIALIZED:
initializeMigrationState();
break;
case INACTIVE:
// Nothing to do when the driver is inactive. We need to wait on the
// controller node's state to move forward.
break;
case WAIT_FOR_CONTROLLER_QUORUM:
eventQueue.append(new WaitForControllerQuorumEvent());
break;
case BECOME_CONTROLLER:
eventQueue.append(new BecomeZkControllerEvent());
break;
case WAIT_FOR_BROKERS:
eventQueue.append(new WaitForZkBrokersEvent());
break;
case ZK_MIGRATION:
eventQueue.append(new MigrateMetadataEvent());
break;
case KRAFT_CONTROLLER_TO_BROKER_COMM:
eventQueue.append(new SendRPCsToBrokersEvent());
break;
case DUAL_WRITE:
// Nothing to do in the PollEvent. If there is a metadata change, the
// MetadataChangeEvent drives the writes to ZooKeeper.
break;
}
// Poll again after some time
long deadline = time.nanoseconds() + NANOSECONDS.convert(1, SECONDS);
eventQueue.scheduleDeferred(
"poll",
new EventQueue.DeadlineFunction(deadline),
new PollEvent());
}
@Override
public void handleException(Throwable e) {
log.error("Had an exception in " + this.getClass().getSimpleName(), e);
}
}
class KRaftLeaderEvent extends MigrationEvent {
private final LeaderAndEpoch leaderAndEpoch;
KRaftLeaderEvent(LeaderAndEpoch leaderAndEpoch) {
this.leaderAndEpoch = leaderAndEpoch;
}
@Override
public void run() throws Exception {
// We can either be the active controller or just resigned from being the controller.
KRaftMigrationDriver.this.leaderAndEpoch = leaderAndEpoch;
boolean isActive = leaderAndEpoch.isLeader(KRaftMigrationDriver.this.nodeId);
switch (migrationState) {
case UNINITIALIZED:
// Poll and retry after initialization
long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
eventQueue.scheduleDeferred(
"poll",
new EventQueue.DeadlineFunction(deadline),
this);
break;
default:
if (!isActive) {
apply("KRaftLeaderEvent is not active", state -> ZkMigrationLeadershipState.EMPTY);
transitionTo(MigrationState.INACTIVE);
} else {
// Apply the new KRaft state
apply("KRaftLeaderEvent is active", state -> state.withNewKRaftController(nodeId, leaderAndEpoch.epoch()));
// Before becoming the controller for the ZK brokers, we need to make sure the
// Controller Quorum can handle migration.
transitionTo(MigrationState.WAIT_FOR_CONTROLLER_QUORUM);
}
break;
}
}
@Override
public void handleException(Throwable e) {
log.error("Had an exception in " + this.getClass().getSimpleName(), e);
}
}
class WaitForControllerQuorumEvent extends MigrationEvent {
@Override
public void run() throws Exception {
switch (migrationState) {
case WAIT_FOR_CONTROLLER_QUORUM:
if (isControllerQuorumReadyForMigration()) {
log.debug("Controller Quorum is ready for Zk to KRaft migration");
// Note that leadership would not change here. Hence we do not need to
// `apply` any leadership state change.
transitionTo(MigrationState.WAIT_FOR_BROKERS);
}
break;
default:
// Ignore the event as we're not trying to become controller anymore.
break;
}
}
@Override
public void handleException(Throwable e) {
log.error("Had an exception in " + this.getClass().getSimpleName(), e);
}
}
class BecomeZkControllerEvent extends MigrationEvent {
@Override
public void run() throws Exception {
switch (migrationState) {
case BECOME_CONTROLLER:
// TODO: Handle unhappy path.
apply("BecomeZkLeaderEvent", zkMigrationClient::claimControllerLeadership);
if (migrationLeadershipState.zkControllerEpochZkVersion() == -1) {
// We could not claim leadership; stay in BECOME_CONTROLLER to retry.
} else {
if (!migrationLeadershipState.zkMigrationComplete()) {
transitionTo(MigrationState.ZK_MIGRATION);
} else {
transitionTo(MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM);
}
}
break;
default:
// Ignore the event as we're not trying to become controller anymore.
break;
}
}
@Override
public void handleException(Throwable e) {
log.error("Had an exception in " + this.getClass().getSimpleName(), e);
}
}
class WaitForZkBrokersEvent extends MigrationEvent {
@Override
public void run() throws Exception {
switch (migrationState) {
case WAIT_FOR_BROKERS:
if (areZkBrokersReadyForMigration()) {
log.debug("Zk brokers are registered and ready for migration");
transitionTo(MigrationState.BECOME_CONTROLLER);
}
break;
default:
// Ignore the event as we're not in the appropriate state anymore.
break;
}
}
@Override
public void handleException(Throwable e) {
log.error("Had an exception in " + this.getClass().getSimpleName(), e);
}
}
class MigrateMetadataEvent extends MigrationEvent {
@Override
public void run() throws Exception {
Set<Integer> brokersInMetadata = new HashSet<>();
log.info("Starting ZK migration");
zkRecordConsumer.beginMigration();
try {
AtomicInteger count = new AtomicInteger(0);
zkMigrationClient.readAllMetadata(batch -> {
try {
log.info("Migrating {} records from ZK: {}", batch.size(), batch);
CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
count.addAndGet(batch.size());
future.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
}, brokersInMetadata::add);
OffsetAndEpoch offsetAndEpochAfterMigration = zkRecordConsumer.completeMigration();
log.info("Completed migration of metadata from Zookeeper to KRaft. A total of {} metadata records were " +
"generated. The current metadata offset is now {} with an epoch of {}. Saw {} brokers in the " +
"migrated metadata {}.",
count.get(),
offsetAndEpochAfterMigration.offset(),
offsetAndEpochAfterMigration.epoch(),
brokersInMetadata.size(),
brokersInMetadata);
ZkMigrationLeadershipState newState = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
offsetAndEpochAfterMigration.offset(),
offsetAndEpochAfterMigration.epoch());
apply("Migrate metadata from Zk", state -> zkMigrationClient.setMigrationRecoveryState(newState));
transitionTo(MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM);
} catch (Throwable t) {
zkRecordConsumer.abortMigration();
// TODO ???
}
}
@Override
public void handleException(Throwable e) {
log.error("Had an exception in " + this.getClass().getSimpleName(), e);
}
}
class SendRPCsToBrokersEvent extends MigrationEvent {
@Override
public void run() throws Exception {
// Only send RPCs to the brokers if we are still in the KRAFT_CONTROLLER_TO_BROKER_COMM state.
if (migrationState == MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM) {
if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch());
// Migration leadership state doesn't change since we're not doing any Zk writes.
transitionTo(MigrationState.DUAL_WRITE);
}
}
}
}
class MetadataChangeEvent extends MigrationEvent {
private final MetadataDelta delta;
private final MetadataImage image;
private final MetadataProvenance provenance;
private final boolean isSnapshot;
MetadataChangeEvent(MetadataDelta delta, MetadataImage image, MetadataProvenance provenance, boolean isSnapshot) {
this.delta = delta;
this.image = image;
this.provenance = provenance;
this.isSnapshot = isSnapshot;
}
@Override
public void run() throws Exception {
KRaftMigrationDriver.this.image = image;
if (migrationState != MigrationState.DUAL_WRITE) {
log.trace("Received metadata change, but the controller is not in dual-write " +
"mode. Ignoring the change to be replicated to Zookeeper");
return;
}
if (delta.featuresDelta() != null) {
propagator.setMetadataVersion(image.features().metadataVersion());
}
if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
if (delta.topicsDelta() != null) {
delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
if (delta.topicsDelta().createdTopicIds().contains(topicId)) {
apply("Create topic " + topicDelta.name(), migrationState ->
zkMigrationClient.createTopic(
topicDelta.name(),
topicId,
topicDelta.partitionChanges(),
migrationState));
} else {
apply("Updating topic " + topicDelta.name(), migrationState ->
zkMigrationClient.updateTopicPartitions(
Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()),
migrationState));
}
});
}
apply("Write MetadataDelta to Zk", state -> zkMigrationClient.writeMetadataDeltaToZookeeper(delta, image, state));
// TODO: Unhappy path: Probably relinquish leadership and let new controller
// retry the write?
propagator.sendRPCsToBrokersFromMetadataDelta(delta, image,
migrationLeadershipState.zkControllerEpoch());
} else {
String metadataType = isSnapshot ? "snapshot" : "delta";
log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance);
}
}
}
}
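A minimal wiring sketch (hypothetical; the helper name and the no-op load handler are placeholders) showing how a controller node might construct and start the driver with the collaborators introduced in this patch:

    static KRaftMigrationDriver startMigrationDriver(
            int nodeId,
            ZkRecordConsumer zkRecordConsumer,   // e.g. QuorumController#zkRecordConsumer()
            MigrationClient zkMigrationClient,   // a ZK-backed MigrationClient implementation
            LegacyPropagator propagator,         // sends controller RPCs to the ZK brokers
            FaultHandler faultHandler) {
        KRaftMigrationDriver driver = new KRaftMigrationDriver(
            nodeId,
            zkRecordConsumer,
            zkMigrationClient,
            propagator,
            drv -> { /* install drv as a MetadataPublisher once the ZK migration state is recovered */ },
            faultHandler);
        driver.start();   // enqueues the first PollEvent; the driver initializes and settles in INACTIVE
        return driver;
    }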

View File

@ -18,6 +18,7 @@ package org.apache.kafka.metadata.migration;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.MetadataVersion;
public interface LegacyPropagator {
@ -34,4 +35,6 @@ public interface LegacyPropagator {
void sendRPCsToBrokersFromMetadataImage(MetadataImage image, int zkControllerEpoch);
void clear();
void setMetadataVersion(MetadataVersion metadataVersion);
}

View File

@ -17,6 +17,8 @@
package org.apache.kafka.metadata.migration;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@ -88,4 +90,15 @@ public interface MigrationClient {
Set<Integer> readBrokerIds();
Set<Integer> readBrokerIdsFromTopicAssignments();
/**
* Convert the Metadata delta to Zookeeper writes and persist the changes. On successful
* write, update the migration state with new metadata offset and epoch.
* @param delta Changes in the cluster metadata
* @param image New metadata after the changes in `delta` are applied
* @param state Current migration state before writing to Zookeeper.
*/
ZkMigrationLeadershipState writeMetadataDeltaToZookeeper(MetadataDelta delta,
MetadataImage image,
ZkMigrationLeadershipState state);
}
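A call-pattern sketch (assuming `migrationClient`, `delta`, `image`, and `beforeWrite` are in scope, as they are inside MetadataChangeEvent above):

    ZkMigrationLeadershipState afterWrite =
        migrationClient.writeMetadataDeltaToZookeeper(delta, image, beforeWrite);
    // On success, the returned state carries the offset and epoch of the metadata that has now been
    // mirrored to ZooKeeper; the driver stores it via apply(...) as its new migrationLeadershipState.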

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.migration;
/**
 * The states that the KRaftMigrationDriver can be in while migrating metadata from ZK to KRaft.
 *
 * The expected progression is:
 *
 *   UNINITIALIZED -> INACTIVE -> WAIT_FOR_CONTROLLER_QUORUM -> WAIT_FOR_BROKERS -> BECOME_CONTROLLER
 *     -> ZK_MIGRATION -> KRAFT_CONTROLLER_TO_BROKER_COMM -> DUAL_WRITE
 *
 * BECOME_CONTROLLER moves directly to KRAFT_CONTROLLER_TO_BROKER_COMM when the ZK data has already
 * been migrated. Every state after INACTIVE can fall back to INACTIVE when this node is no longer
 * the active KRaft controller. See KRaftMigrationDriver#isValidStateChange for the authoritative
 * set of transitions.
 */
public enum MigrationState {
UNINITIALIZED(false), // Initial state.
INACTIVE(false), // State when not the active controller.
WAIT_FOR_CONTROLLER_QUORUM(false), // Ensure all the quorum nodes are ready for migration.
WAIT_FOR_BROKERS(false), // Wait for Zk brokers to be ready for migration.
BECOME_CONTROLLER(false), // Become controller for the Zk Brokers.
ZK_MIGRATION(true), // The cluster has satisfied the migration criteria; ZK metadata is being copied into KRaft.
KRAFT_CONTROLLER_TO_BROKER_COMM(true), // First communication from the controller: send full metadata RPCs to the ZK brokers.
DUAL_WRITE(true); // The data has been migrated; metadata changes are now also written back to ZK.
private final boolean isActiveController;
MigrationState(boolean isActiveController) {
this.isActiveController = isActiveController;
}
boolean isActiveController() {
return isActiveController;
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.metadata.migration;
import org.apache.kafka.raft.OffsetAndEpoch;
import java.util.Objects;
/**
@ -25,7 +27,9 @@ import java.util.Objects;
*/
public class ZkMigrationLeadershipState {
public static final ZkMigrationLeadershipState EMPTY = new ZkMigrationLeadershipState(-1, -1, -1, -1, -1, -1, -1);
// Use -2 as sentinel for "unknown version" for ZK versions to avoid sending an actual -1 "any version"
// when doing ZK writes
public static final ZkMigrationLeadershipState EMPTY = new ZkMigrationLeadershipState(-1, -1, -1, -1, -1, -2, -1, -2);
private final int kraftControllerId;
@ -39,36 +43,59 @@ public class ZkMigrationLeadershipState {
private final int migrationZkVersion;
private final int controllerZkVersion;
private final int zkControllerEpoch;
private final int zkControllerEpochZkVersion;
public ZkMigrationLeadershipState(int kraftControllerId, int kraftControllerEpoch,
long kraftMetadataOffset, int kraftMetadataEpoch,
long lastUpdatedTimeMs, int migrationZkVersion, int controllerZkVersion) {
long lastUpdatedTimeMs, int migrationZkVersion,
int zkControllerEpoch, int zkControllerEpochZkVersion) {
this.kraftControllerId = kraftControllerId;
this.kraftControllerEpoch = kraftControllerEpoch;
this.kraftMetadataOffset = kraftMetadataOffset;
this.kraftMetadataEpoch = kraftMetadataEpoch;
this.lastUpdatedTimeMs = lastUpdatedTimeMs;
this.migrationZkVersion = migrationZkVersion;
this.controllerZkVersion = controllerZkVersion;
this.zkControllerEpoch = zkControllerEpoch;
this.zkControllerEpochZkVersion = zkControllerEpochZkVersion;
}
public ZkMigrationLeadershipState withMigrationZkVersion(int zkVersion) {
return new ZkMigrationLeadershipState(
this.kraftControllerId, this.kraftControllerEpoch, this.kraftMetadataOffset,
this.kraftMetadataEpoch, this.lastUpdatedTimeMs, zkVersion, this.controllerZkVersion);
this.kraftMetadataEpoch, this.lastUpdatedTimeMs, zkVersion, this.zkControllerEpoch, this.zkControllerEpochZkVersion);
}
public ZkMigrationLeadershipState withControllerZkVersion(int zkVersion) {
public ZkMigrationLeadershipState withZkController(int zkControllerEpoch, int zkControllerEpochZkVersion) {
return new ZkMigrationLeadershipState(
this.kraftControllerId, this.kraftControllerEpoch, this.kraftMetadataOffset,
this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, zkVersion);
this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, zkControllerEpoch, zkControllerEpochZkVersion);
}
public ZkMigrationLeadershipState withUnknownZkController() {
return withZkController(EMPTY.zkControllerEpoch, EMPTY.zkControllerEpochZkVersion);
}
public ZkMigrationLeadershipState withNewKRaftController(int controllerId, int controllerEpoch) {
return new ZkMigrationLeadershipState(
controllerId, controllerEpoch, this.kraftMetadataOffset,
this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, this.controllerZkVersion);
this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, this.zkControllerEpoch, this.zkControllerEpochZkVersion);
}
public ZkMigrationLeadershipState withKRaftMetadataOffsetAndEpoch(long metadataOffset,
int metadataEpoch) {
return new ZkMigrationLeadershipState(
this.kraftControllerId,
this.kraftControllerEpoch,
metadataOffset,
metadataEpoch,
this.lastUpdatedTimeMs,
this.migrationZkVersion,
this.zkControllerEpoch,
this.zkControllerEpochZkVersion);
}
public int kraftControllerId() {
@ -95,14 +122,22 @@ public class ZkMigrationLeadershipState {
return migrationZkVersion;
}
public int controllerZkVersion() {
return controllerZkVersion;
public int zkControllerEpoch() {
return zkControllerEpoch;
}
public int zkControllerEpochZkVersion() {
return zkControllerEpochZkVersion;
}
public boolean zkMigrationComplete() {
return kraftMetadataOffset > 0;
}
public OffsetAndEpoch offsetAndEpoch() {
return new OffsetAndEpoch(kraftMetadataOffset, kraftMetadataEpoch);
}
@Override
public String toString() {
return "ZkMigrationLeadershipState{" +
@ -112,7 +147,8 @@ public class ZkMigrationLeadershipState {
", kraftMetadataEpoch=" + kraftMetadataEpoch +
", lastUpdatedTimeMs=" + lastUpdatedTimeMs +
", migrationZkVersion=" + migrationZkVersion +
", controllerZkVersion=" + controllerZkVersion +
", controllerZkEpoch=" + zkControllerEpoch +
", controllerZkVersion=" + zkControllerEpochZkVersion +
'}';
}
@ -127,7 +163,8 @@ public class ZkMigrationLeadershipState {
&& kraftMetadataEpoch == that.kraftMetadataEpoch
&& lastUpdatedTimeMs == that.lastUpdatedTimeMs
&& migrationZkVersion == that.migrationZkVersion
&& controllerZkVersion == that.controllerZkVersion;
&& zkControllerEpoch == that.zkControllerEpoch
&& zkControllerEpochZkVersion == that.zkControllerEpochZkVersion;
}
@Override
@ -139,6 +176,7 @@ public class ZkMigrationLeadershipState {
kraftMetadataEpoch,
lastUpdatedTimeMs,
migrationZkVersion,
controllerZkVersion);
zkControllerEpoch,
zkControllerEpochZkVersion);
}
}
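An illustrative sketch (the ids, epochs, and znode versions below are made up) of how the new builder-style methods are expected to be chained during a migration:

    ZkMigrationLeadershipState state = ZkMigrationLeadershipState.EMPTY
        .withNewKRaftController(3000, 1)   // KRaft controller 3000 claimed controller epoch 1
        .withZkController(5, 2);           // claimed ZK controllership: epoch 5, epoch znode version 2
    assert !state.zkMigrationComplete();   // no KRaft metadata offset has been recorded yet
    state = state.withKRaftMetadataOffsetAndEpoch(100L, 1);
    assert state.zkMigrationComplete();    // a positive KRaft metadata offset marks the ZK data migration as done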

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.migration;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public interface ZkRecordConsumer {
void beginMigration();
CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch);
OffsetAndEpoch completeMigration();
void abortMigration();
}
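A condensed sketch of the call sequence that KRaftMigrationDriver's MigrateMetadataEvent performs against this interface (the helper name is hypothetical and error handling is simplified):

    static OffsetAndEpoch copyZkMetadata(ZkRecordConsumer consumer,
                                         List<List<ApiMessageAndVersion>> batchesReadFromZk) throws Exception {
        consumer.beginMigration();                   // QuorumController writes a PRE_MIGRATION ZkMigrationStateRecord
        try {
            for (List<ApiMessageAndVersion> batch : batchesReadFromZk) {
                consumer.acceptBatch(batch).get();   // each batch is appended as one atomic controller write
            }
            return consumer.completeMigration();     // offset/epoch of the last migrated record
        } catch (Exception e) {
            consumer.abortMigration();
            throw e;
        }
    }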

View File

@ -22,8 +22,8 @@
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The broker id." },
{ "name": "IsMigratingZkBroker", "type": "bool", "versions": "2+", "default": false,
"about": "True if the registering broker is a ZK broker." },
{ "name": "MigratingZkBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
"about": "The ZK broker epoch if this record is for a ZK broker. Otherwise, -1" },
{ "name": "IncarnationId", "type": "uuid", "versions": "0+",
"about": "The incarnation ID of the broker process" },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+",

View File

@ -28,6 +28,7 @@ import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
@ -412,7 +413,7 @@ public class MetadataLoaderTest {
assertEquals(400L, loader.lastAppliedOffset());
}
assertTrue(publishers.get(0).closed);
assertEquals(new LogDeltaManifest(new MetadataProvenance(400, 100, 4000), 1,
assertEquals(new LogDeltaManifest(new MetadataProvenance(400, 100, 4000), LeaderAndEpoch.UNKNOWN, 1,
3000000L, 10),
publishers.get(0).latestLogDeltaManifest);
assertEquals(MetadataVersion.IBP_3_3_IV1,

View File

@ -23,6 +23,7 @@ import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
@ -99,14 +100,14 @@ public class SnapshotGeneratorTest {
build()) {
// Publish a log delta batch. This one will not trigger a snapshot yet.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 100));
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 100));
// Publish a log delta batch. This will trigger a snapshot.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 100));
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 100));
// Publish a log delta batch. This one will be ignored because there are other images
// queued for writing.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 2000));
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 2000));
assertEquals(Collections.emptyList(), emitter.images());
emitter.setReady();
}
@ -128,7 +129,7 @@ public class SnapshotGeneratorTest {
disabledReason.compareAndSet(null, "we are testing disable()");
// No snapshots are generated because snapshots are disabled.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 100));
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 100));
}
assertEquals(Collections.emptyList(), emitter.images());
faultHandler.maybeRethrowFirstException();
@ -147,17 +148,17 @@ public class SnapshotGeneratorTest {
build()) {
// This image isn't published yet.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 50));
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 50));
assertEquals(Collections.emptyList(), emitter.images());
mockTime.sleep(TimeUnit.MINUTES.toNanos(40));
// Next image is published because of the time delay.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 50));
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 50));
TestUtils.waitForCondition(() -> emitter.images().size() == 1, "images.size == 1");
// bytesSinceLastSnapshot was reset to 0 by the previous snapshot,
// so this does not trigger a new snapshot.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 150));
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 150));
}
assertEquals(Arrays.asList(TEST_IMAGE), emitter.images());
faultHandler.maybeRethrowFirstException();
@ -173,7 +174,7 @@ public class SnapshotGeneratorTest {
build()) {
for (int i = 0; i < 2; i++) {
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 10000, 50000));
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 10000, 50000));
}
}
assertEquals(Collections.emptyList(), emitter.images());

View File

@ -54,13 +54,19 @@ public class BrokerRegistrationTest {
Stream.of(new SimpleEntry<>("foo", VersionRange.of((short) 2, (short) 3)),
new SimpleEntry<>("bar", VersionRange.of((short) 1, (short) 4))).collect(
Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)),
Optional.of("myrack"), false, true));
Optional.of("myrack"), false, true),
new BrokerRegistration(3, 0, Uuid.fromString("1t8VyWx2TCSTpUWuqj-FOw"),
Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093)),
Stream.of(new SimpleEntry<>("metadata.version", VersionRange.of((short) 7, (short) 7)))
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)),
Optional.empty(), false, true, Optional.of(10L)));
@Test
public void testValues() {
assertEquals(0, REGISTRATIONS.get(0).id());
assertEquals(1, REGISTRATIONS.get(1).id());
assertEquals(2, REGISTRATIONS.get(2).id());
assertEquals(3, REGISTRATIONS.get(3).id());
}
@Test
@ -69,9 +75,13 @@ public class BrokerRegistrationTest {
assertNotEquals(REGISTRATIONS.get(1), REGISTRATIONS.get(0));
assertNotEquals(REGISTRATIONS.get(0), REGISTRATIONS.get(2));
assertNotEquals(REGISTRATIONS.get(2), REGISTRATIONS.get(0));
assertNotEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(0));
assertNotEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(1));
assertNotEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(2));
assertEquals(REGISTRATIONS.get(0), REGISTRATIONS.get(0));
assertEquals(REGISTRATIONS.get(1), REGISTRATIONS.get(1));
assertEquals(REGISTRATIONS.get(2), REGISTRATIONS.get(2));
assertEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(3));
}
@Test
@ -80,14 +90,20 @@ public class BrokerRegistrationTest {
"incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
"host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
"rack=Optional.empty, fenced=true, inControlledShutdown=false, isMigratingZkBroker=false)",
"rack=Optional.empty, fenced=true, inControlledShutdown=false, migratingZkBrokerEpoch=-1)",
REGISTRATIONS.get(1).toString());
assertEquals("BrokerRegistration(id=2, epoch=0, " +
"incarnationId=eY7oaG1RREie5Kk9uy1l6g, listeners=[Endpoint(" +
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
"host='localhost', port=9092)], supportedFeatures={bar: 1-4, foo: 2-3}, " +
"rack=Optional[myrack], fenced=false, inControlledShutdown=true, isMigratingZkBroker=false)",
"rack=Optional[myrack], fenced=false, inControlledShutdown=true, migratingZkBrokerEpoch=-1)",
REGISTRATIONS.get(2).toString());
assertEquals("BrokerRegistration(id=3, epoch=0, " +
"incarnationId=1t8VyWx2TCSTpUWuqj-FOw, listeners=[Endpoint(" +
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
"host='localhost', port=9093)], supportedFeatures={metadata.version: 7}, " +
"rack=Optional.empty, fenced=false, inControlledShutdown=true, migratingZkBrokerEpoch=10)",
REGISTRATIONS.get(3).toString());
}
@Test
@ -95,6 +111,7 @@ public class BrokerRegistrationTest {
testRoundTrip(REGISTRATIONS.get(0));
testRoundTrip(REGISTRATIONS.get(1));
testRoundTrip(REGISTRATIONS.get(2));
testRoundTrip(REGISTRATIONS.get(3));
}
private void testRoundTrip(BrokerRegistration registration) {
@ -117,5 +134,7 @@ public class BrokerRegistrationTest {
REGISTRATIONS.get(1).node("INTERNAL"));
assertEquals(Optional.of(new Node(2, "localhost", 9092, "myrack")),
REGISTRATIONS.get(2).node("INTERNAL"));
assertEquals(Optional.of(new Node(3, "localhost", 9093, null)),
REGISTRATIONS.get(3).node("INTERNAL"));
}
}

View File

@ -203,6 +203,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
remote_kafka=None,
controller_num_nodes_override=0,
allow_zk_with_kraft=False,
quorum_info_provider=None
):
"""
:param context: test context
@ -262,15 +263,19 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
:param KafkaService remote_kafka: process.roles=controller for this cluster when not None; ignored when using ZooKeeper
:param int controller_num_nodes_override: the number of nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and remote_kafka is not None; ignored otherwise
:param bool allow_zk_with_kraft: if True, then allow a KRaft broker or controller to also use ZooKeeper
:param quorum_info_provider: A function that takes this KafkaService as an argument and returns a ServiceQuorumInfo. If this is None, then the ServiceQuorumInfo is generated from the test context
"""
self.zk = zk
self.remote_kafka = remote_kafka
self.allow_zk_with_kraft = allow_zk_with_kraft
self.quorum_info = quorum.ServiceQuorumInfo(self, context)
if quorum_info_provider is None:
self.quorum_info = quorum.ServiceQuorumInfo.from_test_context(self, context)
else:
self.quorum_info = quorum_info_provider(self)
self.controller_quorum = None # will define below if necessary
self.remote_controller_quorum = None # will define below if necessary
self.configured_for_zk_migration = False
if num_nodes < 1:
raise Exception("Must set a positive number of nodes: %i" % num_nodes)
@ -427,6 +432,38 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.colocated_nodes_started = 0
self.nodes_to_start = self.nodes
def reconfigure_zk_for_migration(self, kraft_quorum):
self.configured_for_zk_migration = True
self.controller_quorum = kraft_quorum
# Set the migration properties
self.server_prop_overrides.extend([
["zookeeper.metadata.migration.enable", "true"],
["controller.quorum.voters", kraft_quorum.controller_quorum_voters],
["controller.listener.names", kraft_quorum.controller_listener_names]
])
# Add a port mapping for the controller listener.
# This is not added to "advertised.listeners" because of configured_for_zk_migration=True
self.port_mappings[kraft_quorum.controller_listener_names] = kraft_quorum.port_mappings.get(kraft_quorum.controller_listener_names)
def reconfigure_zk_as_kraft(self, kraft_quorum):
self.configured_for_zk_migration = True
# Remove the configs we set in reconfigure_zk_for_migration
props = []
for prop in self.server_prop_overrides:
if not prop[0].startswith("controller"):
props.append(prop)
self.server_prop_overrides.clear()
self.server_prop_overrides.extend(props)
del self.port_mappings[kraft_quorum.controller_listener_names]
# Set the quorum info to remote KRaft
self.quorum_info = quorum.ServiceQuorumInfo(quorum.remote_kraft, self)
self.remote_controller_quorum = kraft_quorum
self.controller_quorum = kraft_quorum
def num_kraft_controllers(self, num_nodes_broker_role, controller_num_nodes_override):
if controller_num_nodes_override < 0:
raise Exception("controller_num_nodes_override must not be negative: %i" % controller_num_nodes_override)
@ -567,7 +604,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def alive(self, node):
return len(self.pids(node)) > 0
def start(self, add_principals="", nodes_to_skip=[], timeout_sec=60):
def start(self, add_principals="", nodes_to_skip=[], timeout_sec=60, **kwargs):
"""
Start the Kafka broker and wait until it registers its ID in ZooKeeper
Startup will be skipped for any nodes in nodes_to_skip. These nodes can be started later via add_broker
@ -606,7 +643,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
if self.remote_controller_quorum:
self.remote_controller_quorum.start()
Service.start(self)
Service.start(self, **kwargs)
if self.concurrent_start:
# We didn't wait while starting each individual node, so wait for them all now
for node in self.nodes_to_start:
@ -764,7 +803,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
return cmd
def controller_listener_name_list(self, node):
if self.quorum_info.using_zk:
if self.quorum_info.using_zk and self.configured_for_zk_migration:
return [self.controller_listener_name(self.controller_quorum.controller_security_protocol)]
elif self.quorum_info.using_zk:
return []
broker_to_controller_listener_name = self.controller_listener_name(self.controller_quorum.controller_security_protocol)
# Brokers always use the first controller listener, so include a second, inter-controller listener if and only if:
@ -775,7 +816,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.controller_quorum.intercontroller_security_protocol != self.controller_quorum.controller_security_protocol) \
else [broker_to_controller_listener_name]
def start_node(self, node, timeout_sec=60):
def start_node(self, node, timeout_sec=60, **kwargs):
if node not in self.nodes_to_start:
return
node.account.mkdirs(KafkaService.PERSISTENT_ROOT)

View File

@ -78,9 +78,32 @@ class ServiceQuorumInfo:
True iff quorum_type==COLOCATED_KRAFT
"""
def __init__(self, kafka, context):
def __init__(self, quorum_type, kafka):
"""
:param quorum_type : str
The type of quorum being used. Either "ZK", "COLOCATED_KRAFT", or "REMOTE_KRAFT"
:param kafka : KafkaService
The service for which this instance exposes quorum-related
information
"""
if quorum_type != zk and kafka.zk and not kafka.allow_zk_with_kraft:
raise Exception("Cannot use ZooKeeper while specifying a KRaft metadata quorum unless explicitly allowing it")
if kafka.remote_kafka and quorum_type != remote_kraft:
raise Exception("Cannot specify a remote Kafka service unless using a remote KRaft metadata quorum (should not happen)")
self.kafka = kafka
self.quorum_type = quorum_type
self.using_zk = quorum_type == zk
self.using_kraft = not self.using_zk
self.has_brokers = self.using_kraft and not kafka.remote_kafka
self.has_controllers = quorum_type == colocated_kraft or kafka.remote_kafka
self.has_brokers_and_controllers = quorum_type == colocated_kraft
@staticmethod
def from_test_context(kafka, context):
"""
:param kafka : KafkaService
The service for which this instance exposes quorum-related
information
@ -90,17 +113,8 @@ class ServiceQuorumInfo:
"""
quorum_type = for_test(context)
if quorum_type != zk and kafka.zk and not kafka.allow_zk_with_kraft:
raise Exception("Cannot use ZooKeeper while specifying a KRaft metadata quorum unless explicitly allowing it")
if kafka.remote_kafka and quorum_type != remote_kraft:
raise Exception("Cannot specify a remote Kafka service unless using a remote KRaft metadata quorum (should not happen)")
self.kafka = kafka
self.quorum_type = quorum_type
self.using_zk = quorum_type == zk
self.using_kraft = not self.using_zk
self.has_brokers = self.using_kraft and not kafka.remote_kafka
self.has_controllers = quorum_type == colocated_kraft or kafka.remote_kafka
self.has_brokers_and_controllers = quorum_type == colocated_kraft
return ServiceQuorumInfo(quorum_type, kafka)
class NodeQuorumInfo:
"""

View File

@ -0,0 +1,117 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import partial
import time
from ducktape.utils.util import wait_until
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka.config_property import CLUSTER_ID
from kafkatest.services.kafka.quorum import remote_kraft, ServiceQuorumInfo, zk
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import DEV_BRANCH
class TestMigration(ProduceConsumeValidateTest):
def __init__(self, test_context):
super(TestMigration, self).__init__(test_context=test_context)
def setUp(self):
self.topic = "test_topic"
self.partitions = 3
self.replication_factor = 3
# Producer and consumer
self.producer_throughput = 1000
self.num_producers = 1
self.num_consumers = 1
def wait_until_rejoin(self):
for partition in range(0, self.partitions):
wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.replication_factor, timeout_sec=60,
backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")
def do_migration(self):
# Start up KRaft controller in migration mode
remote_quorum = partial(ServiceQuorumInfo, remote_kraft)
controller = KafkaService(self.test_context, num_nodes=1, zk=self.zk, version=DEV_BRANCH,
allow_zk_with_kraft=True,
remote_kafka=self.kafka,
server_prop_overrides=[["zookeeper.connect", self.zk.connect_setting()],
["zookeeper.metadata.migration.enable", "true"]],
quorum_info_provider=remote_quorum)
controller.start()
self.logger.info("Restarting ZK brokers in migration mode")
self.kafka.reconfigure_zk_for_migration(controller)
for node in self.kafka.nodes:
self.kafka.stop_node(node)
self.kafka.start_node(node)
self.wait_until_rejoin()
self.logger.info("Restarting ZK brokers as KRaft brokers")
time.sleep(10)
self.kafka.reconfigure_zk_as_kraft(controller)
for node in self.kafka.nodes:
self.kafka.stop_node(node)
self.kafka.start_node(node)
self.wait_until_rejoin()
def test_online_migration(self):
zk_quorum = partial(ServiceQuorumInfo, zk)
self.zk = ZookeeperService(self.test_context, num_nodes=1, version=DEV_BRANCH)
self.kafka = KafkaService(self.test_context,
num_nodes=3,
zk=self.zk,
version=DEV_BRANCH,
quorum_info_provider=zk_quorum,
allow_zk_with_kraft=True,
server_prop_overrides=[["zookeeper.metadata.migration.enable", "false"]])
self.kafka.security_protocol = "PLAINTEXT"
self.kafka.interbroker_security_protocol = "PLAINTEXT"
self.zk.start()
self.logger.info("Pre-generating clusterId for ZK.")
cluster_id_json = """{"version": "1", "id": "%s"}""" % CLUSTER_ID
self.zk.create(path="/cluster")
self.zk.create(path="/cluster/id", value=cluster_id_json)
self.kafka.start()
topic_cfg = {
"topic": self.topic,
"partitions": self.partitions,
"replication-factor": self.replication_factor,
"configs": {"min.insync.replicas": 2}
}
self.kafka.create_topic(topic_cfg)
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
self.topic, throughput=self.producer_throughput,
message_validator=is_int,
compression_types=["none"],
version=DEV_BRANCH)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
self.topic, consumer_timeout_ms=30000,
message_validator=is_int, version=DEV_BRANCH)
self.run_produce_consume_validate(core_test_action=self.do_migration)