KAFKA-18136: Remove zk migration from code base (#18016)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
TengYao Chi 2024-12-13 01:34:29 +08:00 committed by GitHub
parent 4c5ea05ec8
commit 772aa241b2
40 changed files with 71 additions and 1465 deletions

View File

@@ -61,20 +61,13 @@ object Kafka extends Logging {
props
}
// For ZK mode, API forwarding is currently enabled only under the migration flag. We can
// do a static IBP check directly to see whether API forwarding is enabled here, because the
// IBP check is static in ZK mode.
private def enableApiForwarding(config: KafkaConfig) =
config.migrationEnabled && config.interBrokerProtocolVersion.isApiForwardingEnabled
private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, doLog = false)
if (config.requiresZookeeper) {
new KafkaServer(
config,
Time.SYSTEM,
threadNamePrefix = None,
enableForwarding = enableApiForwarding(config)
threadNamePrefix = None
)
} else {
new KafkaRaftServer(
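The hunk above cuts off just as the KRaft branch begins. For orientation, the resulting dispatch in buildServer is sketched below; the KafkaRaftServer constructor arguments are assumed, since the diff truncates before them:
private def buildServer(props: Properties): Server = {
  val config = KafkaConfig.fromProps(props, doLog = false)
  if (config.requiresZookeeper) {
    // ZK mode: with forwarding removed, only the config, clock and thread-name prefix remain.
    new KafkaServer(config, Time.SYSTEM, threadNamePrefix = None)
  } else {
    // KRaft mode (assumed constructor shape).
    new KafkaRaftServer(config, Time.SYSTEM)
  }
}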

View File

@@ -21,7 +21,7 @@ import com.yammer.metrics.core.{Meter, Timer}
import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.cluster.Broker
import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback, ZkMigrationStateMetricName}
import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback}
import kafka.coordinator.transaction.ZkProducerIdManager
import kafka.server._
import kafka.server.metadata.ZkFinalizedFeatureCache
@@ -42,7 +42,6 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.metadata.migration.ZkMigrationState
import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.common.{AdminOperationException, ProducerIdsBlock}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
@@ -81,11 +80,9 @@ object KafkaController extends Logging {
private val ReplicasIneligibleToDeleteCountMetricName = "ReplicasIneligibleToDeleteCount"
private val ActiveBrokerCountMetricName = "ActiveBrokerCount"
private val FencedBrokerCountMetricName = "FencedBrokerCount"
private val ZkMigrationStateMetricName = "ZkMigrationState"
// package private for testing
private[controller] val MetricNames = Set(
ZkMigrationStateMetricName,
ActiveControllerCountMetricName,
OfflinePartitionsCountMetricName,
PreferredReplicaImbalanceCountMetricName,
@@ -174,7 +171,6 @@ class KafkaController(val config: KafkaConfig,
/* single-thread scheduler to clean expired tokens */
private val tokenCleanScheduler = new KafkaScheduler(1, true, "delegation-token-cleaner")
metricsGroup.newGauge(ZkMigrationStateMetricName, () => ZkMigrationState.ZK.value().intValue())
metricsGroup.newGauge(ActiveControllerCountMetricName, () => if (isActive) 1 else 0)
metricsGroup.newGauge(OfflinePartitionsCountMetricName, () => offlinePartitionCount)
metricsGroup.newGauge(PreferredReplicaImbalanceCountMetricName, () => preferredReplicaImbalanceCount)
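For readers unfamiliar with the gauge API used above: KafkaMetricsGroup wraps the Yammer registry, and a gauge is a named closure evaluated on every read, so deleting the ZkMigrationState gauge is simply a matter of no longer registering it. A minimal, self-contained sketch of the same pattern (the class and metric names here are illustrative):
import org.apache.kafka.server.metrics.KafkaMetricsGroup

class GaugeExample {
  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
  @volatile private var active = false

  // Registered under <package>:type=GaugeExample,name=ActiveControllerCount.
  metricsGroup.newGauge("ActiveControllerCount", () => if (active) 1 else 0)

  def activate(): Unit = active = true

  // Gauges must be removed on shutdown or they linger in the shared registry.
  def close(): Unit = metricsGroup.removeMetric("ActiveControllerCount")
}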

View File

@@ -98,8 +98,6 @@ object KafkaRaftManager {
// These constraints are enforced in KafkaServer, but repeating them here to guard against future callers
if (config.processRoles.nonEmpty) {
throw new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode.")
} else if (!config.migrationEnabled) {
throw new RuntimeException("Not deleting metadata log dir since migrations are not enabled.")
} else {
val metadataDir = new File(config.metadataLogDir)
val logDirName = UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION)

View File

@@ -3160,7 +3160,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new KafkaPrincipal(entry.principalType, entry.principalName))
// DelegationToken changes only need to be executed on the controller during migration
if (config.migrationEnabled && (!zkSupport.controller.isActive)) {
if (!zkSupport.controller.isActive) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
Errors.NOT_CONTROLLER, owner, requester))
@@ -3204,7 +3204,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setExpiryTimestampMs(expiryTimestamp)))
}
// DelegationToken changes only need to be executed on the controller during migration
if (config.migrationEnabled && (!zkSupport.controller.isActive)) {
if (!zkSupport.controller.isActive) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new RenewDelegationTokenResponse(
new RenewDelegationTokenResponseData()
@@ -3250,7 +3250,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setExpiryTimestampMs(expiryTimestamp)))
}
// DelegationToken changes only need to be executed on the controller during migration
if (config.migrationEnabled && (!zkSupport.controller.isActive)) {
if (!zkSupport.controller.isActive) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new ExpireDelegationTokenResponse(
new ExpireDelegationTokenResponseData()
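All three hunks in this file simplify the same guard: with migration gone, the only remaining question is whether this ZK broker is the active controller, since delegation-token mutations must run there. The shared shape, as a hypothetical standalone helper (onTokenMutation and its parameters are illustrative, not KafkaApis members):
def onTokenMutation(isActiveController: Boolean)(execute: () => Unit)(reject: Errors => Unit): Unit =
  if (!isActiveController)
    reject(Errors.NOT_CONTROLLER) // the client is expected to retry against the controller
  else
    execute()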

View File

@@ -337,9 +337,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def requiresZookeeper: Boolean = processRoles.isEmpty
def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty
val migrationEnabled: Boolean = getBoolean(KRaftConfigs.MIGRATION_ENABLED_CONFIG)
val migrationMetadataMinBatchSize: Int = getInt(KRaftConfigs.MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG)
private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
case "broker" => ProcessRole.BrokerRole
@@ -804,9 +801,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
throw new ConfigException(s"Missing required configuration `${ZkConfigs.ZK_CONNECT_CONFIG}` which has no default value.")
}
if (brokerIdGenerationEnable) {
if (migrationEnabled) {
require(brokerId >= 0, "broker.id generation is incompatible with ZooKeeper migration. Please stop using it before enabling migration (set broker.id to a value greater or equal to 0).")
}
require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id")
} else {
require(brokerId >= 0, "broker.id must be greater than or equal to 0")
@@ -817,11 +811,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
throw new ConfigException(s"Missing configuration `${KRaftConfigs.NODE_ID_CONFIG}` which is required " +
s"when `process.roles` is defined (i.e. when running in KRaft mode).")
}
if (migrationEnabled) {
if (zkConnect == null) {
throw new ConfigException(s"If using `${KRaftConfigs.MIGRATION_ENABLED_CONFIG}` in KRaft mode, `${ZkConfigs.ZK_CONNECT_CONFIG}` must also be set.")
}
}
}
require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
@@ -846,15 +835,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
)
}
}
def validateQuorumVotersAndQuorumBootstrapServerForMigration(): Unit = {
if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) {
throw new ConfigException(
s"""If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
|contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
|set of controllers.""".stripMargin.replace("\n", " ")
)
}
}
def validateControlPlaneListenerEmptyForKRaft(): Unit = {
require(controlPlaneListenerName.isEmpty,
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not supported in KRaft mode.")
@@ -922,25 +903,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
validateAdvertisedControllerListenersNonEmptyForKRaftController()
validateControllerListenerNamesMustAppearInListenersForKRaftController()
} else {
// ZK-based
if (migrationEnabled) {
require(brokerId >= 0,
"broker.id generation is incompatible with ZooKeeper migration. Please stop using it before enabling migration (set broker.id to a value greater or equal to 0).")
validateQuorumVotersAndQuorumBootstrapServerForMigration()
require(controllerListenerNames.nonEmpty,
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
require(interBrokerProtocolVersion.isMigrationSupported, s"Cannot enable ZooKeeper migration without setting " +
s"'${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG}' to 3.4 or higher")
if (logDirs.size > 1) {
require(interBrokerProtocolVersion.isDirectoryAssignmentSupported,
s"Cannot enable ZooKeeper migration with multiple log directories (aka JBOD) without setting " +
s"'${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG}' to ${MetadataVersion.IBP_3_7_IV2} or higher")
}
} else {
// controller listener names must be empty when not in KRaft mode
require(controllerListenerNames.isEmpty,
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
}
// controller listener names must be empty when not in KRaft mode
require(controllerListenerNames.isEmpty,
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
}
val listenerNames = listeners.map(_.listenerName).toSet
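The net effect of the deletions in this file: the migration-only validation (non-negative broker.id, quorum voters or bootstrap servers present, IBP at least 3.4, and the JBOD restriction below 3.7-IV2) disappears along with the flag, and the ZK-mode branch collapses to a single rule. Condensed from the hunk above:
if (processRoles.isEmpty) {
  // ZK mode: controller.listener.names is now a KRaft-only setting.
  require(controllerListenerNames.isEmpty,
    s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
}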

View File

@@ -26,15 +26,13 @@ import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.metrics.KafkaMetricsReporter
import kafka.network.{ControlPlaneAcceptor, DataPlaneAcceptor, RequestChannel, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.server.metadata.{OffsetTrackingListener, ZkConfigRepository, ZkMetadataCache}
import kafka.server.metadata.{ZkConfigRepository, ZkMetadataCache}
import kafka.utils._
import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, MetadataRecoveryStrategy, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
import org.apache.kafka.common.message.ControlledShutdownRequestData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network._
@@ -48,19 +46,15 @@ import org.apache.kafka.common.{Endpoint, Node, TopicPartition}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.REQUIRE_V0
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble}
import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde, VersionRange}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.raft.Endpoints
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, NodeToControllerChannelManager}
import org.apache.kafka.server.common.NodeToControllerChannelManager
import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
import org.apache.kafka.server.fault.LoggingFaultHandler
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.KafkaScheduler
@@ -75,7 +69,7 @@ import java.time.Duration
import java.util
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.{Optional, OptionalInt, OptionalLong}
import java.util.{Optional, OptionalInt}
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
@@ -114,8 +108,7 @@ object KafkaServer {
class KafkaServer(
val config: KafkaConfig,
time: Time = Time.SYSTEM,
threadNamePrefix: Option[String] = None,
enableForwarding: Boolean = false
threadNamePrefix: Option[String] = None
) extends KafkaBroker with Server {
private val startupComplete = new AtomicBoolean(false)
@@ -205,7 +198,6 @@ class KafkaServer(
@volatile def kafkaController: KafkaController = _kafkaController
var lifecycleManager: BrokerLifecycleManager = _
private var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
@volatile var brokerEpochManager: ZkBrokerEpochManager = _
@@ -241,9 +233,6 @@ class KafkaServer(
val initialMetaPropsEnsemble = {
val loader = new MetaPropertiesEnsemble.Loader()
loader.addLogDirs(config.logDirs.asJava)
if (config.migrationEnabled) {
loader.addMetadataLogDir(config.metadataLogDir)
}
loader.load()
}
@@ -252,11 +241,7 @@ class KafkaServer(
} else {
OptionalInt.of(config.brokerId)
}
val verificationFlags = if (config.migrationEnabled) {
util.EnumSet.noneOf(classOf[VerificationFlag])
} else {
util.EnumSet.of(REQUIRE_V0)
}
val verificationFlags = util.EnumSet.of(REQUIRE_V0)
initialMetaPropsEnsemble.verify(Optional.of(_clusterId), verificationId, verificationFlags)
/* generate brokerId */
@@ -294,11 +279,6 @@ class KafkaServer(
val builder = new MetaProperties.Builder(e.getValue).
setClusterId(_clusterId).
setNodeId(config.brokerId)
if (!builder.directoryId().isPresent) {
if (config.migrationEnabled) {
builder.setDirectoryId(copier.generateValidDirectoryId())
}
}
copier.setLogDirProps(logDir, builder.build())
})
copier.emptyLogDirs().clear()
@@ -332,8 +312,7 @@ class KafkaServer(
metadataCache = MetadataCache.zkMetadataCache(
config.brokerId,
config.interBrokerProtocolVersion,
brokerFeatures,
config.migrationEnabled)
brokerFeatures)
val controllerNodeProvider = new MetadataCacheControllerNodeProvider(metadataCache, config,
() => Option(quorumControllerNodeProvider).map(_.getControllerInfo()))
@@ -360,11 +339,7 @@ class KafkaServer(
clientToControllerChannelManager.start()
/* start forwarding manager */
var autoTopicCreationChannel = Option.empty[NodeToControllerChannelManager]
if (enableForwarding) {
this.forwardingManager = Some(ForwardingManager(clientToControllerChannelManager, metrics))
autoTopicCreationChannel = Some(clientToControllerChannelManager)
}
val autoTopicCreationChannel = Option.empty[NodeToControllerChannelManager]
val apiVersionManager = ApiVersionManager(
ListenerType.ZK_BROKER,
@@ -415,81 +390,6 @@ class KafkaServer(
_kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
kafkaController.startup()
if (config.migrationEnabled) {
logger.info("Starting up additional components for ZooKeeper migration")
lifecycleManager = new BrokerLifecycleManager(config,
time,
s"zk-broker-${config.nodeId}-",
isZkBroker = true,
logManager.directoryIdsSet)
// For ZK brokers in migration mode, always delete the metadata partition on startup.
logger.info(s"Deleting local metadata log from ${config.metadataLogDir} since this is a ZK broker in migration mode.")
KafkaRaftManager.maybeDeleteMetadataLogDir(config)
logger.info("Successfully deleted local metadata log. It will be re-created.")
// If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
val quorumVoters = QuorumConfig.parseVoterConnections(config.quorumConfig.voters)
raftManager = new KafkaRaftManager[ApiMessageAndVersion](
metaPropsEnsemble.clusterId().get(),
config,
// metadata log dir and directory.id must exist because migration is enabled
metaPropsEnsemble.logDirProps.get(metaPropsEnsemble.metadataLogDir.get).directoryId.get,
new MetadataRecordSerde,
KafkaRaftServer.MetadataPartition,
KafkaRaftServer.MetadataTopicId,
time,
metrics,
threadNamePrefix,
CompletableFuture.completedFuture(quorumVoters),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
// Endpoint information is only needed for KRaft controllers (voters). ZK brokers
// (observers) can never be KRaft controllers
Endpoints.empty(),
fatalFaultHandler = new LoggingFaultHandler("raftManager", () => shutdown())
)
quorumControllerNodeProvider = RaftControllerNodeProvider(raftManager, config)
val brokerToQuorumChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider = quorumControllerNodeProvider,
time = time,
metrics = metrics,
config = config,
channelName = "quorum",
s"zk-broker-${config.nodeId}-",
retryTimeoutMs = config.requestTimeoutMs.longValue
)
val listener = new OffsetTrackingListener()
raftManager.register(listener)
raftManager.startup()
val networkListeners = new ListenerCollection()
config.effectiveAdvertisedBrokerListeners.foreach { ep =>
networkListeners.add(new Listener().
setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
setName(ep.listenerName.value()).
setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) else ep.port).
setSecurityProtocol(ep.securityProtocol.id))
}
val features = BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(config.unstableFeatureVersionsEnabled)).asScala
// Even though ZK brokers don't use "metadata.version" feature, we need to overwrite it with our IBP as part of registration
// so the KRaft controller can verify that all brokers are on the same IBP before starting the migration.
val featuresRemapped = features + (MetadataVersion.FEATURE_NAME ->
VersionRange.of(config.interBrokerProtocolVersion.featureLevel(), config.interBrokerProtocolVersion.featureLevel()))
lifecycleManager.start(
() => listener.highestOffset,
brokerToQuorumChannelManager,
clusterId,
networkListeners,
featuresRemapped.asJava,
OptionalLong.empty()
)
logger.debug("Start RaftManager")
}
// Used by ZK brokers during a KRaft migration. When talking to a KRaft controller, we need to use the epoch
// from BrokerLifecycleManager rather than ZK (via KafkaController)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, kafkaController, Option(lifecycleManager))
@@ -630,18 +530,6 @@ class KafkaServer(
dynamicConfigManager = new ZkConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()
if (config.migrationEnabled && lifecycleManager != null) {
lifecycleManager.initialCatchUpFuture.whenComplete { case (_, t) =>
if (t != null) {
fatal("Encountered an exception when waiting to catch up with KRaft metadata log", t)
shutdown()
} else {
info("Finished catching up on KRaft metadata log, requesting that the KRaft controller unfence this broker")
lifecycleManager.setReadyToUnfence()
}
}
}
val enableRequestProcessingFuture = socketServer.enableRequestProcessing(authorizerFutures)
// Block here until all the authorizer futures are complete
try {
@@ -946,21 +834,6 @@ class KafkaServer(
_brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN
if (config.migrationEnabled && lifecycleManager != null && metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])) {
// For now we'll send the heartbeat with WantShutDown set so the KRaft controller can see a broker
// shutting down without waiting for the heartbeat to time out.
info("Notifying KRaft of controlled shutdown")
lifecycleManager.beginControlledShutdown()
try {
lifecycleManager.controlledShutdownFuture.get(5L, TimeUnit.MINUTES)
} catch {
case _: TimeoutException =>
error("Timed out waiting for the controller to approve controlled shutdown")
case e: Throwable =>
error("Got unexpected exception waiting for controlled shutdown future", e)
}
}
val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
if (!shutdownSucceeded)
@@ -1070,9 +943,6 @@ class KafkaServer(
// Clear all reconfigurable instances stored in DynamicBrokerConfig
config.dynamicConfig.clear()
if (raftManager != null)
CoreUtils.swallow(raftManager.shutdown(), this)
if (lifecycleManager != null) {
lifecycleManager.close()
}
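Most of what this file loses is the migration-mode startup path: a BrokerLifecycleManager plus a KafkaRaftManager that let a ZK broker follow the KRaft metadata log and register with the quorum. One detail of that removed registration worth keeping in mind when reading old logs: ZK brokers do not use the metadata.version feature, so the broker's IBP was pinned into it as a single-version range (lifted from the removed block above):
val features = BrokerFeatures.createDefaultFeatureMap(
  BrokerFeatures.createDefault(config.unstableFeatureVersionsEnabled)).asScala
// Pin "metadata.version" to exactly this broker's IBP so the KRaft controller can
// verify that all brokers are on the same IBP before starting the migration.
val ibp = config.interBrokerProtocolVersion.featureLevel()
val featuresRemapped = features + (MetadataVersion.FEATURE_NAME -> VersionRange.of(ibp, ibp))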

View File

@@ -117,10 +117,9 @@ trait MetadataCache {
object MetadataCache {
def zkMetadataCache(brokerId: Int,
metadataVersion: MetadataVersion,
brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty(),
zkMigrationEnabled: Boolean = false)
brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty())
: ZkMetadataCache = {
new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, zkMigrationEnabled)
new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures)
}
def kRaftMetadataCache(

View File

@@ -2093,24 +2093,6 @@ class ReplicaManager(val config: KafkaConfig,
s"Latest known controller epoch is $controllerEpoch")
leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
} else {
// In migration mode, reconcile missed topic deletions when handling full LISR from KRaft controller.
// LISR "type" field was previously unspecified (0), so if we see it set to Full (2), then we know the
// request came from a KRaft controller.
//
// Note that we have to do this first, before anything else, since topics may be recreated with the same
// name, but a different ID. And in that case, we need to move aside the old version of those topics
// (with the obsolete topic ID) before doing anything else.
if (config.migrationEnabled &&
leaderAndIsrRequest.isKRaftController &&
leaderAndIsrRequest.requestType() == AbstractControlRequest.Type.FULL)
{
val strays = LogManager.findStrayReplicas(localBrokerId, leaderAndIsrRequest, logManager.allLogs)
stateChangeLogger.info(s"While handling full LeaderAndIsr request from KRaft " +
s"controller $controllerId with correlation id $correlationId, found ${strays.size} " +
"stray partition(s).")
updateStrayLogs(strays)
}
val responseMap = new mutable.HashMap[TopicPartition, Errors]
controllerEpoch = leaderAndIsrRequest.controllerEpoch
@@ -2671,16 +2653,12 @@ class ReplicaManager(val config: KafkaConfig,
s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
}
logManager.handleLogDirFailure(dir)
if (dir == new File(config.metadataLogDir).getAbsolutePath && (config.processRoles.nonEmpty || config.migrationEnabled)) {
if (dir == new File(config.metadataLogDir).getAbsolutePath && config.processRoles.nonEmpty) {
fatal(s"Shutdown broker because the metadata log dir $dir has failed")
Exit.halt(1)
}
if (notifyController) {
if (config.migrationEnabled) {
fatal(s"Shutdown broker because some log directory has failed during migration mode: $dir")
Exit.halt(1)
}
if (zkClient.isEmpty) {
if (uuid.isDefined) {
directoryEventHandler.handleFailure(uuid.get)
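The first hunk removes the stray-replica reconciliation that ran when a migrating ZK broker received a full LeaderAndIsr request from a KRaft controller. In isolation, the removed step was:
if (config.migrationEnabled &&
    leaderAndIsrRequest.isKRaftController &&
    leaderAndIsrRequest.requestType() == AbstractControlRequest.Type.FULL) {
  // Any local replica not named in a FULL request is a stray, possibly an old
  // incarnation of a re-created topic carrying an obsolete topic ID; it must be
  // moved aside before the rest of the request is applied.
  val strays = LogManager.findStrayReplicas(localBrokerId, leaderAndIsrRequest, logManager.allLogs)
  updateStrayLogs(strays)
}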

View File

@@ -157,8 +157,7 @@ object ZkMetadataCache {
class ZkMetadataCache(
brokerId: Int,
metadataVersion: MetadataVersion,
brokerFeatures: BrokerFeatures,
zkMigrationEnabled: Boolean = false)
brokerFeatures: BrokerFeatures)
extends MetadataCache with ZkFinalizedFeatureCache with Logging {
private val partitionMetadataLock = new ReentrantReadWriteLock()
@@ -476,9 +475,6 @@ class ZkMetadataCache(
stateChangeLogger.error(s"Received UpdateMetadataRequest with Type=FULL (2), but version of " +
updateMetadataRequest.version() + ", which should not be possible. Not treating this as a full " +
"metadata update")
} else if (!zkMigrationEnabled) {
stateChangeLogger.error(s"Received UpdateMetadataRequest with Type=FULL (2), but ZK migrations " +
s"are not enabled on this broker. Not treating this as a full metadata update")
} else {
// When handling a UMR from a KRaft controller, we may have to insert some partition
// deletions at the beginning, to handle the different way topic deletion works in KRaft

View File

@@ -30,13 +30,12 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
import org.apache.zookeeper.OpResult.{CheckResult, CreateResult, ErrorResult, SetDataResult}
import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult}
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.common.ZKConfig
import org.apache.zookeeper.data.{ACL, Stat}
@@ -165,92 +164,6 @@ class KafkaZkClient private[zk] (
tryCreateControllerZNodeAndIncrementEpoch()
}
/**
* Registers a given KRaft controller in zookeeper as the active controller. Unlike the ZK equivalent of this method,
* this creates /controller as a persistent znode. This prevents ZK brokers from attempting to claim the controller
* leadership during a KRaft leadership failover.
*
* This method is called at the beginning of a KRaft migration and during subsequent KRaft leadership changes during
* the migration.
*
* To ensure that the KRaft controller epoch exceeds the current ZK controller epoch, this registration algorithm
* uses a conditional update on the /controller and /controller_epoch znodes.
*
* If a new controller is registered concurrently with this registration, one of the two will fail the CAS
* operation on /controller_epoch. For KRaft, we have an extra guard against the registered KRaft epoch going
* backwards. If a KRaft controller had previously registered, an additional CAS operation is done on the /controller
* ZNode to ensure that the KRaft epoch being registered is newer.
*
* @param kraftControllerId ID of the KRaft controller node
* @param kraftControllerEpoch Epoch of the KRaft controller node
* @return A result object containing the written ZK controller epoch and version, or nothing.
*/
def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): KRaftRegistrationResult = {
val timestamp = time.milliseconds()
val curEpochOpt: Option[(Int, Int)] = getControllerEpoch.map(e => (e._1, e._2.getVersion))
val controllerOpt = getControllerRegistration
// If we have a KRaft epoch registered in /controller, and it is not _older_ than the requested epoch, throw an error.
controllerOpt.flatMap(_.kraftEpoch).foreach { kraftEpochInZk =>
if (kraftEpochInZk >= kraftControllerEpoch) {
throw new ControllerMovedException(s"Cannot register KRaft controller $kraftControllerId with epoch $kraftControllerEpoch " +
s"as the current controller register in ZK has the same or newer epoch $kraftEpochInZk.")
}
}
curEpochOpt match {
case None =>
throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " +
s"since there is no ZK controller epoch present.")
case Some((curEpoch: Int, curEpochZk: Int)) =>
val newControllerEpoch = curEpoch + 1
val response = controllerOpt match {
case Some(controller) =>
info(s"KRaft controller $kraftControllerId overwriting ${ControllerZNode.path} to become the active " +
s"controller with ZK epoch $newControllerEpoch. The previous controller was ${controller.broker}.")
retryRequestUntilConnected(
MultiRequest(Seq(
SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), curEpochZk),
DeleteOp(ControllerZNode.path, controller.zkVersion),
CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp, kraftControllerEpoch),
defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
)
case None =>
info(s"KRaft controller $kraftControllerId creating ${ControllerZNode.path} to become the active " +
s"controller with ZK epoch $newControllerEpoch. There was no active controller.")
retryRequestUntilConnected(
MultiRequest(Seq(
SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), curEpochZk),
CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp, kraftControllerEpoch),
defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
)
}
val failureSuffix = s"while trying to register KRaft controller $kraftControllerId with ZK epoch " +
s"$newControllerEpoch. KRaft controller was not registered."
response.resultCode match {
case Code.OK =>
info(s"Successfully registered KRaft controller $kraftControllerId with ZK epoch $newControllerEpoch")
// First op is always SetData on /controller_epoch
val setDataResult = response.zkOpResults.head.rawOpResult.asInstanceOf[SetDataResult]
SuccessfulRegistrationResult(newControllerEpoch, setDataResult.getStat.getVersion)
case Code.BADVERSION =>
info(s"The ZK controller epoch changed $failureSuffix")
FailedRegistrationResult()
case Code.NONODE =>
info(s"The ephemeral node at ${ControllerZNode.path} went away $failureSuffix")
FailedRegistrationResult()
case Code.NODEEXISTS =>
info(s"The ephemeral node at ${ControllerZNode.path} was created by another controller $failureSuffix")
FailedRegistrationResult()
case code =>
error(s"ZooKeeper had an error $failureSuffix")
throw KeeperException.create(code)
}
}
}
private def maybeCreateControllerEpochZNode(): (Int, Int) = {
createControllerEpochRaw(KafkaController.InitialControllerEpoch).resultCode match {
case Code.OK =>
@@ -1723,36 +1636,6 @@ class KafkaZkClient private[zk] (
}
}
def getOrCreateMigrationState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
val getDataRequest = GetDataRequest(MigrationZNode.path)
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {
case Code.OK =>
MigrationZNode.decode(getDataResponse.data, getDataResponse.stat.getVersion, getDataResponse.stat.getMtime)
case Code.NONODE =>
createInitialMigrationState(initialState)
case _ => throw getDataResponse.resultException.get
}
}
private def createInitialMigrationState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
val createRequest = CreateRequest(
MigrationZNode.path,
MigrationZNode.encode(initialState),
defaultAcls(MigrationZNode.path),
CreateMode.PERSISTENT)
val response = retryRequestUntilConnected(createRequest)
response.maybeThrow()
initialState.withMigrationZkVersion(0)
}
def updateMigrationState(migrationState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
val req = SetDataRequest(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
val resp = retryRequestUntilConnected(req)
resp.maybeThrow()
migrationState.withMigrationZkVersion(resp.stat.getVersion)
}
/**
* Return the ACLs of the node of the given path
* @param path the given path for the node
@@ -1971,137 +1854,6 @@ class KafkaZkClient private[zk] (
}
}
/**
* Safely performs a sequence of writes to ZooKeeper as part of a KRaft migration. For each request in {@code requests}, we
* wrap the operation in a multi-op transaction that includes a check op on /controller_epoch and /migration. This guards
* against another KRaft controller or another ZK controller having unexpectedly taken leadership.
*
* In cases of KRaft failover during a migration, it is possible that a write is attempted before the old KRaft controller
* receives the new leader information. In this case, the check op on /migration acts as a guard against multiple writers.
*
* The multi-op for the last request in {@code requests} is used to update the /migration node with the latest migration
* state. This effectively checkpoints the progress of the migration in ZK relative to the metadata log.
*
* Each multi-op request is atomic. The overall sequence of multi-op requests is not atomic and we may fail during any
* of them. When the KRaft controller recovers the migration state, it will re-apply all of the writes needed to update
* the ZK state with the latest KRaft state. In the case of Create or Delete operations, these will fail if applied
* twice, so we need to ignore NodeExists and NoNode failures for those cases.
*
* @param requests A sequence of ZK requests. Only Create, Delete, and SetData are supported.
* @param migrationState The current migration state. This is written out as part of the final multi-op request.
* @return The new version of /migration ZNode and the sequence of responses for the given requests.
*/
def retryMigrationRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req],
migrationState: ZkMigrationLeadershipState): (Int, Seq[Req#Response]) = {
if (requests.isEmpty) {
return (migrationState.migrationZkVersion(), Seq.empty)
}
def wrapMigrationRequest(request: Req, lastRequestInBatch: Boolean): MultiRequest = {
// Wrap a single request with the multi-op transactional request.
val checkOp = CheckOp(ControllerEpochZNode.path, migrationState.zkControllerEpochZkVersion())
val migrationOp = if (lastRequestInBatch) {
SetDataOp(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
} else {
CheckOp(MigrationZNode.path, migrationState.migrationZkVersion())
}
request match {
case CreateRequest(path, data, acl, createMode, ctx) =>
MultiRequest(Seq(checkOp, migrationOp, CreateOp(path, data, acl, createMode)), ctx)
case DeleteRequest(path, version, ctx) =>
MultiRequest(Seq(checkOp, migrationOp, DeleteOp(path, version)), ctx)
case SetDataRequest(path, data, version, ctx) =>
MultiRequest(Seq(checkOp, migrationOp, SetDataOp(path, data, version)), ctx)
case _ => throw new IllegalStateException(s"$request does not need controller epoch check")
}
}
def handleUnwrappedMigrationResult(migrationOp: ZkOp, migrationResult: OpResult): Int = {
// Handle just the operation that updated /migration ZNode
val (path: String, data: Option[Array[Byte]], version: Int) = migrationOp match {
case CheckOp(path, version) => (path, None, version)
case SetDataOp(path, data, version) => (path, Some(data), version)
case _ => throw new IllegalStateException("Unexpected result on /migration znode")
}
migrationResult match {
case _: CheckResult => version
case setDataResult: SetDataResult => setDataResult.getStat.getVersion
case errorResult: ErrorResult =>
if (path.equals(MigrationZNode.path)) {
val errorCode = Code.get(errorResult.getErr)
if (errorCode == Code.BADVERSION) {
data match {
case Some(value) =>
val failedPayload = MigrationZNode.decode(value, version, -1)
throw new RuntimeException(
s"Conditional update on KRaft Migration ZNode failed. Sent zkVersion = $version. The failed " +
s"write was: $failedPayload. This indicates that another KRaft controller is making writes to ZooKeeper.")
case None =>
throw new RuntimeException(s"Check op on KRaft Migration ZNode failed. Sent zkVersion = $version. " +
s"This indicates that another KRaft controller is making writes to ZooKeeper.")
}
} else if (errorCode == Code.OK) {
// This means the Check or SetData op would have been ok, but failed because of another operation in this multi-op
version
} else {
throw KeeperException.create(errorCode, path)
}
} else {
throw new RuntimeException(s"Got migration result for incorrect path $path")
}
case _ => throw new RuntimeException(
s"Expected either CheckResult, SetDataResult, or ErrorResult for migration op, but saw $migrationResult")
}
}
def unwrapMigrationResponse(response: AsyncResponse, lastRequestInBatch: Boolean): (AsyncResponse, Int) = {
response match {
case MultiResponse(resultCode, _, ctx, zkOpResults, responseMetadata) =>
zkOpResults match {
case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), ZkOpResult(migrationOp: CheckOp, migrationResult), zkOpResult) =>
// Matches all requests except the last one (CheckOp on /migration)
if (lastRequestInBatch) {
throw new IllegalStateException("Should not see a Check operation on /migration in the last request.")
}
handleUnwrappedCheckOp(checkOp, checkOpResult)
val migrationVersion = handleUnwrappedMigrationResult(migrationOp, migrationResult)
(handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata), migrationVersion)
case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), ZkOpResult(migrationOp: SetDataOp, migrationResult), zkOpResult) =>
// Matches the last request in a batch (SetDataOp on /migration)
if (!lastRequestInBatch) {
throw new IllegalStateException("Should only see a SetData operation on /migration in the last request.")
}
handleUnwrappedCheckOp(checkOp, checkOpResult)
val migrationVersion = handleUnwrappedMigrationResult(migrationOp, migrationResult)
(handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata), migrationVersion)
case null => throw KeeperException.create(resultCode)
case _ => throw new IllegalStateException(
s"Cannot unwrap $response because it does not contain the expected operations for a migration operation.")
}
case _ => throw new IllegalStateException(s"Cannot unwrap $response because it is not a MultiResponse")
}
}
migrationState.zkControllerEpochZkVersion() match {
case ZkVersion.MatchAnyVersion => throw new IllegalArgumentException(
s"Expected a controller epoch zkVersion when making migration writes, not -1.")
case version if version >= 0 =>
logger.trace(s"Performing ${requests.size} migration update(s) with migrationState=$migrationState")
val wrappedRequests = requests.map(req => wrapMigrationRequest(req, req == requests.last))
val results = retryRequestsUntilConnected(wrappedRequests)
val unwrappedResults = results.map(resp => unwrapMigrationResponse(resp, resp == results.last))
val migrationZkVersion = unwrappedResults.last._2
// Return the new version of /migration and the sequence of responses to the original requests
(migrationZkVersion, unwrappedResults.map(_._1.asInstanceOf[Req#Response]))
case invalidVersion =>
throw new IllegalArgumentException(
s"Expected controller epoch zkVersion $invalidVersion should be non-negative or equal to ${ZkVersion.MatchAnyVersion}")
}
}
private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
val remainingRequests = new mutable.ArrayBuffer(requests.size) ++= requests
val responses = new mutable.ArrayBuffer[Req#Response]
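The core of the removed fencing protocol is wrapMigrationRequest above: every migration write rode inside a ZooKeeper multi-op whose first two operations re-checked the controller epoch and the /migration version, so a write from a deposed controller fails atomically rather than corrupting state. Reduced to the SetData case:
// checkOp fences against another ZK or KRaft controller having bumped the epoch;
// migrationOp either re-checks /migration or, for the last request in a batch,
// checkpoints the new migration state into it.
val checkOp = CheckOp(ControllerEpochZNode.path, migrationState.zkControllerEpochZkVersion())
val migrationOp =
  if (lastRequestInBatch)
    SetDataOp(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
  else
    CheckOp(MigrationZNode.path, migrationState.migrationZkVersion())
MultiRequest(Seq(checkOp, migrationOp, SetDataOp(path, data, version)))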

View File

@@ -37,7 +37,6 @@ import org.apache.kafka.common.security.token.delegation.TokenInformation
import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.{MetadataVersion, ProducerIdsBlock}
@@ -1044,43 +1043,6 @@ object FeatureZNode {
}
}
object MigrationZNode {
val path = "/migration"
def encode(migration: ZkMigrationLeadershipState): Array[Byte] = {
val jsonMap = Map(
"version" -> 0,
"kraft_controller_id" -> migration.kraftControllerId(),
"kraft_controller_epoch" -> migration.kraftControllerEpoch(),
"kraft_metadata_offset" -> migration.kraftMetadataOffset(),
"kraft_metadata_epoch" -> migration.kraftMetadataEpoch()
)
Json.encodeAsBytes(jsonMap.asJava)
}
def decode(bytes: Array[Byte], zkVersion: Int, modifyTimeMs: Long): ZkMigrationLeadershipState = {
val jsonDataAsString = bytes.map(_.toChar).mkString
Json.parseBytes(bytes).map(_.asJsonObject).flatMap { js =>
val version = js("version").to[Int]
if (version != 0) {
throw new KafkaException(s"Encountered unknown version $version when parsing migration json $jsonDataAsString")
}
val controllerId = js("kraft_controller_id").to[Int]
val controllerEpoch = js("kraft_controller_epoch").to[Int]
val metadataOffset = js("kraft_metadata_offset").to[Long]
val metadataEpoch = js("kraft_metadata_epoch").to[Int]
Some(new ZkMigrationLeadershipState(
controllerId,
controllerEpoch,
metadataOffset,
metadataEpoch,
modifyTimeMs,
zkVersion,
ZkMigrationLeadershipState.EMPTY.zkControllerEpoch(),
ZkMigrationLeadershipState.EMPTY.zkControllerEpochZkVersion()))
}.getOrElse(throw new KafkaException(s"Failed to parse the migration json $jsonDataAsString"))
}
}
object ZkData {
@@ -1101,7 +1063,6 @@ object ZkData {
LogDirEventNotificationZNode.path,
DelegationTokenAuthZNode.path,
ExtendedAclZNode.path,
MigrationZNode.path,
FeatureZNode.path) ++ ZkAclStore.securePaths
// These are persistent ZK paths that should exist on kafka broker startup.
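For reference, MigrationZNode.encode above wrote a small version-0 JSON document to /migration; the znode's own zkVersion and mtime carried the remaining state. An illustrative payload (all field values invented for the example):
{
  "version": 0,
  "kraft_controller_id": 3000,
  "kraft_controller_epoch": 5,
  "kraft_metadata_offset": 1234,
  "kraft_metadata_epoch": 7
}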

View File

@@ -82,7 +82,7 @@ class ZooKeeperQuorumImplementation(
startup: Boolean,
threadNamePrefix: Option[String],
): KafkaBroker = {
val server = new KafkaServer(config, time, threadNamePrefix, false)
val server = new KafkaServer(config, time, threadNamePrefix)
if (startup) server.startup()
server
}

View File

@@ -408,13 +408,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
if (isKRaftTest()) {
createBroker(config, brokerTime(config.brokerId), startup = false)
} else {
TestUtils.createServer(
config,
time = brokerTime(config.brokerId),
threadNamePrefix = None,
startup = false,
enableZkApiForwarding = config.migrationEnabled && config.interBrokerProtocolVersion.isApiForwardingEnabled
)
TestUtils.createServer(config, time = brokerTime(config.brokerId), threadNamePrefix = None, startup = false)
}
}

View File

@@ -20,7 +20,7 @@ package kafka.metrics
import java.lang.management.ManagementFactory
import java.util.Properties
import javax.management.ObjectName
import com.yammer.metrics.core.{Gauge, MetricPredicate}
import com.yammer.metrics.core.MetricPredicate
import org.junit.jupiter.api.Assertions._
import kafka.integration.KafkaServerTestHarness
import kafka.server._
@@ -33,7 +33,6 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.metrics.JmxReporter
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.migration.ZkMigrationState
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
@@ -229,15 +228,10 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
"kafka.controller:type=KafkaController,name=MetadataErrorCount",
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount",
"kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount",
"kafka.controller:type=KafkaController,name=ZkMigrationState",
).foreach(expected => {
assertEquals(1, metrics.keySet.asScala.count(_.getMBeanName.equals(expected)),
s"Unable to find $expected")
})
val zkStateMetricName = metrics.keySet.asScala.filter(_.getMBeanName == "kafka.controller:type=KafkaController,name=ZkMigrationState").head
val zkStateGauge = metrics.get(zkStateMetricName).asInstanceOf[Gauge[Int]]
assertEquals(ZkMigrationState.NONE.value().intValue(), zkStateGauge.value())
}
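The deleted assertion read the migration gauge back through the Yammer registry. The same lookup pattern against a gauge that still exists, sketched with the registry access the removed test relied on (Gauge import as in the pre-change test):
val metrics = KafkaYammerMetrics.defaultRegistry().allMetrics()
val name = metrics.keySet.asScala
  .find(_.getMBeanName == "kafka.controller:type=KafkaController,name=ActiveControllerCount")
  .getOrElse(fail("gauge not registered"))
val gauge = metrics.get(name).asInstanceOf[Gauge[Int]]
assertEquals(1, gauge.value())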
/**

View File

@@ -17,7 +17,7 @@
package kafka.server
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterInstance, ClusterTest, ClusterTestExtensions, Type}
import org.apache.kafka.common.test.api.{ClusterInstance, ClusterTest, ClusterTestExtensions, Type}
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.{BrokerRegistrationRequestData, CreateTopicsRequestData}
@@ -143,36 +143,7 @@ class BrokerRegistrationRequestTest {
Errors.forCode(resp.topics().find(topicName).errorCode())
}
@ClusterTest(types = Array(Type.KRAFT), brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "false")))
def testRegisterZkWithKRaftMigrationDisabled(clusterInstance: ClusterInstance): Unit = {
val clusterId = clusterInstance.clusterId()
val channelManager = brokerToControllerChannelManager(clusterInstance)
try {
channelManager.start()
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV0))))
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
registerBroker(channelManager, clusterId, 100, Some(1), None))
assertEquals(
Errors.BROKER_ID_NOT_REGISTERED,
registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
assertEquals(
Errors.NONE,
registerBroker(channelManager, clusterId, 100, None, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
} finally {
channelManager.shutdown()
}
}
@ClusterTest(types = Array(Type.KRAFT), brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3,
serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "false")))
@ClusterTest(types = Array(Type.KRAFT), brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3)
def testRegisterZkWith33Controller(clusterInstance: ClusterInstance): Unit = {
// Verify that a controller running an old metadata.version cannot register a ZK broker
val clusterId = clusterInstance.clusterId()

View File

@@ -1829,45 +1829,6 @@ class KafkaConfigTest {
)
}
@Test
def testMigrationCannotBeEnabledWithJBOD(): Unit = {
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort, logDirCount = 2)
props.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9093")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, MetadataVersion.IBP_3_7_IV1.version())
assertEquals(
"requirement failed: Cannot enable ZooKeeper migration with multiple log directories " +
"(aka JBOD) without setting 'inter.broker.protocol.version' to 3.7-IV2 or higher",
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
}
@Test
def testMigrationCannotBeEnabledWithBrokerIdGeneration(): Unit = {
val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort, logDirCount = 2)
props.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9093")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
assertEquals(
"requirement failed: broker.id generation is incompatible with ZooKeeper migration. Please stop using it before enabling migration (set broker.id to a value greater or equal to 0).",
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
}
@Test
def testMigrationEnabledKRaftMode(): Unit = {
val props = new Properties()
props.putAll(kraftProps())
props.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
assertEquals(
"If using `zookeeper.metadata.migration.enable` in KRaft mode, `zookeeper.connect` must also be set.",
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage)
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
KafkaConfig.fromProps(props)
}
@Test
def testConsumerGroupSessionTimeoutValidation(): Unit = {
val props = new Properties()

View File

@@ -1017,75 +1017,6 @@ class MetadataCacheTest {
(initialTopicIds, initialTopicStates, newTopicIds, newPartitionStates)
}
/**
* Verify the behavior of ZkMetadataCache when handling "Full" UpdateMetadataRequest
*/
@Test
def testHandleFullUpdateMetadataRequestInZkMigration(): Unit = {
val (initialTopicIds, initialTopicStates, newTopicIds, newPartitionStates) = setupInitialAndFullMetadata()
val updateMetadataRequestBuilder = () => new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
newPartitionStates.asJava, Seq.empty.asJava, newTopicIds.asJava, true, AbstractControlRequest.Type.FULL).build()
def verifyMetadataCache(
updateMetadataRequest: UpdateMetadataRequest,
zkMigrationEnabled: Boolean = true
)(
verifier: ZkMetadataCache => Unit
): Unit = {
val cache = MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting(), zkMigrationEnabled = zkMigrationEnabled)
cache.updateMetadata(1, new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
initialTopicStates.flatMap(_._2.values).toList.asJava, Seq.empty.asJava, initialTopicIds.asJava).build())
cache.updateMetadata(1, updateMetadataRequest)
verifier.apply(cache)
}
// KRaft=false Type=FULL, migration disabled
var updateMetadataRequest = updateMetadataRequestBuilder.apply()
updateMetadataRequest.data().setIsKRaftController(true)
updateMetadataRequest.data().setType(AbstractControlRequest.Type.FULL.toByte)
verifyMetadataCache(updateMetadataRequest, zkMigrationEnabled = false) { cache =>
assertEquals(3, cache.getAllTopics().size)
assertTrue(cache.contains("test-topic-1"))
assertTrue(cache.contains("test-topic-1"))
}
// KRaft=true Type=FULL
updateMetadataRequest = updateMetadataRequestBuilder.apply()
verifyMetadataCache(updateMetadataRequest) { cache =>
assertEquals(1, cache.getAllTopics().size)
assertFalse(cache.contains("test-topic-1"))
assertFalse(cache.contains("test-topic-1"))
}
// KRaft=false Type=FULL
updateMetadataRequest = updateMetadataRequestBuilder.apply()
updateMetadataRequest.data().setIsKRaftController(false)
verifyMetadataCache(updateMetadataRequest) { cache =>
assertEquals(3, cache.getAllTopics().size)
assertTrue(cache.contains("test-topic-1"))
assertTrue(cache.contains("test-topic-1"))
}
// KRaft=true Type=INCREMENTAL
updateMetadataRequest = updateMetadataRequestBuilder.apply()
updateMetadataRequest.data().setType(AbstractControlRequest.Type.INCREMENTAL.toByte)
verifyMetadataCache(updateMetadataRequest) { cache =>
assertEquals(3, cache.getAllTopics().size)
assertTrue(cache.contains("test-topic-1"))
assertTrue(cache.contains("test-topic-1"))
}
// KRaft=true Type=UNKNOWN
updateMetadataRequest = updateMetadataRequestBuilder.apply()
updateMetadataRequest.data().setType(AbstractControlRequest.Type.UNKNOWN.toByte)
verifyMetadataCache(updateMetadataRequest) { cache =>
assertEquals(3, cache.getAllTopics().size)
assertTrue(cache.contains("test-topic-1"))
assertTrue(cache.contains("test-topic-1"))
}
}
@Test
def testGetOfflineReplicasConsidersDirAssignment(): Unit = {
case class Broker(id: Int, dirs: util.List[Uuid])

View File

@@ -56,13 +56,10 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image._
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.metadata.migration.ZkMigrationState
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.network.BrokerEndPoint
@@ -6245,8 +6242,7 @@ class ReplicaManagerTest {
private def imageFromTopics(topicsImage: TopicsImage): MetadataImage = {
val featuresImageLatest = new FeaturesImage(
Collections.emptyMap(),
MetadataVersion.latestProduction(),
ZkMigrationState.NONE)
MetadataVersion.latestProduction())
new MetadataImage(
new MetadataProvenance(100L, 10, 1000L, true),
featuresImageLatest,
@@ -6479,20 +6475,6 @@ class ReplicaManagerTest {
val newFoo0 = new TopicIdPartition(Uuid.fromString("JRCmVxWxQamFs4S8NXYufg"), new TopicPartition("foo", 0))
val bar0 = new TopicIdPartition(Uuid.fromString("69O438ZkTSeqqclTtZO2KA"), new TopicPartition("bar", 0))
def setupReplicaManagerForKRaftMigrationTest(): ReplicaManager = {
setupReplicaManagerWithMockedPurgatories(
brokerId = 3,
timer = new MockTimer(time),
aliveBrokerIds = Seq(0, 1, 2),
propsModifier = props => {
props.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1000@localhost:9093")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
},
defaultTopicRemoteLogStorageEnable = false)
}
def verifyPartitionIsOnlineAndHasId(
replicaManager: ReplicaManager,
topicIdPartition: TopicIdPartition
@@ -6517,59 +6499,6 @@ class ReplicaManagerTest {
assertEquals(HostedPartition.None, partition, s"Expected ${topicIdPartition} to be offline, but it was: ${partition}")
}
@Test
def testFullLairDuringKRaftMigration(): Unit = {
val replicaManager = setupReplicaManagerForKRaftMigrationTest()
try {
val becomeLeaderRequest = LogManagerTest.createLeaderAndIsrRequestForStrayDetection(
Seq(foo0, foo1, bar0), Seq(3, 4, 3))
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
verifyPartitionIsOnlineAndHasId(replicaManager, foo0)
verifyPartitionIsOnlineAndHasId(replicaManager, foo1)
verifyPartitionIsOnlineAndHasId(replicaManager, bar0)
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
@Test
def testFullLairDuringKRaftMigrationRemovesOld(): Unit = {
val replicaManager = setupReplicaManagerForKRaftMigrationTest()
try {
val becomeLeaderRequest1 = LogManagerTest.createLeaderAndIsrRequestForStrayDetection(
Seq(foo0, foo1, bar0), Seq(3, 4, 3))
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest1, (_, _) => ())
val becomeLeaderRequest2 = LogManagerTest.createLeaderAndIsrRequestForStrayDetection(
Seq(bar0), Seq(3, 4, 3))
replicaManager.becomeLeaderOrFollower(2, becomeLeaderRequest2, (_, _) => ())
verifyPartitionIsOffline(replicaManager, foo0)
verifyPartitionIsOffline(replicaManager, foo1)
verifyPartitionIsOnlineAndHasId(replicaManager, bar0)
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
@Test
def testFullLairDuringKRaftMigrationWithTopicRecreations(): Unit = {
val replicaManager = setupReplicaManagerForKRaftMigrationTest()
try {
val becomeLeaderRequest1 = LogManagerTest.createLeaderAndIsrRequestForStrayDetection(
Seq(foo0, foo1, bar0), Seq(3, 4, 3))
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest1, (_, _) => ())
val becomeLeaderRequest2 = LogManagerTest.createLeaderAndIsrRequestForStrayDetection(
Seq(newFoo0, bar0), Seq(3, 4, 3))
replicaManager.becomeLeaderOrFollower(2, becomeLeaderRequest2, (_, _) => ())
verifyPartitionIsOnlineAndHasId(replicaManager, newFoo0)
verifyPartitionIsOffline(replicaManager, foo1)
verifyPartitionIsOnlineAndHasId(replicaManager, bar0)
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
@Test
def testRemoteReadQuotaExceeded(): Unit = {
when(mockRemoteLogManager.getFetchThrottleTimeMs).thenReturn(quotaExceededThrottleTime)

View File

@@ -159,12 +159,11 @@ object TestUtils extends Logging {
* @param config The configuration of the server
*/
def createServer(config: KafkaConfig, time: Time = Time.SYSTEM): KafkaServer = {
createServer(config, time, None, startup = true, enableZkApiForwarding = false)
createServer(config, time, None, startup = true)
}
def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String],
startup: Boolean, enableZkApiForwarding: Boolean) = {
val server = new KafkaServer(config, time, threadNamePrefix, enableForwarding = enableZkApiForwarding)
def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String], startup: Boolean): KafkaServer = {
val server = new KafkaServer(config, time, threadNamePrefix)
if (startup) server.startup()
server
}

View File

@ -220,7 +220,7 @@ public class ReplicaFetcherThreadBenchmark {
// TODO: fix to support raft
ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(0,
config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), false);
config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty());
metadataCache.updateMetadata(0, updateMetadataRequest);
replicaManager = new ReplicaManagerBuilder().

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
@ -29,8 +28,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import static org.apache.kafka.metadata.migration.ZkMigrationState.NONE;
import static org.apache.kafka.metadata.migration.ZkMigrationState.POST_MIGRATION;
public class ActivationRecordsGenerator {
@ -90,12 +87,6 @@ public class ActivationRecordsGenerator {
// initialization, etc.
records.addAll(bootstrapMetadata.records());
if (metadataVersion.isMigrationSupported()) {
logMessageBuilder.append("Setting the ZK migration state to NONE since this is a de-novo " +
"KRaft cluster. ");
records.add(NONE.toRecord());
}
activationMessageConsumer.accept(logMessageBuilder.toString().trim());
if (metadataVersion.isMetadataTransactionSupported()) {
records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0));
@ -108,7 +99,6 @@ public class ActivationRecordsGenerator {
static ControllerResult<Void> recordsForNonEmptyLog(
Consumer<String> activationMessageConsumer,
long transactionStartOffset,
ZkMigrationState zkMigrationState,
MetadataVersion curMetadataVersion
) {
StringBuilder logMessageBuilder = new StringBuilder("Performing controller activation. ");
@ -139,24 +129,6 @@ public class ActivationRecordsGenerator {
.append(". ");
}
if (curMetadataVersion.isMigrationSupported()) {
if (zkMigrationState == NONE || zkMigrationState == POST_MIGRATION) {
logMessageBuilder
.append("Loaded ZK migration state of ")
.append(zkMigrationState)
.append(". ");
if (zkMigrationState == NONE) {
logMessageBuilder.append("This is expected because this is a de-novo KRaft cluster.");
}
} else {
throw new RuntimeException("Cannot load ZkMigrationState." + zkMigrationState +
" because ZK migration is no longer supported.");
}
} else if (zkMigrationState != NONE) {
throw new RuntimeException("Should not have ZkMigrationState." + zkMigrationState +
" on a cluster running metadata version " + curMetadataVersion + ".");
}
activationMessageConsumer.accept(logMessageBuilder.toString().trim());
return ControllerResult.atomicOf(records, null);
}
@ -176,15 +148,13 @@ public class ActivationRecordsGenerator {
boolean isEmpty,
long transactionStartOffset,
BootstrapMetadata bootstrapMetadata,
ZkMigrationState zkMigrationState,
MetadataVersion curMetadataVersion
) {
if (isEmpty) {
return recordsForEmptyLog(activationMessageConsumer, transactionStartOffset,
bootstrapMetadata, bootstrapMetadata.metadataVersion());
} else {
return recordsForNonEmptyLog(activationMessageConsumer, transactionStartOffset,
zkMigrationState, curMetadataVersion);
return recordsForNonEmptyLog(activationMessageConsumer, transactionStartOffset, curMetadataVersion);
}
}
}
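
A minimal sketch of the simplified activation flow after this change, assuming a boolean isEmpty flag and plain strings as stand-ins for Kafka's ControllerResult and ApiMessageAndVersion types; illustrative only, not the actual ActivationRecordsGenerator:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ActivationDispatchSketch {
    // With the ZkMigrationState parameter gone, activation only branches on
    // whether the metadata log is empty.
    static List<String> generate(Consumer<String> logMessage, boolean isEmpty, List<String> bootstrapRecords) {
        List<String> records = new ArrayList<>();
        if (isEmpty) {
            logMessage.accept("Performing controller activation. The metadata log appears to be empty.");
            records.addAll(bootstrapRecords);
        } else {
            logMessage.accept("Performing controller activation.");
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(generate(System.out::println, true, List.of("FeatureLevelRecord")));
    }
}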

View File

@ -19,13 +19,11 @@ package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
@ -137,11 +135,6 @@ public class FeatureControlManager {
*/
private final TimelineObject<MetadataVersion> metadataVersion;
/**
* The current ZK migration state
*/
private final TimelineObject<ZkMigrationState> migrationControlState;
/**
* The minimum bootstrap version that we can't downgrade before.
*/
@ -165,7 +158,6 @@ public class FeatureControlManager {
this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
this.metadataVersion = new TimelineObject<>(snapshotRegistry, metadataVersion);
this.minimumBootstrapVersion = minimumBootstrapVersion;
this.migrationControlState = new TimelineObject<>(snapshotRegistry, ZkMigrationState.NONE);
this.clusterSupportDescriber = clusterSupportDescriber;
}
@ -200,10 +192,6 @@ public class FeatureControlManager {
return metadataVersion.get();
}
ZkMigrationState zkMigrationState() {
return migrationControlState.get();
}
private ApiError updateFeature(
String featureName,
short newVersion,
@ -335,7 +323,6 @@ public class FeatureControlManager {
Consumer<ApiMessageAndVersion> recordConsumer
) {
MetadataVersion currentVersion = metadataVersion();
ZkMigrationState zkMigrationState = zkMigrationState();
final MetadataVersion newVersion;
try {
newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
@ -343,12 +330,6 @@ public class FeatureControlManager {
return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version.");
}
// Don't allow metadata.version changes while we're migrating
if (zkMigrationState.inProgress()) {
return invalidMetadataVersion(newVersionLevel, "Unable to modify metadata.version while a " +
"ZK migration is in progress.");
}
// We cannot set a version earlier than IBP_3_3_IV0, since that was the first version that contained
// FeatureLevelRecord itself.
if (newVersion.isLessThan(minimumBootstrapVersion)) {
@ -427,19 +408,6 @@ public class FeatureControlManager {
}
}
public void replay(ZkMigrationStateRecord record) {
ZkMigrationState newState = ZkMigrationState.of(record.zkMigrationState());
ZkMigrationState previousState = migrationControlState.get();
if (previousState.equals(newState)) {
log.debug("Replayed a ZkMigrationStateRecord which did not alter the state from {}.",
previousState);
} else {
migrationControlState.set(newState);
log.info("Replayed a ZkMigrationStateRecord changing the migration state from {} to {}.",
previousState, newState);
}
}
boolean isControllerId(int nodeId) {
return quorumFeatures.isControllerId(nodeId);
}

View File

@ -82,7 +82,6 @@ import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
@ -1139,7 +1138,6 @@ public final class QuorumController implements Controller {
logReplayTracker.empty(),
offsetControl.transactionStartOffset(),
bootstrapMetadata,
featureControl.zkMigrationState(),
featureControl.metadataVersion());
} catch (Throwable t) {
throw fatalFaultHandler.handleFault("exception while completing controller " +
@ -1257,7 +1255,9 @@ public final class QuorumController implements Controller {
// NoOpRecord is an empty record and doesn't need to be replayed
break;
case ZK_MIGRATION_STATE_RECORD:
featureControl.replay((ZkMigrationStateRecord) message);
// In 4.0, migration is no longer supported and ZooKeeper has been removed from Kafka, but a
// cluster may have been migrated from ZK to KRaft on a 3.x release and then rolling-upgraded
// to 4.0, so its metadata log can still contain this record. The case is therefore retained
// as a no-op.
break;
case BEGIN_TRANSACTION_RECORD:
offsetControl.replay((BeginTransactionRecord) message, offset);
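
A minimal, self-contained sketch of the tolerate-and-ignore pattern this hunk adopts; the class and enum names below are illustrative stand-ins, not Kafka's actual record-handling types:

import java.util.List;

public class LegacyRecordReplaySketch {
    enum RecordType { FEATURE_LEVEL_RECORD, ZK_MIGRATION_STATE_RECORD, NO_OP_RECORD }

    // Replays a log, silently skipping the legacy migration record so that logs
    // written during a 3.x ZK-to-KRaft migration remain readable on 4.0.
    static void replay(List<RecordType> log) {
        for (RecordType type : log) {
            switch (type) {
                case ZK_MIGRATION_STATE_RECORD: // retained, but a no-op in 4.0
                case NO_OP_RECORD:
                    break;
                case FEATURE_LEVEL_RECORD:
                    System.out.println("replayed " + type);
                    break;
            }
        }
    }

    public static void main(String[] args) {
        replay(List.of(RecordType.FEATURE_LEVEL_RECORD, RecordType.ZK_MIGRATION_STATE_RECORD));
    }
}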

View File

@ -17,7 +17,6 @@
package org.apache.kafka.controller;
import org.apache.kafka.metadata.ControllerRegistration;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
@ -110,27 +109,6 @@ public final class QuorumFeatures {
localSupportedFeature(featureName));
}
public Optional<String> reasonAllControllersZkMigrationNotReady(
MetadataVersion metadataVersion,
Map<Integer, ControllerRegistration> controllers
) {
if (!metadataVersion.isMigrationSupported()) {
return Optional.of("The metadata.version too low at " + metadataVersion);
} else if (!metadataVersion.isControllerRegistrationSupported()) {
return Optional.empty();
}
for (int quorumNodeId : quorumNodeIds) {
ControllerRegistration registration = controllers.get(quorumNodeId);
if (registration == null) {
return Optional.of("No registration found for controller " + quorumNodeId);
} else if (!registration.zkMigrationReady()) {
return Optional.of("Controller " + quorumNodeId + " has not enabled " +
"zookeeper.metadata.migration.enable");
}
}
return Optional.empty();
}
@Override
public int hashCode() {
return Objects.hash(nodeId, localSupportedFeatures, quorumNodeIds);

View File

@ -115,9 +115,6 @@ public class ControllerMetadataMetricsPublisher implements MetadataPublisher {
}
}
changes.apply(metrics);
if (delta.featuresDelta() != null) {
delta.featuresDelta().getZkMigrationStateChange().ifPresent(state -> metrics.setZkMigrationState(state.value()));
}
}
private void publishSnapshot(MetadataImage newImage) {
@ -156,7 +153,6 @@ public class ControllerMetadataMetricsPublisher implements MetadataPublisher {
metrics.setGlobalPartitionCount(totalPartitions);
metrics.setOfflinePartitionCount(offlinePartitions);
metrics.setPreferredReplicaImbalanceCount(partitionsWithoutPreferredLeader);
metrics.setZkMigrationState(newImage.features().zkMigrationState().value());
}
@Override

View File

@ -18,8 +18,6 @@
package org.apache.kafka.image;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.HashMap;
@ -38,8 +36,6 @@ public final class FeaturesDelta {
private MetadataVersion metadataVersionChange = null;
private ZkMigrationState zkMigrationStateChange = null;
public FeaturesDelta(FeaturesImage image) {
this.image = image;
}
@ -48,10 +44,6 @@ public final class FeaturesDelta {
return changes;
}
public Optional<ZkMigrationState> getZkMigrationStateChange() {
return Optional.ofNullable(zkMigrationStateChange);
}
public Optional<MetadataVersion> metadataVersionChange() {
return Optional.ofNullable(metadataVersionChange);
}
@ -76,10 +68,6 @@ public final class FeaturesDelta {
}
}
public void replay(ZkMigrationStateRecord record) {
this.zkMigrationStateChange = ZkMigrationState.of(record.zkMigrationState());
}
public FeaturesImage apply() {
Map<String, Short> newFinalizedVersions =
new HashMap<>(image.finalizedVersions().size());
@ -109,13 +97,7 @@ public final class FeaturesDelta {
metadataVersion = metadataVersionChange;
}
final ZkMigrationState zkMigrationState;
if (zkMigrationStateChange == null) {
zkMigrationState = image.zkMigrationState();
} else {
zkMigrationState = zkMigrationStateChange;
}
return new FeaturesImage(newFinalizedVersions, metadataVersion, zkMigrationState);
return new FeaturesImage(newFinalizedVersions, metadataVersion);
}
@Override
@ -123,7 +105,6 @@ public final class FeaturesDelta {
return "FeaturesDelta(" +
"changes=" + changes +
", metadataVersionChange=" + metadataVersionChange +
", zkMigrationStateChange=" + zkMigrationStateChange +
')';
}
}

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.image.node.FeaturesImageNode;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.ArrayList;
@ -41,30 +40,23 @@ import java.util.Optional;
public final class FeaturesImage {
public static final FeaturesImage EMPTY = new FeaturesImage(
Collections.emptyMap(),
MetadataVersion.MINIMUM_KRAFT_VERSION,
ZkMigrationState.NONE
MetadataVersion.MINIMUM_KRAFT_VERSION
);
private final Map<String, Short> finalizedVersions;
private final MetadataVersion metadataVersion;
private final ZkMigrationState zkMigrationState;
public FeaturesImage(
Map<String, Short> finalizedVersions,
MetadataVersion metadataVersion,
ZkMigrationState zkMigrationState
) {
MetadataVersion metadataVersion) {
this.finalizedVersions = Collections.unmodifiableMap(finalizedVersions);
this.metadataVersion = metadataVersion;
this.zkMigrationState = zkMigrationState;
}
public boolean isEmpty() {
return finalizedVersions.isEmpty() &&
metadataVersion.equals(MetadataVersion.MINIMUM_KRAFT_VERSION) &&
zkMigrationState.equals(ZkMigrationState.NONE);
metadataVersion.equals(MetadataVersion.MINIMUM_KRAFT_VERSION);
}
public MetadataVersion metadataVersion() {
@ -75,10 +67,6 @@ public final class FeaturesImage {
return finalizedVersions;
}
public ZkMigrationState zkMigrationState() {
return zkMigrationState;
}
private Optional<Short> finalizedVersion(String feature) {
return Optional.ofNullable(finalizedVersions.get(feature));
}
@ -89,14 +77,6 @@ public final class FeaturesImage {
} else {
writeFeatureLevels(writer, options);
}
if (options.metadataVersion().isMigrationSupported()) {
writer.write(0, zkMigrationState.toRecord().message());
} else {
if (!zkMigrationState.equals(ZkMigrationState.NONE)) {
options.handleLoss("the ZK Migration state which was " + zkMigrationState);
}
}
}
private void handleFeatureLevelNotSupported(ImageWriterOptions options) {
@ -131,7 +111,7 @@ public final class FeaturesImage {
@Override
public int hashCode() {
return Objects.hash(finalizedVersions, metadataVersion, zkMigrationState);
return Objects.hash(finalizedVersions, metadataVersion);
}
@Override
@ -139,8 +119,7 @@ public final class FeaturesImage {
if (!(o instanceof FeaturesImage)) return false;
FeaturesImage other = (FeaturesImage) o;
return finalizedVersions.equals(other.finalizedVersions) &&
metadataVersion.equals(other.metadataVersion) &&
zkMigrationState.equals(other.zkMigrationState);
metadataVersion.equals(other.metadataVersion);
}
@Override
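
A minimal sketch of the resulting two-field value semantics, using a Java record and a plain version string in place of MetadataVersion.MINIMUM_KRAFT_VERSION; both are assumptions for brevity, not Kafka's FeaturesImage itself:

import java.util.Map;

public class FeaturesImageSketch {
    // Equality, hash code, and emptiness now depend only on the finalized
    // feature levels and the metadata.version.
    record Image(Map<String, Short> finalizedVersions, String metadataVersion) {
        boolean isEmpty() {
            return finalizedVersions.isEmpty() && metadataVersion.equals("3.0-IV1");
        }
    }

    public static void main(String[] args) {
        Image a = new Image(Map.of(), "3.0-IV1");
        Image b = new Image(Map.of(), "3.0-IV1");
        System.out.println(a.equals(b)); // true: no zkMigrationState field to compare
        System.out.println(a.isEmpty()); // true
    }
}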

View File

@ -38,7 +38,6 @@ import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.server.common.MetadataVersion;
@ -247,7 +246,9 @@ public final class MetadataDelta {
*/
break;
case ZK_MIGRATION_STATE_RECORD:
replay((ZkMigrationStateRecord) record);
// In 4.0, migration is no longer supported and ZooKeeper has been removed from Kafka, but a
// cluster may have been migrated from ZK to KRaft on a 3.x release and then rolling-upgraded
// to 4.0, so its metadata log can still contain this record. The case is therefore retained
// as a no-op.
break;
case REGISTER_CONTROLLER_RECORD:
replay((RegisterControllerRecord) record);
@ -345,10 +346,6 @@ public final class MetadataDelta {
getOrCreateScramDelta().replay(record);
}
public void replay(ZkMigrationStateRecord record) {
getOrCreateFeaturesDelta().replay(record);
}
public void replay(RegisterControllerRecord record) {
getOrCreateClusterDelta().replay(record);
}

View File

@ -68,8 +68,6 @@ public class FeaturesImageNode implements MetadataNode {
public MetadataNode child(String name) {
if (name.equals(METADATA_VERSION)) {
return new MetadataLeafNode(image.metadataVersion().toString());
} else if (name.equals(ZK_MIGRATION_STATE)) {
return new MetadataLeafNode(image.zkMigrationState().toString());
} else if (name.startsWith(FINALIZED_PREFIX)) {
String key = name.substring(FINALIZED_PREFIX.length());
return new MetadataLeafNode(

View File

@ -1,202 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.migration;
import org.apache.kafka.raft.OffsetAndEpoch;
import java.util.Objects;
/**
* Persistent state needed to recover an ongoing migration. This data is stored in ZooKeeper under the "/migration"
* ZNode and is recovered by the active KRaft controller following an election. The absence of this data in ZK indicates
* that no migration has been started.
*/
public class ZkMigrationLeadershipState {
/**
* A Kafka-internal constant used to indicate that the znode version is unknown. See ZkVersion.UnknownVersion.
*/
public static final int UNKNOWN_ZK_VERSION = -2;
// Use -2 as sentinel for "unknown version" for ZK versions to avoid sending an actual -1 "any version"
// when doing ZK writes
public static final ZkMigrationLeadershipState EMPTY =
new ZkMigrationLeadershipState(-1, -1, -1, -1, -1, -2, -1, UNKNOWN_ZK_VERSION);
private final int kraftControllerId;
private final int kraftControllerEpoch;
private final long kraftMetadataOffset;
private final int kraftMetadataEpoch;
private final long lastUpdatedTimeMs;
private final int migrationZkVersion;
private final int zkControllerEpoch;
private final int zkControllerEpochZkVersion;
public ZkMigrationLeadershipState(int kraftControllerId, int kraftControllerEpoch,
long kraftMetadataOffset, int kraftMetadataEpoch,
long lastUpdatedTimeMs, int migrationZkVersion,
int zkControllerEpoch, int zkControllerEpochZkVersion) {
this.kraftControllerId = kraftControllerId;
this.kraftControllerEpoch = kraftControllerEpoch;
this.kraftMetadataOffset = kraftMetadataOffset;
this.kraftMetadataEpoch = kraftMetadataEpoch;
this.lastUpdatedTimeMs = lastUpdatedTimeMs;
this.migrationZkVersion = migrationZkVersion;
this.zkControllerEpoch = zkControllerEpoch;
this.zkControllerEpochZkVersion = zkControllerEpochZkVersion;
}
public ZkMigrationLeadershipState withMigrationZkVersion(int zkVersion) {
return new ZkMigrationLeadershipState(
this.kraftControllerId, this.kraftControllerEpoch, this.kraftMetadataOffset,
this.kraftMetadataEpoch, this.lastUpdatedTimeMs, zkVersion, this.zkControllerEpoch, this.zkControllerEpochZkVersion);
}
public ZkMigrationLeadershipState withZkController(int zkControllerEpoch, int zkControllerEpochZkVersion) {
return new ZkMigrationLeadershipState(
this.kraftControllerId, this.kraftControllerEpoch, this.kraftMetadataOffset,
this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, zkControllerEpoch, zkControllerEpochZkVersion);
}
public ZkMigrationLeadershipState withUnknownZkController() {
return withZkController(EMPTY.zkControllerEpoch, EMPTY.zkControllerEpochZkVersion);
}
public ZkMigrationLeadershipState withNewKRaftController(int controllerId, int controllerEpoch) {
return new ZkMigrationLeadershipState(
controllerId, controllerEpoch, this.kraftMetadataOffset,
this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, this.zkControllerEpoch, this.zkControllerEpochZkVersion);
}
public ZkMigrationLeadershipState withKRaftMetadataOffsetAndEpoch(long metadataOffset,
int metadataEpoch) {
return new ZkMigrationLeadershipState(
this.kraftControllerId,
this.kraftControllerEpoch,
metadataOffset,
metadataEpoch,
this.lastUpdatedTimeMs,
this.migrationZkVersion,
this.zkControllerEpoch,
this.zkControllerEpochZkVersion);
}
public int kraftControllerId() {
return kraftControllerId;
}
public int kraftControllerEpoch() {
return kraftControllerEpoch;
}
public long kraftMetadataOffset() {
return kraftMetadataOffset;
}
public int kraftMetadataEpoch() {
return kraftMetadataEpoch;
}
public long lastUpdatedTimeMs() {
return lastUpdatedTimeMs;
}
public int migrationZkVersion() {
return migrationZkVersion;
}
public int zkControllerEpoch() {
return zkControllerEpoch;
}
public int zkControllerEpochZkVersion() {
return zkControllerEpochZkVersion;
}
public boolean initialZkMigrationComplete() {
return kraftMetadataOffset > 0;
}
public OffsetAndEpoch offsetAndEpoch() {
return new OffsetAndEpoch(kraftMetadataOffset, kraftMetadataEpoch);
}
public boolean loggableChangeSinceState(ZkMigrationLeadershipState other) {
if (other == null) {
return false;
}
if (this.equals(other)) {
return false;
} else {
// Did the controller change, or did we finish the migration?
return
this.kraftControllerId != other.kraftControllerId ||
this.kraftControllerEpoch != other.kraftControllerEpoch ||
(!other.initialZkMigrationComplete() && this.initialZkMigrationComplete());
}
}
@Override
public String toString() {
return "ZkMigrationLeadershipState{" +
"kraftControllerId=" + kraftControllerId +
", kraftControllerEpoch=" + kraftControllerEpoch +
", kraftMetadataOffset=" + kraftMetadataOffset +
", kraftMetadataEpoch=" + kraftMetadataEpoch +
", lastUpdatedTimeMs=" + lastUpdatedTimeMs +
", migrationZkVersion=" + migrationZkVersion +
", controllerZkEpoch=" + zkControllerEpoch +
", controllerZkVersion=" + zkControllerEpochZkVersion +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ZkMigrationLeadershipState that = (ZkMigrationLeadershipState) o;
return kraftControllerId == that.kraftControllerId
&& kraftControllerEpoch == that.kraftControllerEpoch
&& kraftMetadataOffset == that.kraftMetadataOffset
&& kraftMetadataEpoch == that.kraftMetadataEpoch
&& lastUpdatedTimeMs == that.lastUpdatedTimeMs
&& migrationZkVersion == that.migrationZkVersion
&& zkControllerEpoch == that.zkControllerEpoch
&& zkControllerEpochZkVersion == that.zkControllerEpochZkVersion;
}
@Override
public int hashCode() {
return Objects.hash(
kraftControllerId,
kraftControllerEpoch,
kraftMetadataOffset,
kraftMetadataEpoch,
lastUpdatedTimeMs,
migrationZkVersion,
zkControllerEpoch,
zkControllerEpochZkVersion);
}
}

View File

@ -1,108 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.migration;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.Optional;
/**
* The cluster-wide ZooKeeper migration state.
* <p>
* An enumeration of the possible states of the ZkMigrationState field in ZkMigrationStateRecord.
* This information is persisted in the metadata log and image.
*
* @see org.apache.kafka.common.metadata.ZkMigrationStateRecord
*/
public enum ZkMigrationState {
/**
* The cluster was created in KRaft mode. A cluster that was created in ZK mode can never attain
* this state; the endpoint of migration is POST_MIGRATION, instead. This value is also used as
* the default migration state in an empty metadata log.
*/
NONE((byte) 0),
/**
* A KRaft controller has been elected with "zookeeper.metadata.migration.enable" set to "true".
* The controller is now awaiting the preconditions for starting the migration to KRaft. In this
* state, the metadata log does not yet contain the cluster's data. There is a metadata quorum,
* but it is not doing anything useful yet.
* <p>
* In Kafka 3.4, PRE_MIGRATION was written out as value 1 to the log, but no MIGRATION state
* was ever written. Since this would be an invalid log state in 3.5+, we have swapped the
* enum values for PRE_MIGRATION and MIGRATION. This allows us to handle the upgrade case
* from 3.4 without adding additional fields to the migration record.
*/
PRE_MIGRATION((byte) 2),
/**
* The ZK data has been migrated, and the KRaft controller is now writing metadata to both ZK
* and the metadata log. The controller will remain in this state until all the brokers have
* been restarted in KRaft mode.
*/
MIGRATION((byte) 1),
/**
* The migration from ZK has been fully completed. The cluster is running in KRaft mode. This state
* will persist indefinitely after the migration. In operational terms, this is the same as the NONE
* state.
*/
POST_MIGRATION((byte) 3),
/**
* The controller is a ZK controller. No migration has been performed. This state is never persisted
* and is only used by KafkaController in order to have a unified metric that indicates what kind of
* metadata state the controller is in.
*/
ZK((byte) 4);
private final byte value;
ZkMigrationState(byte value) {
this.value = value;
}
public byte value() {
return value;
}
public ApiMessageAndVersion toRecord() {
return new ApiMessageAndVersion(
new ZkMigrationStateRecord().setZkMigrationState(value()),
(short) 0
);
}
public static ZkMigrationState of(byte value) {
return optionalOf(value)
.orElseThrow(() -> new IllegalArgumentException(String.format("Value %s is not a valid Zk migration state", value)));
}
public static Optional<ZkMigrationState> optionalOf(byte value) {
for (ZkMigrationState state : ZkMigrationState.values()) {
if (state.value == value) {
return Optional.of(state);
}
}
return Optional.empty();
}
public boolean inProgress() {
return this == PRE_MIGRATION || this == MIGRATION;
}
}

View File

@ -23,7 +23,9 @@
// In 3.4, the defined values are: 0 (None), 1 (PreMigration), 2 (Migration), 3 (PostMigration).
// In 3.5, the values for PreMigration and Migration were swapped: 0 (None), 2 (PreMigration), 1 (Migration), 3 (PostMigration).
// This was done to work around the fact that we never wrote Migration or PostMigration records in 3.4.
//
// In 4.0, migration is no longer supported and ZooKeeper has been removed from Kafka, but a cluster
// migrated from ZK to KRaft on a 3.x release and then rolling-upgraded to 4.0 can still carry this
// record in its metadata log. Therefore, this generated code needs to be retained.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [

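A minimal sketch of the 3.5+ byte encoding documented above; the state names mirror the removed ZkMigrationState enum, but the class itself is only illustrative:

import java.util.Map;

public class MigrationStateBytesSketch {
    // 3.5+ on-disk values. A log written by Kafka 3.4 may instead contain
    // value 1 meaning PreMigration, since the values were swapped in 3.5.
    static final Map<Byte, String> STATES = Map.of(
            (byte) 0, "NONE",
            (byte) 1, "MIGRATION",       // swapped with PRE_MIGRATION in 3.5
            (byte) 2, "PRE_MIGRATION",   // written as 1 by Kafka 3.4
            (byte) 3, "POST_MIGRATION");

    public static void main(String[] args) {
        for (byte b = 0; b <= 3; b++) {
            System.out.println(b + " -> " + STATES.get(b));
        }
    }
}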
View File

@ -18,16 +18,13 @@
package org.apache.kafka.controller;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
* This class is for testing the log message or exception produced by ActivationRecordsGenerator. For tests that
@ -51,140 +48,35 @@ public class ActivationRecordsGeneratorTest {
result = ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. The metadata log appears to be empty. " +
"Appending 1 bootstrap record(s) at metadata.version 3.4-IV0 from bootstrap " +
"source 'test'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster.", logMsg),
"source 'test'.", logMsg),
-1L,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_4_IV0, "test"),
MetadataVersion.IBP_3_4_IV0
);
assertTrue(result.isAtomic());
assertEquals(2, result.records().size());
assertEquals(1, result.records().size());
result = ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. The metadata log appears to be empty. " +
"Appending 1 bootstrap record(s) in metadata transaction at metadata.version 3.6-IV1 from bootstrap " +
"source 'test'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster.", logMsg),
"source 'test'.", logMsg),
-1L,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
MetadataVersion.IBP_3_6_IV1
);
assertFalse(result.isAtomic());
assertEquals(4, result.records().size());
assertEquals(3, result.records().size());
result = ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. Aborting partial bootstrap records " +
"transaction at offset 0. Re-appending 1 bootstrap record(s) in new metadata transaction at " +
"metadata.version 3.6-IV1 from bootstrap source 'test'. Setting the ZK migration state to NONE " +
"since this is a de-novo KRaft cluster.", logMsg),
"metadata.version 3.6-IV1 from bootstrap source 'test'.", logMsg),
0L,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
MetadataVersion.IBP_3_6_IV1
);
assertFalse(result.isAtomic());
assertEquals(5, result.records().size());
}
@Test
public void testActivationMessageForNonEmptyLogNoMigrations() {
ControllerResult<Void> result;
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. No metadata.version feature level " +
"record was found in the log. Treating the log as version 3.0-IV1.", logMsg),
-1L,
ZkMigrationState.NONE,
MetadataVersion.MINIMUM_KRAFT_VERSION
);
assertTrue(result.isAtomic());
assertEquals(0, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation.", logMsg),
-1L,
ZkMigrationState.NONE,
MetadataVersion.IBP_3_3_IV0
);
assertTrue(result.isAtomic());
assertEquals(0, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Loaded ZK migration state of NONE. "
+ "This is expected because this is a de-novo KRaft cluster.", logMsg),
-1L,
ZkMigrationState.NONE,
MetadataVersion.IBP_3_4_IV0
);
assertTrue(result.isAtomic());
assertEquals(0, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Aborting in-progress metadata " +
"transaction at offset 42. Loaded ZK migration state of NONE. " +
"This is expected because this is a de-novo KRaft cluster.", logMsg),
42L,
ZkMigrationState.NONE,
MetadataVersion.IBP_3_6_IV1
);
assertTrue(result.isAtomic());
assertEquals(1, result.records().size());
assertEquals(
"Detected in-progress transaction at offset 42, but the metadata.version 3.6-IV0 does not support " +
"transactions. Cannot continue.",
assertThrows(RuntimeException.class, () ->
ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> fail(),
42L,
ZkMigrationState.NONE,
MetadataVersion.IBP_3_6_IV0
)).getMessage()
);
}
@Test
public void testActivationMessageForNonEmptyLogWithMigrations() {
assertEquals(
"Should not have ZkMigrationState.MIGRATION on a cluster running metadata version 3.3-IV0.",
assertThrows(RuntimeException.class, () ->
ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> fail(),
-1L,
ZkMigrationState.MIGRATION,
MetadataVersion.IBP_3_3_IV0
)).getMessage()
);
assertEquals(
"Cannot load ZkMigrationState.MIGRATION because ZK migration is no longer supported.",
assertThrows(RuntimeException.class, () ->
ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> fail(),
-1L,
ZkMigrationState.MIGRATION,
MetadataVersion.IBP_3_9_IV0
)
).getMessage()
);
ControllerResult<Void> result;
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Loaded ZK migration state of " +
"POST_MIGRATION.", logMsg),
-1L,
ZkMigrationState.POST_MIGRATION,
MetadataVersion.IBP_3_4_IV0
);
assertTrue(result.isAtomic());
assertEquals(0, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Aborting in-progress metadata " +
"transaction at offset 42. Loaded ZK migration state of POST_MIGRATION.", logMsg),
42L,
ZkMigrationState.POST_MIGRATION,
MetadataVersion.IBP_3_6_IV1
);
assertTrue(result.isAtomic());
assertEquals(1, result.records().size());
assertEquals(4, result.records().size());
}
}

View File

@ -25,8 +25,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
@ -408,40 +406,4 @@ public class FeatureControlManagerTest {
RecordTestUtils.replayAll(manager, result2.records());
assertEquals(Optional.empty(), manager.finalizedFeatures(Long.MAX_VALUE).get(Feature.TEST_VERSION.featureName()));
}
@Test
public void testNoMetadataVersionChangeDuringMigration() {
FeatureControlManager manager = new FeatureControlManager.Builder().
setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.IBP_3_5_IV1.featureLevel())).
setMetadataVersion(MetadataVersion.IBP_3_4_IV0).
build();
BootstrapMetadata bootstrapMetadata = BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_4_IV0, "FeatureControlManagerTest");
RecordTestUtils.replayAll(manager, bootstrapMetadata.records());
RecordTestUtils.replayOne(manager, ZkMigrationState.PRE_MIGRATION.toRecord());
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid metadata.version 10. Unable to modify metadata.version while a ZK migration is in progress.")),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_5_IV1.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
true));
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid metadata.version 4. Unable to modify metadata.version while a ZK migration is in progress.")),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
true));
// Complete the migration
RecordTestUtils.replayOne(manager, ZkMigrationState.POST_MIGRATION.toRecord());
ControllerResult<ApiError> result = manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_5_IV1.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
false);
assertEquals(ApiError.NONE, result.response());
RecordTestUtils.replayAll(manager, result.records());
assertEquals(MetadataVersion.IBP_3_5_IV1, manager.metadataVersion());
}
}

View File

@ -60,7 +60,6 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointColle
import org.apache.kafka.common.metadata.RegisterControllerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AlterPartitionRequest;
@ -99,7 +98,6 @@ import org.apache.kafka.metadata.RecordTestUtils.ImageDeltaPair;
import org.apache.kafka.metadata.RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.metadata.util.BatchFileWriter;
import org.apache.kafka.metalog.LocalLogManager;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
@ -814,7 +812,7 @@ public class QuorumControllerTest {
BrokerRegistrationReply reply = active.registerBroker(
ANONYMOUS_CONTEXT,
request).get();
assertTrue(reply.epoch() >= 5, "Unexpected broker epoch " + reply.epoch());
assertTrue(reply.epoch() >= 4, "Unexpected broker epoch " + reply.epoch());
}
}
}
@ -841,7 +839,7 @@ public class QuorumControllerTest {
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))).
setListeners(listeners));
assertEquals(5L, reply.get().epoch());
assertEquals(4L, reply.get().epoch());
CreateTopicsRequestData createTopicsRequestData =
new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(Collections.singleton(
@ -857,7 +855,7 @@ public class QuorumControllerTest {
get().topics().find("foo").errorMessage());
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
setWantFence(false).setBrokerEpoch(5L).setBrokerId(0).
setWantFence(false).setBrokerEpoch(4L).setBrokerId(0).
setCurrentMetadataOffset(100000L)).get());
assertEquals(Errors.NONE.code(), active.createTopics(ANONYMOUS_CONTEXT,
createTopicsRequestData, Collections.singleton("foo")).
@ -983,8 +981,6 @@ public class QuorumControllerTest {
new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel()), (short) 0),
new ApiMessageAndVersion(new ZkMigrationStateRecord().
setZkMigrationState((byte) 0), (short) 0),
new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0),
new ApiMessageAndVersion(new RegisterControllerRecord().
setControllerId(0).
@ -1477,10 +1473,7 @@ public class QuorumControllerTest {
appender)).getMessage());
}
FeatureControlManager getActivationRecords(
MetadataVersion metadataVersion,
Optional<ZkMigrationState> stateInLog
) {
FeatureControlManager getActivationRecords(MetadataVersion metadataVersion) {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControlManager = new FeatureControlManager.Builder()
.setSnapshotRegistry(snapshotRegistry)
@ -1489,10 +1482,9 @@ public class QuorumControllerTest {
ControllerResult<Void> result = ActivationRecordsGenerator.generate(
msg -> { },
stateInLog.isEmpty(),
true,
-1L,
BootstrapMetadata.fromVersion(metadataVersion, "test"),
stateInLog.orElse(ZkMigrationState.NONE),
metadataVersion);
RecordTestUtils.replayAll(featureControlManager, result.records());
return featureControlManager;
@ -1502,34 +1494,23 @@ public class QuorumControllerTest {
public void testActivationRecords33() {
FeatureControlManager featureControl;
featureControl = getActivationRecords(MetadataVersion.IBP_3_3_IV0, Optional.empty());
featureControl = getActivationRecords(MetadataVersion.IBP_3_3_IV0);
assertEquals(MetadataVersion.IBP_3_3_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
featureControl = getActivationRecords(MetadataVersion.IBP_3_3_IV0, Optional.of(ZkMigrationState.NONE));
assertEquals(MetadataVersion.IBP_3_3_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
}
@Test
public void testActivationRecords34() {
FeatureControlManager featureControl;
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.empty());
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0);
assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.NONE));
assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
}
@Test
public void testActivationRecordsNonEmptyLog() {
FeatureControlManager featureControl = getActivationRecords(
MetadataVersion.IBP_3_9_IV0, Optional.empty());
MetadataVersion.IBP_3_9_IV0);
assertEquals(MetadataVersion.IBP_3_9_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
}
@Test
@ -1539,7 +1520,6 @@ public class QuorumControllerTest {
true,
0L,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
ZkMigrationState.NONE,
MetadataVersion.IBP_3_6_IV1);
assertFalse(result.isAtomic());
assertTrue(RecordTestUtils.recordAtIndexAs(
@ -1588,7 +1568,6 @@ public class QuorumControllerTest {
false,
offsetControlManager.transactionStartOffset(),
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
ZkMigrationState.NONE,
MetadataVersion.IBP_3_6_IV1);
assertTrue(result.isAtomic());
@ -1612,7 +1591,6 @@ public class QuorumControllerTest {
false,
offsetControlManager.transactionStartOffset(),
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV0, "test"),
ZkMigrationState.NONE,
MetadataVersion.IBP_3_6_IV0)
);
}

View File

@ -17,10 +17,6 @@
package org.apache.kafka.controller;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.metadata.ControllerRegistration;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
@ -124,56 +120,4 @@ public class QuorumFeaturesTest {
assertTrue(QUORUM_FEATURES.isControllerId(2));
assertFalse(QUORUM_FEATURES.isControllerId(3));
}
@Test
public void testZkMigrationNotReadyIfMetadataVersionTooLow() {
assertEquals(Optional.of("The metadata.version too low at 3.0-IV1"),
QUORUM_FEATURES.reasonAllControllersZkMigrationNotReady(
MetadataVersion.IBP_3_0_IV1, Collections.emptyMap()));
}
@Test
public void testZkMigrationReadyIfControllerRegistrationNotSupported() {
assertEquals(Optional.empty(),
QUORUM_FEATURES.reasonAllControllersZkMigrationNotReady(
MetadataVersion.IBP_3_4_IV0, Collections.emptyMap()));
}
@Test
public void testZkMigrationNotReadyIfNotAllControllersRegistered() {
assertEquals(Optional.of("No registration found for controller 0"),
QUORUM_FEATURES.reasonAllControllersZkMigrationNotReady(
MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()));
}
@Test
public void testZkMigrationNotReadyIfControllerNotReady() {
assertEquals(Optional.of("Controller 0 has not enabled zookeeper.metadata.migration.enable"),
QUORUM_FEATURES.reasonAllControllersZkMigrationNotReady(
MetadataVersion.IBP_3_7_IV0, Collections.singletonMap(0,
new ControllerRegistration.Builder().
setId(0).
setZkMigrationReady(false).
setIncarnationId(Uuid.fromString("kCBJaDGNQk6x3y5xbtQOpg")).
setListeners(Collections.singletonMap("CONTROLLER",
new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "localhost", 9093))).
build())));
}
@Test
public void testZkMigrationReadyIfAllControllersReady() {
Map<Integer, ControllerRegistration> controllers = new HashMap<>();
QUORUM_FEATURES.quorumNodeIds().forEach(id ->
controllers.put(id,
new ControllerRegistration.Builder().
setId(id).
setZkMigrationReady(true).
setIncarnationId(Uuid.fromString("kCBJaDGNQk6x3y5xbtQOpg")).
setListeners(Collections.singletonMap("CONTROLLER",
new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "localhost", 9093))).
build())
);
assertEquals(Optional.empty(), QUORUM_FEATURES.reasonAllControllersZkMigrationNotReady(
MetadataVersion.IBP_3_7_IV0, controllers));
}
}

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
@ -54,7 +53,7 @@ public class FeaturesImageTest {
Map<String, Short> map1 = new HashMap<>();
map1.put("foo", (short) 2);
map1.put("bar", (short) 1);
IMAGE1 = new FeaturesImage(map1, MetadataVersion.latestTesting(), ZkMigrationState.NONE);
IMAGE1 = new FeaturesImage(map1, MetadataVersion.latestTesting());
DELTA1_RECORDS = new ArrayList<>();
// change feature level
@ -76,7 +75,7 @@ public class FeaturesImageTest {
Map<String, Short> map2 = new HashMap<>();
map2.put("foo", (short) 3);
map2.put("baz", (short) 8);
IMAGE2 = new FeaturesImage(map2, MetadataVersion.latestTesting(), ZkMigrationState.NONE);
IMAGE2 = new FeaturesImage(map2, MetadataVersion.latestTesting());
DELTA2_RECORDS = new ArrayList<>();
// remove all features
@ -95,7 +94,7 @@ public class FeaturesImageTest {
RecordTestUtils.replayAll(DELTA2, DELTA2_RECORDS);
Map<String, Short> map3 = Collections.singletonMap("bar", (short) 1);
IMAGE3 = new FeaturesImage(map3, MetadataVersion.latestTesting(), ZkMigrationState.NONE);
IMAGE3 = new FeaturesImage(map3, MetadataVersion.latestTesting());
}
@Test
@ -162,10 +161,9 @@ public class FeaturesImageTest {
public void testEmpty() {
assertTrue(FeaturesImage.EMPTY.isEmpty());
assertFalse(new FeaturesImage(Collections.singletonMap("foo", (short) 1),
FeaturesImage.EMPTY.metadataVersion(), FeaturesImage.EMPTY.zkMigrationState()).isEmpty());
FeaturesImage.EMPTY.metadataVersion()).isEmpty());
assertFalse(new FeaturesImage(FeaturesImage.EMPTY.finalizedVersions(),
MetadataVersion.IBP_3_3_IV0, FeaturesImage.EMPTY.zkMigrationState()).isEmpty());
assertFalse(new FeaturesImage(FeaturesImage.EMPTY.finalizedVersions(),
FeaturesImage.EMPTY.metadataVersion(), ZkMigrationState.MIGRATION).isEmpty());
MetadataVersion.IBP_3_3_IV0).isEmpty());
assertTrue(new FeaturesImage(FeaturesImage.EMPTY.finalizedVersions(), FeaturesImage.EMPTY.metadataVersion()).isEmpty());
}
}

View File

@ -22,7 +22,6 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.image.writer.UnwritableMetadataException;
@ -183,7 +182,6 @@ public class ImageDowngradeTest {
(short) 2)),
Arrays.asList(
metadataVersionRecord(outputMetadataVersion),
new ApiMessageAndVersion(new ZkMigrationStateRecord(), (short) 0),
TEST_RECORDS.get(0),
new ApiMessageAndVersion(
testPartitionRecord.duplicate().setDirectories(Collections.emptyList()),

View File

@ -148,7 +148,7 @@ public class SnapshotEmitterTest {
assertEquals(0L, emitter.metrics().latestSnapshotGeneratedBytes());
emitter.maybeEmit(MetadataImageTest.IMAGE1);
assertEquals(0L, emitter.metrics().latestSnapshotGeneratedAgeMs());
assertEquals(1600L, emitter.metrics().latestSnapshotGeneratedBytes());
assertEquals(1500L, emitter.metrics().latestSnapshotGeneratedBytes());
FakeSnapshotWriter writer = mockRaftClient.writers.get(
MetadataImageTest.IMAGE1.provenance().snapshotId());
assertNotNull(writer);

View File

@ -28,7 +28,6 @@ import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
@ -115,14 +114,6 @@ public class KRaftConfigs {
public static final String SERVER_MAX_STARTUP_TIME_MS_DOC = "The maximum number of milliseconds we will wait for the server to come up. " +
"By default there is no limit. This should be used for testing only.";
/** ZK to KRaft Migration configs */
public static final String MIGRATION_ENABLED_CONFIG = "zookeeper.metadata.migration.enable";
public static final String MIGRATION_ENABLED_DOC = "Enable ZK to KRaft migration";
public static final String MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG = "zookeeper.metadata.migration.min.batch.size";
public static final int MIGRATION_METADATA_MIN_BATCH_SIZE_DEFAULT = 200;
public static final String MIGRATION_METADATA_MIN_BATCH_SIZE_DOC = "Soft minimum batch size to use when migrating metadata from ZooKeeper to KRaft";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
.define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
@ -140,8 +131,5 @@ public class KRaftConfigs {
.define(METADATA_MAX_RETENTION_BYTES_CONFIG, LONG, METADATA_MAX_RETENTION_BYTES_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_BYTES_DOC)
.define(METADATA_MAX_RETENTION_MILLIS_CONFIG, LONG, LogConfig.DEFAULT_RETENTION_MS, null, HIGH, METADATA_MAX_RETENTION_MILLIS_DOC)
.define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC)
.defineInternal(SERVER_MAX_STARTUP_TIME_MS_CONFIG, LONG, SERVER_MAX_STARTUP_TIME_MS_DEFAULT, atLeast(0), MEDIUM, SERVER_MAX_STARTUP_TIME_MS_DOC)
.define(MIGRATION_ENABLED_CONFIG, BOOLEAN, false, HIGH, MIGRATION_ENABLED_DOC)
.defineInternal(MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG, INT, MIGRATION_METADATA_MIN_BATCH_SIZE_DEFAULT, atLeast(1),
MEDIUM, MIGRATION_METADATA_MIN_BATCH_SIZE_DOC);
.defineInternal(SERVER_MAX_STARTUP_TIME_MS_CONFIG, LONG, SERVER_MAX_STARTUP_TIME_MS_DEFAULT, atLeast(0), MEDIUM, SERVER_MAX_STARTUP_TIME_MS_DOC);
}
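
A minimal sketch of the effect of this removal, assuming the kafka-clients ConfigDef API and a single stand-in key: once the migration keys are no longer defined, they simply disappear from the set of known config names.

import org.apache.kafka.common.config.ConfigDef;

import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;

public class MigrationConfigGoneSketch {
    public static void main(String[] args) {
        // Trimmed stand-in for the CONFIG_DEF above, with the migration keys omitted.
        ConfigDef def = new ConfigDef()
                .define("metadata.max.idle.interval.ms", INT, 500, LOW, "example doc");
        System.out.println(def.names().contains("zookeeper.metadata.migration.enable")); // false
    }
}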