From 772aa241b2f1c3e8c8b7ad73ddc910c8272f2d1d Mon Sep 17 00:00:00 2001
From: TengYao Chi
Date: Fri, 13 Dec 2024 01:34:29 +0800
Subject: [PATCH] KAFKA-18136: Remove zk migration from code base (#18016)
Reviewers: Mickael Maison , Chia-Ping Tsai
---
core/src/main/scala/kafka/Kafka.scala | 9 +-
.../kafka/controller/KafkaController.scala | 6 +-
.../main/scala/kafka/raft/RaftManager.scala | 2 -
.../main/scala/kafka/server/KafkaApis.scala | 6 +-
.../main/scala/kafka/server/KafkaConfig.scala | 43 +--
.../main/scala/kafka/server/KafkaServer.scala | 146 +---------
.../scala/kafka/server/MetadataCache.scala | 5 +-
.../scala/kafka/server/ReplicaManager.scala | 24 +-
.../server/metadata/ZkMetadataCache.scala | 6 +-
.../main/scala/kafka/zk/KafkaZkClient.scala | 250 +-----------------
core/src/main/scala/kafka/zk/ZkData.scala | 39 ---
.../kafka/server/QuorumTestHarness.scala | 2 +-
.../integration/KafkaServerTestHarness.scala | 8 +-
.../unit/kafka/metrics/MetricsTest.scala | 8 +-
.../BrokerRegistrationRequestTest.scala | 33 +--
.../unit/kafka/server/KafkaConfigTest.scala | 39 ---
.../unit/kafka/server/MetadataCacheTest.scala | 69 -----
.../kafka/server/ReplicaManagerTest.scala | 75 +-----
.../scala/unit/kafka/utils/TestUtils.scala | 7 +-
.../ReplicaFetcherThreadBenchmark.java | 2 +-
.../ActivationRecordsGenerator.java | 32 +--
.../controller/FeatureControlManager.java | 32 ---
.../kafka/controller/QuorumController.java | 6 +-
.../kafka/controller/QuorumFeatures.java | 22 --
.../ControllerMetadataMetricsPublisher.java | 4 -
.../org/apache/kafka/image/FeaturesDelta.java | 21 +-
.../org/apache/kafka/image/FeaturesImage.java | 31 +--
.../org/apache/kafka/image/MetadataDelta.java | 9 +-
.../kafka/image/node/FeaturesImageNode.java | 2 -
.../migration/ZkMigrationLeadershipState.java | 202 --------------
.../metadata/migration/ZkMigrationState.java | 108 --------
.../common/metadata/ZkMigrationRecord.json | 4 +-
.../ActivationRecordsGeneratorTest.java | 120 +--------
.../controller/FeatureControlManagerTest.java | 38 ---
.../controller/QuorumControllerTest.java | 38 +--
.../kafka/controller/QuorumFeaturesTest.java | 56 ----
.../apache/kafka/image/FeaturesImageTest.java | 14 +-
.../kafka/image/ImageDowngradeTest.java | 2 -
.../image/publisher/SnapshotEmitterTest.java | 2 +-
.../kafka/server/config/KRaftConfigs.java | 14 +-
40 files changed, 71 insertions(+), 1465 deletions(-)
delete mode 100644 metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
delete mode 100644 metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index f32f23d3475..1c783ca7dc0 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -61,20 +61,13 @@ object Kafka extends Logging {
props
}
- // For Zk mode, the API forwarding is currently enabled only under migration flag. We can
- // directly do a static IBP check to see API forwarding is enabled here because IBP check is
- // static in Zk mode.
- private def enableApiForwarding(config: KafkaConfig) =
- config.migrationEnabled && config.interBrokerProtocolVersion.isApiForwardingEnabled
-
private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, doLog = false)
if (config.requiresZookeeper) {
new KafkaServer(
config,
Time.SYSTEM,
- threadNamePrefix = None,
- enableForwarding = enableApiForwarding(config)
+ threadNamePrefix = None
)
} else {
new KafkaRaftServer(
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 5d886a30401..e794bceeca7 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -21,7 +21,7 @@ import com.yammer.metrics.core.{Meter, Timer}
import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.cluster.Broker
-import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback, ZkMigrationStateMetricName}
+import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback}
import kafka.coordinator.transaction.ZkProducerIdManager
import kafka.server._
import kafka.server.metadata.ZkFinalizedFeatureCache
@@ -42,7 +42,6 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
-import org.apache.kafka.metadata.migration.ZkMigrationState
import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.common.{AdminOperationException, ProducerIdsBlock}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
@@ -81,11 +80,9 @@ object KafkaController extends Logging {
private val ReplicasIneligibleToDeleteCountMetricName = "ReplicasIneligibleToDeleteCount"
private val ActiveBrokerCountMetricName = "ActiveBrokerCount"
private val FencedBrokerCountMetricName = "FencedBrokerCount"
- private val ZkMigrationStateMetricName = "ZkMigrationState"
// package private for testing
private[controller] val MetricNames = Set(
- ZkMigrationStateMetricName,
ActiveControllerCountMetricName,
OfflinePartitionsCountMetricName,
PreferredReplicaImbalanceCountMetricName,
@@ -174,7 +171,6 @@ class KafkaController(val config: KafkaConfig,
/* single-thread scheduler to clean expired tokens */
private val tokenCleanScheduler = new KafkaScheduler(1, true, "delegation-token-cleaner")
- metricsGroup.newGauge(ZkMigrationStateMetricName, () => ZkMigrationState.ZK.value().intValue())
metricsGroup.newGauge(ActiveControllerCountMetricName, () => if (isActive) 1 else 0)
metricsGroup.newGauge(OfflinePartitionsCountMetricName, () => offlinePartitionCount)
metricsGroup.newGauge(PreferredReplicaImbalanceCountMetricName, () => preferredReplicaImbalanceCount)
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 4fe5020a974..79d2d2d4f22 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -98,8 +98,6 @@ object KafkaRaftManager {
// These constraints are enforced in KafkaServer, but repeating them here to guard against future callers
if (config.processRoles.nonEmpty) {
throw new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode.")
- } else if (!config.migrationEnabled) {
- throw new RuntimeException("Not deleting metadata log dir since migrations are not enabled.")
} else {
val metadataDir = new File(config.metadataLogDir)
val logDirName = UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ba5eef40e5c..566b04dfd25 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3160,7 +3160,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new KafkaPrincipal(entry.principalType, entry.principalName))
// DelegationToken changes only need to be executed on the controller during migration
- if (config.migrationEnabled && (!zkSupport.controller.isActive)) {
+ if (!zkSupport.controller.isActive) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
Errors.NOT_CONTROLLER, owner, requester))
@@ -3204,7 +3204,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setExpiryTimestampMs(expiryTimestamp)))
}
// DelegationToken changes only need to be executed on the controller during migration
- if (config.migrationEnabled && (!zkSupport.controller.isActive)) {
+ if (!zkSupport.controller.isActive) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new RenewDelegationTokenResponse(
new RenewDelegationTokenResponseData()
@@ -3250,7 +3250,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setExpiryTimestampMs(expiryTimestamp)))
}
// DelegationToken changes only need to be executed on the controller during migration
- if (config.migrationEnabled && (!zkSupport.controller.isActive)) {
+ if (!zkSupport.controller.isActive) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new ExpireDelegationTokenResponse(
new ExpireDelegationTokenResponseData()
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9502e81f3e4..92c3b524957 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -337,9 +337,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def requiresZookeeper: Boolean = processRoles.isEmpty
def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty
- val migrationEnabled: Boolean = getBoolean(KRaftConfigs.MIGRATION_ENABLED_CONFIG)
- val migrationMetadataMinBatchSize: Int = getInt(KRaftConfigs.MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG)
-
private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
case "broker" => ProcessRole.BrokerRole
@@ -804,9 +801,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
throw new ConfigException(s"Missing required configuration `${ZkConfigs.ZK_CONNECT_CONFIG}` which has no default value.")
}
if (brokerIdGenerationEnable) {
- if (migrationEnabled) {
- require(brokerId >= 0, "broker.id generation is incompatible with ZooKeeper migration. Please stop using it before enabling migration (set broker.id to a value greater or equal to 0).")
- }
require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id")
} else {
require(brokerId >= 0, "broker.id must be greater than or equal to 0")
@@ -817,11 +811,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
throw new ConfigException(s"Missing configuration `${KRaftConfigs.NODE_ID_CONFIG}` which is required " +
s"when `process.roles` is defined (i.e. when running in KRaft mode).")
}
- if (migrationEnabled) {
- if (zkConnect == null) {
- throw new ConfigException(s"If using `${KRaftConfigs.MIGRATION_ENABLED_CONFIG}` in KRaft mode, `${ZkConfigs.ZK_CONNECT_CONFIG}` must also be set.")
- }
- }
}
require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
@@ -846,15 +835,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
)
}
}
- def validateQuorumVotersAndQuorumBootstrapServerForMigration(): Unit = {
- if (voterIds.isEmpty && quorumConfig.bootstrapServers.isEmpty) {
- throw new ConfigException(
- s"""If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, either ${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
- |contain the set of bootstrap controllers or ${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
- |set of controllers.""".stripMargin.replace("\n", " ")
- )
- }
- }
+
def validateControlPlaneListenerEmptyForKRaft(): Unit = {
require(controlPlaneListenerName.isEmpty,
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not supported in KRaft mode.")
@@ -922,25 +903,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
validateAdvertisedControllerListenersNonEmptyForKRaftController()
validateControllerListenerNamesMustAppearInListenersForKRaftController()
} else {
- // ZK-based
- if (migrationEnabled) {
- require(brokerId >= 0,
- "broker.id generation is incompatible with ZooKeeper migration. Please stop using it before enabling migration (set broker.id to a value greater or equal to 0).")
- validateQuorumVotersAndQuorumBootstrapServerForMigration()
- require(controllerListenerNames.nonEmpty,
- s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
- require(interBrokerProtocolVersion.isMigrationSupported, s"Cannot enable ZooKeeper migration without setting " +
- s"'${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG}' to 3.4 or higher")
- if (logDirs.size > 1) {
- require(interBrokerProtocolVersion.isDirectoryAssignmentSupported,
- s"Cannot enable ZooKeeper migration with multiple log directories (aka JBOD) without setting " +
- s"'${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG}' to ${MetadataVersion.IBP_3_7_IV2} or higher")
- }
- } else {
- // controller listener names must be empty when not in KRaft mode
- require(controllerListenerNames.isEmpty,
- s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
- }
+ // controller listener names must be empty when not in KRaft mode
+ require(controllerListenerNames.isEmpty,
+ s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
}
val listenerNames = listeners.map(_.listenerName).toSet
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0cb6ee48726..afd8429e57d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -26,15 +26,13 @@ import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.metrics.KafkaMetricsReporter
import kafka.network.{ControlPlaneAcceptor, DataPlaneAcceptor, RequestChannel, SocketServer}
-import kafka.raft.KafkaRaftManager
-import kafka.server.metadata.{OffsetTrackingListener, ZkConfigRepository, ZkMetadataCache}
+import kafka.server.metadata.{ZkConfigRepository, ZkMetadataCache}
import kafka.utils._
import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, MetadataRecoveryStrategy, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.ApiMessageType.ListenerType
-import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
import org.apache.kafka.common.message.ControlledShutdownRequestData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network._
@@ -48,19 +46,15 @@ import org.apache.kafka.common.{Endpoint, Node, TopicPartition}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
-import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.REQUIRE_V0
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble}
-import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde, VersionRange}
-import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.raft.Endpoints
+import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion._
-import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, NodeToControllerChannelManager}
+import org.apache.kafka.server.common.NodeToControllerChannelManager
import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
-import org.apache.kafka.server.fault.LoggingFaultHandler
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.KafkaScheduler
@@ -75,7 +69,7 @@ import java.time.Duration
import java.util
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
-import java.util.{Optional, OptionalInt, OptionalLong}
+import java.util.{Optional, OptionalInt}
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
@@ -114,8 +108,7 @@ object KafkaServer {
class KafkaServer(
val config: KafkaConfig,
time: Time = Time.SYSTEM,
- threadNamePrefix: Option[String] = None,
- enableForwarding: Boolean = false
+ threadNamePrefix: Option[String] = None
) extends KafkaBroker with Server {
private val startupComplete = new AtomicBoolean(false)
@@ -205,7 +198,6 @@ class KafkaServer(
@volatile def kafkaController: KafkaController = _kafkaController
var lifecycleManager: BrokerLifecycleManager = _
- private var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
@volatile var brokerEpochManager: ZkBrokerEpochManager = _
@@ -241,9 +233,6 @@ class KafkaServer(
val initialMetaPropsEnsemble = {
val loader = new MetaPropertiesEnsemble.Loader()
loader.addLogDirs(config.logDirs.asJava)
- if (config.migrationEnabled) {
- loader.addMetadataLogDir(config.metadataLogDir)
- }
loader.load()
}
@@ -252,11 +241,7 @@ class KafkaServer(
} else {
OptionalInt.of(config.brokerId)
}
- val verificationFlags = if (config.migrationEnabled) {
- util.EnumSet.noneOf(classOf[VerificationFlag])
- } else {
- util.EnumSet.of(REQUIRE_V0)
- }
+ val verificationFlags = util.EnumSet.of(REQUIRE_V0)
initialMetaPropsEnsemble.verify(Optional.of(_clusterId), verificationId, verificationFlags)
/* generate brokerId */
@@ -294,11 +279,6 @@ class KafkaServer(
val builder = new MetaProperties.Builder(e.getValue).
setClusterId(_clusterId).
setNodeId(config.brokerId)
- if (!builder.directoryId().isPresent) {
- if (config.migrationEnabled) {
- builder.setDirectoryId(copier.generateValidDirectoryId())
- }
- }
copier.setLogDirProps(logDir, builder.build())
})
copier.emptyLogDirs().clear()
@@ -332,8 +312,7 @@ class KafkaServer(
metadataCache = MetadataCache.zkMetadataCache(
config.brokerId,
config.interBrokerProtocolVersion,
- brokerFeatures,
- config.migrationEnabled)
+ brokerFeatures)
val controllerNodeProvider = new MetadataCacheControllerNodeProvider(metadataCache, config,
() => Option(quorumControllerNodeProvider).map(_.getControllerInfo()))
@@ -360,11 +339,7 @@ class KafkaServer(
clientToControllerChannelManager.start()
/* start forwarding manager */
- var autoTopicCreationChannel = Option.empty[NodeToControllerChannelManager]
- if (enableForwarding) {
- this.forwardingManager = Some(ForwardingManager(clientToControllerChannelManager, metrics))
- autoTopicCreationChannel = Some(clientToControllerChannelManager)
- }
+ val autoTopicCreationChannel = Option.empty[NodeToControllerChannelManager]
val apiVersionManager = ApiVersionManager(
ListenerType.ZK_BROKER,
@@ -415,81 +390,6 @@ class KafkaServer(
_kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
kafkaController.startup()
- if (config.migrationEnabled) {
- logger.info("Starting up additional components for ZooKeeper migration")
- lifecycleManager = new BrokerLifecycleManager(config,
- time,
- s"zk-broker-${config.nodeId}-",
- isZkBroker = true,
- logManager.directoryIdsSet)
-
- // For ZK brokers in migration mode, always delete the metadata partition on startup.
- logger.info(s"Deleting local metadata log from ${config.metadataLogDir} since this is a ZK broker in migration mode.")
- KafkaRaftManager.maybeDeleteMetadataLogDir(config)
- logger.info("Successfully deleted local metadata log. It will be re-created.")
-
- // If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
- val quorumVoters = QuorumConfig.parseVoterConnections(config.quorumConfig.voters)
- raftManager = new KafkaRaftManager[ApiMessageAndVersion](
- metaPropsEnsemble.clusterId().get(),
- config,
- // metadata log dir and directory.id must exist because migration is enabled
- metaPropsEnsemble.logDirProps.get(metaPropsEnsemble.metadataLogDir.get).directoryId.get,
- new MetadataRecordSerde,
- KafkaRaftServer.MetadataPartition,
- KafkaRaftServer.MetadataTopicId,
- time,
- metrics,
- threadNamePrefix,
- CompletableFuture.completedFuture(quorumVoters),
- QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
- // Endpoint information is only needed for KRaft controllers (voters). ZK brokers
- // (observers) can never be KRaft controllers
- Endpoints.empty(),
- fatalFaultHandler = new LoggingFaultHandler("raftManager", () => shutdown())
- )
- quorumControllerNodeProvider = RaftControllerNodeProvider(raftManager, config)
- val brokerToQuorumChannelManager = new NodeToControllerChannelManagerImpl(
- controllerNodeProvider = quorumControllerNodeProvider,
- time = time,
- metrics = metrics,
- config = config,
- channelName = "quorum",
- s"zk-broker-${config.nodeId}-",
- retryTimeoutMs = config.requestTimeoutMs.longValue
- )
-
- val listener = new OffsetTrackingListener()
- raftManager.register(listener)
- raftManager.startup()
-
- val networkListeners = new ListenerCollection()
- config.effectiveAdvertisedBrokerListeners.foreach { ep =>
- networkListeners.add(new Listener().
- setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
- setName(ep.listenerName.value()).
- setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) else ep.port).
- setSecurityProtocol(ep.securityProtocol.id))
- }
-
- val features = BrokerFeatures.createDefaultFeatureMap(BrokerFeatures.createDefault(config.unstableFeatureVersionsEnabled)).asScala
-
- // Even though ZK brokers don't use "metadata.version" feature, we need to overwrite it with our IBP as part of registration
- // so the KRaft controller can verify that all brokers are on the same IBP before starting the migration.
- val featuresRemapped = features + (MetadataVersion.FEATURE_NAME ->
- VersionRange.of(config.interBrokerProtocolVersion.featureLevel(), config.interBrokerProtocolVersion.featureLevel()))
-
- lifecycleManager.start(
- () => listener.highestOffset,
- brokerToQuorumChannelManager,
- clusterId,
- networkListeners,
- featuresRemapped.asJava,
- OptionalLong.empty()
- )
- logger.debug("Start RaftManager")
- }
-
// Used by ZK brokers during a KRaft migration. When talking to a KRaft controller, we need to use the epoch
// from BrokerLifecycleManager rather than ZK (via KafkaController)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, kafkaController, Option(lifecycleManager))
@@ -630,18 +530,6 @@ class KafkaServer(
dynamicConfigManager = new ZkConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()
- if (config.migrationEnabled && lifecycleManager != null) {
- lifecycleManager.initialCatchUpFuture.whenComplete { case (_, t) =>
- if (t != null) {
- fatal("Encountered an exception when waiting to catch up with KRaft metadata log", t)
- shutdown()
- } else {
- info("Finished catching up on KRaft metadata log, requesting that the KRaft controller unfence this broker")
- lifecycleManager.setReadyToUnfence()
- }
- }
- }
-
val enableRequestProcessingFuture = socketServer.enableRequestProcessing(authorizerFutures)
// Block here until all the authorizer futures are complete
try {
@@ -946,21 +834,6 @@ class KafkaServer(
_brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN
- if (config.migrationEnabled && lifecycleManager != null && metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])) {
- // For now we'll send the heartbeat with WantShutDown set so the KRaft controller can see a broker
- // shutting down without waiting for the heartbeat to time out.
- info("Notifying KRaft of controlled shutdown")
- lifecycleManager.beginControlledShutdown()
- try {
- lifecycleManager.controlledShutdownFuture.get(5L, TimeUnit.MINUTES)
- } catch {
- case _: TimeoutException =>
- error("Timed out waiting for the controller to approve controlled shutdown")
- case e: Throwable =>
- error("Got unexpected exception waiting for controlled shutdown future", e)
- }
- }
-
val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
if (!shutdownSucceeded)
@@ -1070,9 +943,6 @@ class KafkaServer(
// Clear all reconfigurable instances stored in DynamicBrokerConfig
config.dynamicConfig.clear()
- if (raftManager != null)
- CoreUtils.swallow(raftManager.shutdown(), this)
-
if (lifecycleManager != null) {
lifecycleManager.close()
}
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 4b14f04483e..c98431c44e9 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -117,10 +117,9 @@ trait MetadataCache {
object MetadataCache {
def zkMetadataCache(brokerId: Int,
metadataVersion: MetadataVersion,
- brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty(),
- zkMigrationEnabled: Boolean = false)
+ brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty())
: ZkMetadataCache = {
- new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, zkMigrationEnabled)
+ new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures)
}
def kRaftMetadataCache(
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index cb7005f4020..292b0fd70a0 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2093,24 +2093,6 @@ class ReplicaManager(val config: KafkaConfig,
s"Latest known controller epoch is $controllerEpoch")
leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
} else {
- // In migration mode, reconcile missed topic deletions when handling full LISR from KRaft controller.
- // LISR "type" field was previously unspecified (0), so if we see it set to Full (2), then we know the
- // request came from a KRaft controller.
- //
- // Note that we have to do this first, before anything else, since topics may be recreated with the same
- // name, but a different ID. And in that case, we need to move aside the old version of those topics
- // (with the obsolete topic ID) before doing anything else.
- if (config.migrationEnabled &&
- leaderAndIsrRequest.isKRaftController &&
- leaderAndIsrRequest.requestType() == AbstractControlRequest.Type.FULL)
- {
- val strays = LogManager.findStrayReplicas(localBrokerId, leaderAndIsrRequest, logManager.allLogs)
- stateChangeLogger.info(s"While handling full LeaderAndIsr request from KRaft " +
- s"controller $controllerId with correlation id $correlationId, found ${strays.size} " +
- "stray partition(s).")
- updateStrayLogs(strays)
- }
-
val responseMap = new mutable.HashMap[TopicPartition, Errors]
controllerEpoch = leaderAndIsrRequest.controllerEpoch
@@ -2671,16 +2653,12 @@ class ReplicaManager(val config: KafkaConfig,
s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
}
logManager.handleLogDirFailure(dir)
- if (dir == new File(config.metadataLogDir).getAbsolutePath && (config.processRoles.nonEmpty || config.migrationEnabled)) {
+ if (dir == new File(config.metadataLogDir).getAbsolutePath && config.processRoles.nonEmpty) {
fatal(s"Shutdown broker because the metadata log dir $dir has failed")
Exit.halt(1)
}
if (notifyController) {
- if (config.migrationEnabled) {
- fatal(s"Shutdown broker because some log directory has failed during migration mode: $dir")
- Exit.halt(1)
- }
if (zkClient.isEmpty) {
if (uuid.isDefined) {
directoryEventHandler.handleFailure(uuid.get)
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index 3205a24aa44..36684f7ef0f 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -157,8 +157,7 @@ object ZkMetadataCache {
class ZkMetadataCache(
brokerId: Int,
metadataVersion: MetadataVersion,
- brokerFeatures: BrokerFeatures,
- zkMigrationEnabled: Boolean = false)
+ brokerFeatures: BrokerFeatures)
extends MetadataCache with ZkFinalizedFeatureCache with Logging {
private val partitionMetadataLock = new ReentrantReadWriteLock()
@@ -476,9 +475,6 @@ class ZkMetadataCache(
stateChangeLogger.error(s"Received UpdateMetadataRequest with Type=FULL (2), but version of " +
updateMetadataRequest.version() + ", which should not be possible. Not treating this as a full " +
"metadata update")
- } else if (!zkMigrationEnabled) {
- stateChangeLogger.error(s"Received UpdateMetadataRequest with Type=FULL (2), but ZK migrations " +
- s"are not enabled on this broker. Not treating this as a full metadata update")
} else {
// When handling a UMR from a KRaft controller, we may have to insert some partition
// deletions at the beginning, to handle the different way topic deletion works in KRaft
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 106d5075dc4..3c9740fb5ce 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -30,13 +30,12 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderAndIsr
-import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
-import org.apache.zookeeper.OpResult.{CheckResult, CreateResult, ErrorResult, SetDataResult}
+import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult}
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.common.ZKConfig
import org.apache.zookeeper.data.{ACL, Stat}
@@ -165,92 +164,6 @@ class KafkaZkClient private[zk] (
tryCreateControllerZNodeAndIncrementEpoch()
}
- /**
- * Registers a given KRaft controller in zookeeper as the active controller. Unlike the ZK equivalent of this method,
- * this creates /controller as a persistent znode. This prevents ZK brokers from attempting to claim the controller
- * leadership during a KRaft leadership failover.
- *
- * This method is called at the beginning of a KRaft migration and during subsequent KRaft leadership changes during
- * the migration.
- *
- * To ensure that the KRaft controller epoch exceeds the current ZK controller epoch, this registration algorithm
- * uses a conditional update on the /controller and /controller_epoch znodes.
- *
- * If a new controller is registered concurrently with this registration, one of the two will fail the CAS
- * operation on /controller_epoch. For KRaft, we have an extra guard against the registered KRaft epoch going
- * backwards. If a KRaft controller had previously registered, an additional CAS operation is done on the /controller
- * ZNode to ensure that the KRaft epoch being registered is newer.
- *
- * @param kraftControllerId ID of the KRaft controller node
- * @param kraftControllerEpoch Epoch of the KRaft controller node
- * @return A result object containing the written ZK controller epoch and version, or nothing.
- */
- def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): KRaftRegistrationResult = {
- val timestamp = time.milliseconds()
- val curEpochOpt: Option[(Int, Int)] = getControllerEpoch.map(e => (e._1, e._2.getVersion))
- val controllerOpt = getControllerRegistration
-
- // If we have a KRaft epoch registered in /controller, and it is not _older_ than the requested epoch, throw an error.
- controllerOpt.flatMap(_.kraftEpoch).foreach { kraftEpochInZk =>
- if (kraftEpochInZk >= kraftControllerEpoch) {
- throw new ControllerMovedException(s"Cannot register KRaft controller $kraftControllerId with epoch $kraftControllerEpoch " +
- s"as the current controller register in ZK has the same or newer epoch $kraftEpochInZk.")
- }
- }
-
- curEpochOpt match {
- case None =>
- throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " +
- s"since there is no ZK controller epoch present.")
- case Some((curEpoch: Int, curEpochZk: Int)) =>
- val newControllerEpoch = curEpoch + 1
-
- val response = controllerOpt match {
- case Some(controller) =>
- info(s"KRaft controller $kraftControllerId overwriting ${ControllerZNode.path} to become the active " +
- s"controller with ZK epoch $newControllerEpoch. The previous controller was ${controller.broker}.")
- retryRequestUntilConnected(
- MultiRequest(Seq(
- SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), curEpochZk),
- DeleteOp(ControllerZNode.path, controller.zkVersion),
- CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp, kraftControllerEpoch),
- defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
- )
- case None =>
- info(s"KRaft controller $kraftControllerId creating ${ControllerZNode.path} to become the active " +
- s"controller with ZK epoch $newControllerEpoch. There was no active controller.")
- retryRequestUntilConnected(
- MultiRequest(Seq(
- SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), curEpochZk),
- CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp, kraftControllerEpoch),
- defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
- )
- }
-
- val failureSuffix = s"while trying to register KRaft controller $kraftControllerId with ZK epoch " +
- s"$newControllerEpoch. KRaft controller was not registered."
- response.resultCode match {
- case Code.OK =>
- info(s"Successfully registered KRaft controller $kraftControllerId with ZK epoch $newControllerEpoch")
- // First op is always SetData on /controller_epoch
- val setDataResult = response.zkOpResults.head.rawOpResult.asInstanceOf[SetDataResult]
- SuccessfulRegistrationResult(newControllerEpoch, setDataResult.getStat.getVersion)
- case Code.BADVERSION =>
- info(s"The ZK controller epoch changed $failureSuffix")
- FailedRegistrationResult()
- case Code.NONODE =>
- info(s"The ephemeral node at ${ControllerZNode.path} went away $failureSuffix")
- FailedRegistrationResult()
- case Code.NODEEXISTS =>
- info(s"The ephemeral node at ${ControllerZNode.path} was created by another controller $failureSuffix")
- FailedRegistrationResult()
- case code =>
- error(s"ZooKeeper had an error $failureSuffix")
- throw KeeperException.create(code)
- }
- }
- }
-
private def maybeCreateControllerEpochZNode(): (Int, Int) = {
createControllerEpochRaw(KafkaController.InitialControllerEpoch).resultCode match {
case Code.OK =>
@@ -1723,36 +1636,6 @@ class KafkaZkClient private[zk] (
}
}
- def getOrCreateMigrationState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
- val getDataRequest = GetDataRequest(MigrationZNode.path)
- val getDataResponse = retryRequestUntilConnected(getDataRequest)
- getDataResponse.resultCode match {
- case Code.OK =>
- MigrationZNode.decode(getDataResponse.data, getDataResponse.stat.getVersion, getDataResponse.stat.getMtime)
- case Code.NONODE =>
- createInitialMigrationState(initialState)
- case _ => throw getDataResponse.resultException.get
- }
- }
-
- private def createInitialMigrationState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
- val createRequest = CreateRequest(
- MigrationZNode.path,
- MigrationZNode.encode(initialState),
- defaultAcls(MigrationZNode.path),
- CreateMode.PERSISTENT)
- val response = retryRequestUntilConnected(createRequest)
- response.maybeThrow()
- initialState.withMigrationZkVersion(0)
- }
-
- def updateMigrationState(migrationState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
- val req = SetDataRequest(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
- val resp = retryRequestUntilConnected(req)
- resp.maybeThrow()
- migrationState.withMigrationZkVersion(resp.stat.getVersion)
- }
-
/**
* Return the ACLs of the node of the given path
* @param path the given path for the node
@@ -1971,137 +1854,6 @@ class KafkaZkClient private[zk] (
}
}
- /**
- * Safely performs a sequence of writes to ZooKeeper as part of a KRaft migration. For each request in {@code requests}, we
- * wrap the operation in a multi-op transaction that includes a check op on /controller_epoch and /migration. This ensures
- * that another KRaft controller or another ZK controller has unexpectedly taken leadership.
- *
- * In cases of KRaft failover during a migration, it is possible that a write is attempted before the old KRaft controller
- * receives the new leader information. In this case, the check op on /migration acts as a guard against multiple writers.
- *
- * The multi-op for the last request in {@code requests} is used to update the /migration node with the latest migration
- * state. This effectively checkpoints the progress of the migration in ZK relative to the metadata log.
- *
- * Each multi-op request is atomic. The overall sequence of multi-op requests is not atomic and we may fail during any
- * of them. When the KRaft controller recovers the migration state, it will re-apply all of the writes needed to update
- * the ZK state with the latest KRaft state. In the case of Create or Delete operations, these will fail if applied
- * twice, so we need to ignore NodeExists and NoNode failures for those cases.
- *
- * @param requests A sequence of ZK requests. Only Create, Delete, and SetData are supported.
- * @param migrationState The current migration state. This is written out as part of the final multi-op request.
- * @return The new version of /migration ZNode and the sequence of responses for the given requests.
- */
- def retryMigrationRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req],
- migrationState: ZkMigrationLeadershipState): (Int, Seq[Req#Response]) = {
-
- if (requests.isEmpty) {
- return (migrationState.migrationZkVersion(), Seq.empty)
- }
-
- def wrapMigrationRequest(request: Req, lastRequestInBatch: Boolean): MultiRequest = {
- // Wrap a single request with the multi-op transactional request.
- val checkOp = CheckOp(ControllerEpochZNode.path, migrationState.zkControllerEpochZkVersion())
- val migrationOp = if (lastRequestInBatch) {
- SetDataOp(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
- } else {
- CheckOp(MigrationZNode.path, migrationState.migrationZkVersion())
- }
-
- request match {
- case CreateRequest(path, data, acl, createMode, ctx) =>
- MultiRequest(Seq(checkOp, migrationOp, CreateOp(path, data, acl, createMode)), ctx)
- case DeleteRequest(path, version, ctx) =>
- MultiRequest(Seq(checkOp, migrationOp, DeleteOp(path, version)), ctx)
- case SetDataRequest(path, data, version, ctx) =>
- MultiRequest(Seq(checkOp, migrationOp, SetDataOp(path, data, version)), ctx)
- case _ => throw new IllegalStateException(s"$request does not need controller epoch check")
- }
- }
-
- def handleUnwrappedMigrationResult(migrationOp: ZkOp, migrationResult: OpResult): Int = {
- // Handle just the operation that updated /migration ZNode
- val (path: String, data: Option[Array[Byte]], version: Int) = migrationOp match {
- case CheckOp(path, version) => (path, None, version)
- case SetDataOp(path, data, version) => (path, Some(data), version)
- case _ => throw new IllegalStateException("Unexpected result on /migration znode")
- }
-
- migrationResult match {
- case _: CheckResult => version
- case setDataResult: SetDataResult => setDataResult.getStat.getVersion
- case errorResult: ErrorResult =>
- if (path.equals(MigrationZNode.path)) {
- val errorCode = Code.get(errorResult.getErr)
- if (errorCode == Code.BADVERSION) {
- data match {
- case Some(value) =>
- val failedPayload = MigrationZNode.decode(value, version, -1)
- throw new RuntimeException(
- s"Conditional update on KRaft Migration ZNode failed. Sent zkVersion = $version. The failed " +
- s"write was: $failedPayload. This indicates that another KRaft controller is making writes to ZooKeeper.")
- case None =>
- throw new RuntimeException(s"Check op on KRaft Migration ZNode failed. Sent zkVersion = $version. " +
- s"This indicates that another KRaft controller is making writes to ZooKeeper.")
- }
- } else if (errorCode == Code.OK) {
- // This means the Check or SetData op would have been ok, but failed because of another operation in this multi-op
- version
- } else {
- throw KeeperException.create(errorCode, path)
- }
- } else {
- throw new RuntimeException(s"Got migration result for incorrect path $path")
- }
- case _ => throw new RuntimeException(
- s"Expected either CheckResult, SetDataResult, or ErrorResult for migration op, but saw $migrationResult")
- }
- }
-
- def unwrapMigrationResponse(response: AsyncResponse, lastRequestInBatch: Boolean): (AsyncResponse, Int) = {
- response match {
- case MultiResponse(resultCode, _, ctx, zkOpResults, responseMetadata) =>
- zkOpResults match {
- case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), ZkOpResult(migrationOp: CheckOp, migrationResult), zkOpResult) =>
- // Matches all requests except or the last one (CheckOp on /migration)
- if (lastRequestInBatch) {
- throw new IllegalStateException("Should not see a Check operation on /migration in the last request.")
- }
- handleUnwrappedCheckOp(checkOp, checkOpResult)
- val migrationVersion = handleUnwrappedMigrationResult(migrationOp, migrationResult)
- (handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata), migrationVersion)
- case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), ZkOpResult(migrationOp: SetDataOp, migrationResult), zkOpResult) =>
- // Matches the last request in a batch (SetDataOp on /migration)
- if (!lastRequestInBatch) {
- throw new IllegalStateException("Should only see a SetData operation on /migration in the last request.")
- }
- handleUnwrappedCheckOp(checkOp, checkOpResult)
- val migrationVersion = handleUnwrappedMigrationResult(migrationOp, migrationResult)
- (handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata), migrationVersion)
- case null => throw KeeperException.create(resultCode)
- case _ => throw new IllegalStateException(
- s"Cannot unwrap $response because it does not contain the expected operations for a migration operation.")
- }
- case _ => throw new IllegalStateException(s"Cannot unwrap $response because it is not a MultiResponse")
- }
- }
-
- migrationState.zkControllerEpochZkVersion() match {
- case ZkVersion.MatchAnyVersion => throw new IllegalArgumentException(
- s"Expected a controller epoch zkVersion when making migration writes, not -1.")
- case version if version >= 0 =>
- logger.trace(s"Performing ${requests.size} migration update(s) with migrationState=$migrationState")
- val wrappedRequests = requests.map(req => wrapMigrationRequest(req, req == requests.last))
- val results = retryRequestsUntilConnected(wrappedRequests)
- val unwrappedResults = results.map(resp => unwrapMigrationResponse(resp, resp == results.last))
- val migrationZkVersion = unwrappedResults.last._2
- // Return the new version of /migration and the sequence of responses to the original requests
- (migrationZkVersion, unwrappedResults.map(_._1.asInstanceOf[Req#Response]))
- case invalidVersion =>
- throw new IllegalArgumentException(
- s"Expected controller epoch zkVersion $invalidVersion should be non-negative or equal to ${ZkVersion.MatchAnyVersion}")
- }
- }
-
private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
val remainingRequests = new mutable.ArrayBuffer(requests.size) ++= requests
val responses = new mutable.ArrayBuffer[Req#Response]
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 7c1ec8ab565..d4c92150909 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -37,7 +37,6 @@ import org.apache.kafka.common.security.token.delegation.TokenInformation
import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
-import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.{MetadataVersion, ProducerIdsBlock}
@@ -1044,43 +1043,6 @@ object FeatureZNode {
}
}
-object MigrationZNode {
- val path = "/migration"
-
- def encode(migration: ZkMigrationLeadershipState): Array[Byte] = {
- val jsonMap = Map(
- "version" -> 0,
- "kraft_controller_id" -> migration.kraftControllerId(),
- "kraft_controller_epoch" -> migration.kraftControllerEpoch(),
- "kraft_metadata_offset" -> migration.kraftMetadataOffset(),
- "kraft_metadata_epoch" -> migration.kraftMetadataEpoch()
- )
- Json.encodeAsBytes(jsonMap.asJava)
- }
-
- def decode(bytes: Array[Byte], zkVersion: Int, modifyTimeMs: Long): ZkMigrationLeadershipState = {
- val jsonDataAsString = bytes.map(_.toChar).mkString
- Json.parseBytes(bytes).map(_.asJsonObject).flatMap { js =>
- val version = js("version").to[Int]
- if (version != 0) {
- throw new KafkaException(s"Encountered unknown version $version when parsing migration json $jsonDataAsString")
- }
- val controllerId = js("kraft_controller_id").to[Int]
- val controllerEpoch = js("kraft_controller_epoch").to[Int]
- val metadataOffset = js("kraft_metadata_offset").to[Long]
- val metadataEpoch = js("kraft_metadata_epoch").to[Int]
- Some(new ZkMigrationLeadershipState(
- controllerId,
- controllerEpoch,
- metadataOffset,
- metadataEpoch,
- modifyTimeMs,
- zkVersion,
- ZkMigrationLeadershipState.EMPTY.zkControllerEpoch(),
- ZkMigrationLeadershipState.EMPTY.zkControllerEpochZkVersion()))
- }.getOrElse(throw new KafkaException(s"Failed to parse the migration json $jsonDataAsString"))
- }
-}
object ZkData {
@@ -1101,7 +1063,6 @@ object ZkData {
LogDirEventNotificationZNode.path,
DelegationTokenAuthZNode.path,
ExtendedAclZNode.path,
- MigrationZNode.path,
FeatureZNode.path) ++ ZkAclStore.securePaths
// These are persistent ZK paths that should exist on kafka broker startup.
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index ac59f026b0c..ce953990af8 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -82,7 +82,7 @@ class ZooKeeperQuorumImplementation(
startup: Boolean,
threadNamePrefix: Option[String],
): KafkaBroker = {
- val server = new KafkaServer(config, time, threadNamePrefix, false)
+ val server = new KafkaServer(config, time, threadNamePrefix)
if (startup) server.startup()
server
}
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 8a8772ea08d..ad7187ec5e1 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -408,13 +408,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
if (isKRaftTest()) {
createBroker(config, brokerTime(config.brokerId), startup = false)
} else {
- TestUtils.createServer(
- config,
- time = brokerTime(config.brokerId),
- threadNamePrefix = None,
- startup = false,
- enableZkApiForwarding = config.migrationEnabled && config.interBrokerProtocolVersion.isApiForwardingEnabled
- )
+ TestUtils.createServer(config, time = brokerTime(config.brokerId), threadNamePrefix = None, startup = false)
}
}
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 4bb0eccbb18..1fa12ef990b 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -20,7 +20,7 @@ package kafka.metrics
import java.lang.management.ManagementFactory
import java.util.Properties
import javax.management.ObjectName
-import com.yammer.metrics.core.{Gauge, MetricPredicate}
+import com.yammer.metrics.core.MetricPredicate
import org.junit.jupiter.api.Assertions._
import kafka.integration.KafkaServerTestHarness
import kafka.server._
@@ -33,7 +33,6 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.metrics.JmxReporter
import org.apache.kafka.common.utils.Time
-import org.apache.kafka.metadata.migration.ZkMigrationState
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
@@ -229,15 +228,10 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
"kafka.controller:type=KafkaController,name=MetadataErrorCount",
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount",
"kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount",
- "kafka.controller:type=KafkaController,name=ZkMigrationState",
).foreach(expected => {
assertEquals(1, metrics.keySet.asScala.count(_.getMBeanName.equals(expected)),
s"Unable to find $expected")
})
-
- val zkStateMetricName = metrics.keySet.asScala.filter(_.getMBeanName == "kafka.controller:type=KafkaController,name=ZkMigrationState").head
- val zkStateGauge = metrics.get(zkStateMetricName).asInstanceOf[Gauge[Int]]
- assertEquals(ZkMigrationState.NONE.value().intValue(), zkStateGauge.value())
}
/**
diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index 1a24eeb460a..5456ab1f69d 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -17,7 +17,7 @@
package kafka.server
-import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterInstance, ClusterTest, ClusterTestExtensions, Type}
+import org.apache.kafka.common.test.api.{ClusterInstance, ClusterTest, ClusterTestExtensions, Type}
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.{BrokerRegistrationRequestData, CreateTopicsRequestData}
@@ -143,36 +143,7 @@ class BrokerRegistrationRequestTest {
Errors.forCode(resp.topics().find(topicName).errorCode())
}
- @ClusterTest(types = Array(Type.KRAFT), brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
- serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "false")))
- def testRegisterZkWithKRaftMigrationDisabled(clusterInstance: ClusterInstance): Unit = {
- val clusterId = clusterInstance.clusterId()
- val channelManager = brokerToControllerChannelManager(clusterInstance)
- try {
- channelManager.start()
-
- assertEquals(
- Errors.BROKER_ID_NOT_REGISTERED,
- registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV0))))
-
- assertEquals(
- Errors.BROKER_ID_NOT_REGISTERED,
- registerBroker(channelManager, clusterId, 100, Some(1), None))
-
- assertEquals(
- Errors.BROKER_ID_NOT_REGISTERED,
- registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
-
- assertEquals(
- Errors.NONE,
- registerBroker(channelManager, clusterId, 100, None, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
- } finally {
- channelManager.shutdown()
- }
- }
-
- @ClusterTest(types = Array(Type.KRAFT), brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3,
- serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "false")))
+ @ClusterTest(types = Array(Type.KRAFT), brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3)
def testRegisterZkWith33Controller(clusterInstance: ClusterInstance): Unit = {
// Verify that a controller running an old metadata.version cannot register a ZK broker
val clusterId = clusterInstance.clusterId()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 179a5db7ac7..a349564b951 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1829,45 +1829,6 @@ class KafkaConfigTest {
)
}
- @Test
- def testMigrationCannotBeEnabledWithJBOD(): Unit = {
- val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort, logDirCount = 2)
- props.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
- props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9093")
- props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
- props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, MetadataVersion.IBP_3_7_IV1.version())
-
- assertEquals(
- "requirement failed: Cannot enable ZooKeeper migration with multiple log directories " +
- "(aka JBOD) without setting 'inter.broker.protocol.version' to 3.7-IV2 or higher",
- assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
- }
-
- @Test
- def testMigrationCannotBeEnabledWithBrokerIdGeneration(): Unit = {
- val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort, logDirCount = 2)
- props.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
- props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9093")
- props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
- assertEquals(
- "requirement failed: broker.id generation is incompatible with ZooKeeper migration. Please stop using it before enabling migration (set broker.id to a value greater or equal to 0).",
- assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
- }
-
- @Test
- def testMigrationEnabledKRaftMode(): Unit = {
- val props = new Properties()
- props.putAll(kraftProps())
- props.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
-
- assertEquals(
- "If using `zookeeper.metadata.migration.enable` in KRaft mode, `zookeeper.connect` must also be set.",
- assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage)
-
- props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
- KafkaConfig.fromProps(props)
- }
-
@Test
def testConsumerGroupSessionTimeoutValidation(): Unit = {
val props = new Properties()
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 4e70652494c..8a1a04f6b93 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -1017,75 +1017,6 @@ class MetadataCacheTest {
(initialTopicIds, initialTopicStates, newTopicIds, newPartitionStates)
}
- /**
- * Verify the behavior of ZkMetadataCache when handling "Full" UpdateMetadataRequest
- */
- @Test
- def testHandleFullUpdateMetadataRequestInZkMigration(): Unit = {
- val (initialTopicIds, initialTopicStates, newTopicIds, newPartitionStates) = setupInitialAndFullMetadata()
-
- val updateMetadataRequestBuilder = () => new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
- newPartitionStates.asJava, Seq.empty.asJava, newTopicIds.asJava, true, AbstractControlRequest.Type.FULL).build()
-
- def verifyMetadataCache(
- updateMetadataRequest: UpdateMetadataRequest,
- zkMigrationEnabled: Boolean = true
- )(
- verifier: ZkMetadataCache => Unit
- ): Unit = {
- val cache = MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting(), zkMigrationEnabled = zkMigrationEnabled)
- cache.updateMetadata(1, new UpdateMetadataRequest.Builder(8, 1, 42, brokerEpoch,
- initialTopicStates.flatMap(_._2.values).toList.asJava, Seq.empty.asJava, initialTopicIds.asJava).build())
- cache.updateMetadata(1, updateMetadataRequest)
- verifier.apply(cache)
- }
-
- // KRaft=false Type=FULL, migration disabled
- var updateMetadataRequest = updateMetadataRequestBuilder.apply()
- updateMetadataRequest.data().setIsKRaftController(true)
- updateMetadataRequest.data().setType(AbstractControlRequest.Type.FULL.toByte)
- verifyMetadataCache(updateMetadataRequest, zkMigrationEnabled = false) { cache =>
- assertEquals(3, cache.getAllTopics().size)
- assertTrue(cache.contains("test-topic-1"))
- assertTrue(cache.contains("test-topic-1"))
- }
-
- // KRaft=true Type=FULL
- updateMetadataRequest = updateMetadataRequestBuilder.apply()
- verifyMetadataCache(updateMetadataRequest) { cache =>
- assertEquals(1, cache.getAllTopics().size)
- assertFalse(cache.contains("test-topic-1"))
- assertFalse(cache.contains("test-topic-1"))
- }
-
- // KRaft=false Type=FULL
- updateMetadataRequest = updateMetadataRequestBuilder.apply()
- updateMetadataRequest.data().setIsKRaftController(false)
- verifyMetadataCache(updateMetadataRequest) { cache =>
- assertEquals(3, cache.getAllTopics().size)
- assertTrue(cache.contains("test-topic-1"))
- assertTrue(cache.contains("test-topic-1"))
- }
-
- // KRaft=true Type=INCREMENTAL
- updateMetadataRequest = updateMetadataRequestBuilder.apply()
- updateMetadataRequest.data().setType(AbstractControlRequest.Type.INCREMENTAL.toByte)
- verifyMetadataCache(updateMetadataRequest) { cache =>
- assertEquals(3, cache.getAllTopics().size)
- assertTrue(cache.contains("test-topic-1"))
- assertTrue(cache.contains("test-topic-1"))
- }
-
- // KRaft=true Type=UNKNOWN
- updateMetadataRequest = updateMetadataRequestBuilder.apply()
- updateMetadataRequest.data().setType(AbstractControlRequest.Type.UNKNOWN.toByte)
- verifyMetadataCache(updateMetadataRequest) { cache =>
- assertEquals(3, cache.getAllTopics().size)
- assertTrue(cache.contains("test-topic-1"))
- assertTrue(cache.contains("test-topic-1"))
- }
- }
-
@Test
def testGetOfflineReplicasConsidersDirAssignment(): Unit = {
case class Broker(id: Int, dirs: util.List[Uuid])
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 044a7d490d7..68bf1ec9922 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -56,13 +56,10 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image._
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
-import org.apache.kafka.metadata.migration.ZkMigrationState
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
-import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
-import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
+import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.network.BrokerEndPoint
@@ -6245,8 +6242,7 @@ class ReplicaManagerTest {
private def imageFromTopics(topicsImage: TopicsImage): MetadataImage = {
val featuresImageLatest = new FeaturesImage(
Collections.emptyMap(),
- MetadataVersion.latestProduction(),
- ZkMigrationState.NONE)
+ MetadataVersion.latestProduction())
new MetadataImage(
new MetadataProvenance(100L, 10, 1000L, true),
featuresImageLatest,
@@ -6479,20 +6475,6 @@ class ReplicaManagerTest {
val newFoo0 = new TopicIdPartition(Uuid.fromString("JRCmVxWxQamFs4S8NXYufg"), new TopicPartition("foo", 0))
val bar0 = new TopicIdPartition(Uuid.fromString("69O438ZkTSeqqclTtZO2KA"), new TopicPartition("bar", 0))
- def setupReplicaManagerForKRaftMigrationTest(): ReplicaManager = {
- setupReplicaManagerWithMockedPurgatories(
- brokerId = 3,
- timer = new MockTimer(time),
- aliveBrokerIds = Seq(0, 1, 2),
- propsModifier = props => {
- props.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
- props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1000@localhost:9093")
- props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
- props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
- },
- defaultTopicRemoteLogStorageEnable = false)
- }
-
def verifyPartitionIsOnlineAndHasId(
replicaManager: ReplicaManager,
topicIdPartition: TopicIdPartition
@@ -6517,59 +6499,6 @@ class ReplicaManagerTest {
assertEquals(HostedPartition.None, partition, s"Expected ${topicIdPartition} to be offline, but it was: ${partition}")
}
- @Test
- def testFullLairDuringKRaftMigration(): Unit = {
- val replicaManager = setupReplicaManagerForKRaftMigrationTest()
- try {
- val becomeLeaderRequest = LogManagerTest.createLeaderAndIsrRequestForStrayDetection(
- Seq(foo0, foo1, bar0), Seq(3, 4, 3))
- replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
- verifyPartitionIsOnlineAndHasId(replicaManager, foo0)
- verifyPartitionIsOnlineAndHasId(replicaManager, foo1)
- verifyPartitionIsOnlineAndHasId(replicaManager, bar0)
- } finally {
- replicaManager.shutdown(checkpointHW = false)
- }
- }
-
- @Test
- def testFullLairDuringKRaftMigrationRemovesOld(): Unit = {
- val replicaManager = setupReplicaManagerForKRaftMigrationTest()
- try {
- val becomeLeaderRequest1 = LogManagerTest.createLeaderAndIsrRequestForStrayDetection(
- Seq(foo0, foo1, bar0), Seq(3, 4, 3))
- replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest1, (_, _) => ())
- val becomeLeaderRequest2 = LogManagerTest.createLeaderAndIsrRequestForStrayDetection(
- Seq(bar0), Seq(3, 4, 3))
- replicaManager.becomeLeaderOrFollower(2, becomeLeaderRequest2, (_, _) => ())
-
- verifyPartitionIsOffline(replicaManager, foo0)
- verifyPartitionIsOffline(replicaManager, foo1)
- verifyPartitionIsOnlineAndHasId(replicaManager, bar0)
- } finally {
- replicaManager.shutdown(checkpointHW = false)
- }
- }
-
- @Test
- def testFullLairDuringKRaftMigrationWithTopicRecreations(): Unit = {
- val replicaManager = setupReplicaManagerForKRaftMigrationTest()
- try {
- val becomeLeaderRequest1 = LogManagerTest.createLeaderAndIsrRequestForStrayDetection(
- Seq(foo0, foo1, bar0), Seq(3, 4, 3))
- replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest1, (_, _) => ())
- val becomeLeaderRequest2 = LogManagerTest.createLeaderAndIsrRequestForStrayDetection(
- Seq(newFoo0, bar0), Seq(3, 4, 3))
- replicaManager.becomeLeaderOrFollower(2, becomeLeaderRequest2, (_, _) => ())
-
- verifyPartitionIsOnlineAndHasId(replicaManager, newFoo0)
- verifyPartitionIsOffline(replicaManager, foo1)
- verifyPartitionIsOnlineAndHasId(replicaManager, bar0)
- } finally {
- replicaManager.shutdown(checkpointHW = false)
- }
- }
-
@Test
def testRemoteReadQuotaExceeded(): Unit = {
when(mockRemoteLogManager.getFetchThrottleTimeMs).thenReturn(quotaExceededThrottleTime)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c81fa2b189d..b2447e7f7c5 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -159,12 +159,11 @@ object TestUtils extends Logging {
* @param config The configuration of the server
*/
def createServer(config: KafkaConfig, time: Time = Time.SYSTEM): KafkaServer = {
- createServer(config, time, None, startup = true, enableZkApiForwarding = false)
+ createServer(config, time, None, startup = true)
}
- def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String],
- startup: Boolean, enableZkApiForwarding: Boolean) = {
- val server = new KafkaServer(config, time, threadNamePrefix, enableForwarding = enableZkApiForwarding)
+ def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String], startup: Boolean): KafkaServer = {
+ val server = new KafkaServer(config, time, threadNamePrefix)
if (startup) server.startup()
server
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index d8ccd14bdc2..b624fc07358 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -220,7 +220,7 @@ public class ReplicaFetcherThreadBenchmark {
// TODO: fix to support raft
ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(0,
- config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), false);
+ config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty());
metadataCache.updateMetadata(0, updateMetadataRequest);
replicaManager = new ReplicaManagerBuilder().
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java b/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
index b2ef4fe4f11..b09f3511e19 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
-import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
@@ -29,8 +28,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
-import static org.apache.kafka.metadata.migration.ZkMigrationState.NONE;
-import static org.apache.kafka.metadata.migration.ZkMigrationState.POST_MIGRATION;
public class ActivationRecordsGenerator {
@@ -90,12 +87,6 @@ public class ActivationRecordsGenerator {
// initialization, etc.
records.addAll(bootstrapMetadata.records());
- if (metadataVersion.isMigrationSupported()) {
- logMessageBuilder.append("Setting the ZK migration state to NONE since this is a de-novo " +
- "KRaft cluster. ");
- records.add(NONE.toRecord());
- }
-
activationMessageConsumer.accept(logMessageBuilder.toString().trim());
if (metadataVersion.isMetadataTransactionSupported()) {
records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0));
@@ -108,7 +99,6 @@ public class ActivationRecordsGenerator {
static ControllerResult recordsForNonEmptyLog(
Consumer activationMessageConsumer,
long transactionStartOffset,
- ZkMigrationState zkMigrationState,
MetadataVersion curMetadataVersion
) {
StringBuilder logMessageBuilder = new StringBuilder("Performing controller activation. ");
@@ -139,24 +129,6 @@ public class ActivationRecordsGenerator {
.append(". ");
}
- if (curMetadataVersion.isMigrationSupported()) {
- if (zkMigrationState == NONE || zkMigrationState == POST_MIGRATION) {
- logMessageBuilder
- .append("Loaded ZK migration state of ")
- .append(zkMigrationState)
- .append(". ");
- if (zkMigrationState == NONE) {
- logMessageBuilder.append("This is expected because this is a de-novo KRaft cluster.");
- }
- } else {
- throw new RuntimeException("Cannot load ZkMigrationState." + zkMigrationState +
- " because ZK migration is no longer supported.");
- }
- } else if (zkMigrationState != NONE) {
- throw new RuntimeException("Should not have ZkMigrationState." + zkMigrationState +
- " on a cluster running metadata version " + curMetadataVersion + ".");
- }
-
activationMessageConsumer.accept(logMessageBuilder.toString().trim());
return ControllerResult.atomicOf(records, null);
}
@@ -176,15 +148,13 @@ public class ActivationRecordsGenerator {
boolean isEmpty,
long transactionStartOffset,
BootstrapMetadata bootstrapMetadata,
- ZkMigrationState zkMigrationState,
MetadataVersion curMetadataVersion
) {
if (isEmpty) {
return recordsForEmptyLog(activationMessageConsumer, transactionStartOffset,
bootstrapMetadata, bootstrapMetadata.metadataVersion());
} else {
- return recordsForNonEmptyLog(activationMessageConsumer, transactionStartOffset,
- zkMigrationState, curMetadataVersion);
+ return recordsForNonEmptyLog(activationMessageConsumer, transactionStartOffset, curMetadataVersion);
}
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 9d3481cee7e..4eb8dfd22b3 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -19,13 +19,11 @@ package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.VersionRange;
-import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
@@ -137,11 +135,6 @@ public class FeatureControlManager {
*/
private final TimelineObject metadataVersion;
- /**
- * The current ZK migration state
- */
- private final TimelineObject migrationControlState;
-
/**
* The minimum bootstrap version that we can't downgrade before.
*/
@@ -165,7 +158,6 @@ public class FeatureControlManager {
this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
this.metadataVersion = new TimelineObject<>(snapshotRegistry, metadataVersion);
this.minimumBootstrapVersion = minimumBootstrapVersion;
- this.migrationControlState = new TimelineObject<>(snapshotRegistry, ZkMigrationState.NONE);
this.clusterSupportDescriber = clusterSupportDescriber;
}
@@ -200,10 +192,6 @@ public class FeatureControlManager {
return metadataVersion.get();
}
- ZkMigrationState zkMigrationState() {
- return migrationControlState.get();
- }
-
private ApiError updateFeature(
String featureName,
short newVersion,
@@ -335,7 +323,6 @@ public class FeatureControlManager {
Consumer recordConsumer
) {
MetadataVersion currentVersion = metadataVersion();
- ZkMigrationState zkMigrationState = zkMigrationState();
final MetadataVersion newVersion;
try {
newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
@@ -343,12 +330,6 @@ public class FeatureControlManager {
return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version.");
}
- // Don't allow metadata.version changes while we're migrating
- if (zkMigrationState.inProgress()) {
- return invalidMetadataVersion(newVersionLevel, "Unable to modify metadata.version while a " +
- "ZK migration is in progress.");
- }
-
// We cannot set a version earlier than IBP_3_3_IV0, since that was the first version that contained
// FeatureLevelRecord itself.
if (newVersion.isLessThan(minimumBootstrapVersion)) {
@@ -427,19 +408,6 @@ public class FeatureControlManager {
}
}
- public void replay(ZkMigrationStateRecord record) {
- ZkMigrationState newState = ZkMigrationState.of(record.zkMigrationState());
- ZkMigrationState previousState = migrationControlState.get();
- if (previousState.equals(newState)) {
- log.debug("Replayed a ZkMigrationStateRecord which did not alter the state from {}.",
- previousState);
- } else {
- migrationControlState.set(newState);
- log.info("Replayed a ZkMigrationStateRecord changing the migration state from {} to {}.",
- previousState, newState);
- }
- }
-
boolean isControllerId(int nodeId) {
return quorumFeatures.isControllerId(nodeId);
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index aaffb9084ef..3e3b362c85b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -82,7 +82,6 @@ import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
-import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
@@ -1139,7 +1138,6 @@ public final class QuorumController implements Controller {
logReplayTracker.empty(),
offsetControl.transactionStartOffset(),
bootstrapMetadata,
- featureControl.zkMigrationState(),
featureControl.metadataVersion());
} catch (Throwable t) {
throw fatalFaultHandler.handleFault("exception while completing controller " +
@@ -1257,7 +1255,9 @@ public final class QuorumController implements Controller {
// NoOpRecord is an empty record and doesn't need to be replayed
break;
case ZK_MIGRATION_STATE_RECORD:
- featureControl.replay((ZkMigrationStateRecord) message);
+ // In 4.0, although migration is no longer supported and ZK has been removed from Kafka,
+ // users might migrate from ZK to KRaft in version 3.x and then perform a rolling upgrade to 4.0.
+ // Therefore, this case needs to be retained but will be a no-op.
break;
case BEGIN_TRANSACTION_RECORD:
offsetControl.replay((BeginTransactionRecord) message, offset);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
index 9b79b576044..cc615c1966e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
@@ -17,7 +17,6 @@
package org.apache.kafka.controller;
-import org.apache.kafka.metadata.ControllerRegistration;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
@@ -110,27 +109,6 @@ public final class QuorumFeatures {
localSupportedFeature(featureName));
}
- public Optional reasonAllControllersZkMigrationNotReady(
- MetadataVersion metadataVersion,
- Map controllers
- ) {
- if (!metadataVersion.isMigrationSupported()) {
- return Optional.of("The metadata.version too low at " + metadataVersion);
- } else if (!metadataVersion.isControllerRegistrationSupported()) {
- return Optional.empty();
- }
- for (int quorumNodeId : quorumNodeIds) {
- ControllerRegistration registration = controllers.get(quorumNodeId);
- if (registration == null) {
- return Optional.of("No registration found for controller " + quorumNodeId);
- } else if (!registration.zkMigrationReady()) {
- return Optional.of("Controller " + quorumNodeId + " has not enabled " +
- "zookeeper.metadata.migration.enable");
- }
- }
- return Optional.empty();
- }
-
@Override
public int hashCode() {
return Objects.hash(nodeId, localSupportedFeatures, quorumNodeIds);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java
index 7459fe657af..c4aec110793 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java
@@ -115,9 +115,6 @@ public class ControllerMetadataMetricsPublisher implements MetadataPublisher {
}
}
changes.apply(metrics);
- if (delta.featuresDelta() != null) {
- delta.featuresDelta().getZkMigrationStateChange().ifPresent(state -> metrics.setZkMigrationState(state.value()));
- }
}
private void publishSnapshot(MetadataImage newImage) {
@@ -156,7 +153,6 @@ public class ControllerMetadataMetricsPublisher implements MetadataPublisher {
metrics.setGlobalPartitionCount(totalPartitions);
metrics.setOfflinePartitionCount(offlinePartitions);
metrics.setPreferredReplicaImbalanceCount(partitionsWithoutPreferredLeader);
- metrics.setZkMigrationState(newImage.features().zkMigrationState().value());
}
@Override
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
index 66e371835fa..587e42d7c98 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
@@ -18,8 +18,6 @@
package org.apache.kafka.image;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
-import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.HashMap;
@@ -38,8 +36,6 @@ public final class FeaturesDelta {
private MetadataVersion metadataVersionChange = null;
- private ZkMigrationState zkMigrationStateChange = null;
-
public FeaturesDelta(FeaturesImage image) {
this.image = image;
}
@@ -48,10 +44,6 @@ public final class FeaturesDelta {
return changes;
}
- public Optional getZkMigrationStateChange() {
- return Optional.ofNullable(zkMigrationStateChange);
- }
-
public Optional metadataVersionChange() {
return Optional.ofNullable(metadataVersionChange);
}
@@ -76,10 +68,6 @@ public final class FeaturesDelta {
}
}
- public void replay(ZkMigrationStateRecord record) {
- this.zkMigrationStateChange = ZkMigrationState.of(record.zkMigrationState());
- }
-
public FeaturesImage apply() {
Map newFinalizedVersions =
new HashMap<>(image.finalizedVersions().size());
@@ -109,13 +97,7 @@ public final class FeaturesDelta {
metadataVersion = metadataVersionChange;
}
- final ZkMigrationState zkMigrationState;
- if (zkMigrationStateChange == null) {
- zkMigrationState = image.zkMigrationState();
- } else {
- zkMigrationState = zkMigrationStateChange;
- }
- return new FeaturesImage(newFinalizedVersions, metadataVersion, zkMigrationState);
+ return new FeaturesImage(newFinalizedVersions, metadataVersion);
}
@Override
@@ -123,7 +105,6 @@ public final class FeaturesDelta {
return "FeaturesDelta(" +
"changes=" + changes +
", metadataVersionChange=" + metadataVersionChange +
- ", zkMigrationStateChange=" + zkMigrationStateChange +
')';
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
index cbdb5c6489a..c1729b3b082 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.image.node.FeaturesImageNode;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
-import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.ArrayList;
@@ -41,30 +40,23 @@ import java.util.Optional;
public final class FeaturesImage {
public static final FeaturesImage EMPTY = new FeaturesImage(
Collections.emptyMap(),
- MetadataVersion.MINIMUM_KRAFT_VERSION,
- ZkMigrationState.NONE
+ MetadataVersion.MINIMUM_KRAFT_VERSION
);
private final Map finalizedVersions;
private final MetadataVersion metadataVersion;
- private final ZkMigrationState zkMigrationState;
-
public FeaturesImage(
Map finalizedVersions,
- MetadataVersion metadataVersion,
- ZkMigrationState zkMigrationState
- ) {
+ MetadataVersion metadataVersion) {
this.finalizedVersions = Collections.unmodifiableMap(finalizedVersions);
this.metadataVersion = metadataVersion;
- this.zkMigrationState = zkMigrationState;
}
public boolean isEmpty() {
return finalizedVersions.isEmpty() &&
- metadataVersion.equals(MetadataVersion.MINIMUM_KRAFT_VERSION) &&
- zkMigrationState.equals(ZkMigrationState.NONE);
+ metadataVersion.equals(MetadataVersion.MINIMUM_KRAFT_VERSION);
}
public MetadataVersion metadataVersion() {
@@ -75,10 +67,6 @@ public final class FeaturesImage {
return finalizedVersions;
}
- public ZkMigrationState zkMigrationState() {
- return zkMigrationState;
- }
-
private Optional finalizedVersion(String feature) {
return Optional.ofNullable(finalizedVersions.get(feature));
}
@@ -89,14 +77,6 @@ public final class FeaturesImage {
} else {
writeFeatureLevels(writer, options);
}
-
- if (options.metadataVersion().isMigrationSupported()) {
- writer.write(0, zkMigrationState.toRecord().message());
- } else {
- if (!zkMigrationState.equals(ZkMigrationState.NONE)) {
- options.handleLoss("the ZK Migration state which was " + zkMigrationState);
- }
- }
}
private void handleFeatureLevelNotSupported(ImageWriterOptions options) {
@@ -131,7 +111,7 @@ public final class FeaturesImage {
@Override
public int hashCode() {
- return Objects.hash(finalizedVersions, metadataVersion, zkMigrationState);
+ return Objects.hash(finalizedVersions, metadataVersion);
}
@Override
@@ -139,8 +119,7 @@ public final class FeaturesImage {
if (!(o instanceof FeaturesImage)) return false;
FeaturesImage other = (FeaturesImage) o;
return finalizedVersions.equals(other.finalizedVersions) &&
- metadataVersion.equals(other.metadataVersion) &&
- zkMigrationState.equals(other.zkMigrationState);
+ metadataVersion.equals(other.metadataVersion);
}
@Override
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index b4120ad8595..ae021a6f2fb 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -38,7 +38,6 @@ import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
-import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.server.common.MetadataVersion;
@@ -247,7 +246,9 @@ public final class MetadataDelta {
*/
break;
case ZK_MIGRATION_STATE_RECORD:
- replay((ZkMigrationStateRecord) record);
+ // In 4.0, although migration is no longer supported and ZK has been removed from Kafka,
+ // users might migrate from ZK to KRaft in version 3.x and then perform a rolling upgrade to 4.0.
+ // Therefore, this case needs to be retained but will be a no-op.
break;
case REGISTER_CONTROLLER_RECORD:
replay((RegisterControllerRecord) record);
@@ -345,10 +346,6 @@ public final class MetadataDelta {
getOrCreateScramDelta().replay(record);
}
- public void replay(ZkMigrationStateRecord record) {
- getOrCreateFeaturesDelta().replay(record);
- }
-
public void replay(RegisterControllerRecord record) {
getOrCreateClusterDelta().replay(record);
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/FeaturesImageNode.java b/metadata/src/main/java/org/apache/kafka/image/node/FeaturesImageNode.java
index 8882c7fe425..286e31dba0a 100644
--- a/metadata/src/main/java/org/apache/kafka/image/node/FeaturesImageNode.java
+++ b/metadata/src/main/java/org/apache/kafka/image/node/FeaturesImageNode.java
@@ -68,8 +68,6 @@ public class FeaturesImageNode implements MetadataNode {
public MetadataNode child(String name) {
if (name.equals(METADATA_VERSION)) {
return new MetadataLeafNode(image.metadataVersion().toString());
- } else if (name.equals(ZK_MIGRATION_STATE)) {
- return new MetadataLeafNode(image.zkMigrationState().toString());
} else if (name.startsWith(FINALIZED_PREFIX)) {
String key = name.substring(FINALIZED_PREFIX.length());
return new MetadataLeafNode(
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
deleted file mode 100644
index 15b8b789ae7..00000000000
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.metadata.migration;
-
-import org.apache.kafka.raft.OffsetAndEpoch;
-
-import java.util.Objects;
-
-/**
- * Persistent state needed to recover an ongoing migration. This data is stored in ZooKeeper under the "/migration"
- * ZNode and is recovered by the active KRaft controller following an election. The absence of this data in ZK indicates
- * that no migration has been started.
- */
-public class ZkMigrationLeadershipState {
- /**
- * A Kafka-internal constant used to indicate that the znode version is unknown. See ZkVersion.UnknownVersion.
- */
- public static final int UNKNOWN_ZK_VERSION = -2;
-
- // Use -2 as sentinel for "unknown version" for ZK versions to avoid sending an actual -1 "any version"
- // when doing ZK writes
- public static final ZkMigrationLeadershipState EMPTY =
- new ZkMigrationLeadershipState(-1, -1, -1, -1, -1, -2, -1, UNKNOWN_ZK_VERSION);
-
- private final int kraftControllerId;
-
- private final int kraftControllerEpoch;
-
- private final long kraftMetadataOffset;
-
- private final int kraftMetadataEpoch;
-
- private final long lastUpdatedTimeMs;
-
- private final int migrationZkVersion;
-
- private final int zkControllerEpoch;
-
- private final int zkControllerEpochZkVersion;
-
-
- public ZkMigrationLeadershipState(int kraftControllerId, int kraftControllerEpoch,
- long kraftMetadataOffset, int kraftMetadataEpoch,
- long lastUpdatedTimeMs, int migrationZkVersion,
- int zkControllerEpoch, int zkControllerEpochZkVersion) {
- this.kraftControllerId = kraftControllerId;
- this.kraftControllerEpoch = kraftControllerEpoch;
- this.kraftMetadataOffset = kraftMetadataOffset;
- this.kraftMetadataEpoch = kraftMetadataEpoch;
- this.lastUpdatedTimeMs = lastUpdatedTimeMs;
- this.migrationZkVersion = migrationZkVersion;
- this.zkControllerEpoch = zkControllerEpoch;
- this.zkControllerEpochZkVersion = zkControllerEpochZkVersion;
- }
-
- public ZkMigrationLeadershipState withMigrationZkVersion(int zkVersion) {
- return new ZkMigrationLeadershipState(
- this.kraftControllerId, this.kraftControllerEpoch, this.kraftMetadataOffset,
- this.kraftMetadataEpoch, this.lastUpdatedTimeMs, zkVersion, this.zkControllerEpoch, this.zkControllerEpochZkVersion);
- }
-
- public ZkMigrationLeadershipState withZkController(int zkControllerEpoch, int zkControllerEpochZkVersion) {
- return new ZkMigrationLeadershipState(
- this.kraftControllerId, this.kraftControllerEpoch, this.kraftMetadataOffset,
- this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, zkControllerEpoch, zkControllerEpochZkVersion);
- }
-
- public ZkMigrationLeadershipState withUnknownZkController() {
- return withZkController(EMPTY.zkControllerEpoch, EMPTY.zkControllerEpochZkVersion);
- }
-
-
- public ZkMigrationLeadershipState withNewKRaftController(int controllerId, int controllerEpoch) {
- return new ZkMigrationLeadershipState(
- controllerId, controllerEpoch, this.kraftMetadataOffset,
- this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, this.zkControllerEpoch, this.zkControllerEpochZkVersion);
- }
-
- public ZkMigrationLeadershipState withKRaftMetadataOffsetAndEpoch(long metadataOffset,
- int metadataEpoch) {
- return new ZkMigrationLeadershipState(
- this.kraftControllerId,
- this.kraftControllerEpoch,
- metadataOffset,
- metadataEpoch,
- this.lastUpdatedTimeMs,
- this.migrationZkVersion,
- this.zkControllerEpoch,
- this.zkControllerEpochZkVersion);
- }
-
- public int kraftControllerId() {
- return kraftControllerId;
- }
-
- public int kraftControllerEpoch() {
- return kraftControllerEpoch;
- }
-
- public long kraftMetadataOffset() {
- return kraftMetadataOffset;
- }
-
- public int kraftMetadataEpoch() {
- return kraftMetadataEpoch;
- }
-
- public long lastUpdatedTimeMs() {
- return lastUpdatedTimeMs;
- }
-
- public int migrationZkVersion() {
- return migrationZkVersion;
- }
-
- public int zkControllerEpoch() {
- return zkControllerEpoch;
- }
-
- public int zkControllerEpochZkVersion() {
- return zkControllerEpochZkVersion;
- }
-
- public boolean initialZkMigrationComplete() {
- return kraftMetadataOffset > 0;
- }
-
- public OffsetAndEpoch offsetAndEpoch() {
- return new OffsetAndEpoch(kraftMetadataOffset, kraftMetadataEpoch);
- }
-
- public boolean loggableChangeSinceState(ZkMigrationLeadershipState other) {
- if (other == null) {
- return false;
- }
- if (this.equals(other)) {
- return false;
- } else {
- // Did the controller change, or did we finish the migration?
- return
- this.kraftControllerId != other.kraftControllerId ||
- this.kraftControllerEpoch != other.kraftControllerEpoch ||
- (!other.initialZkMigrationComplete() && this.initialZkMigrationComplete());
- }
- }
-
- @Override
- public String toString() {
- return "ZkMigrationLeadershipState{" +
- "kraftControllerId=" + kraftControllerId +
- ", kraftControllerEpoch=" + kraftControllerEpoch +
- ", kraftMetadataOffset=" + kraftMetadataOffset +
- ", kraftMetadataEpoch=" + kraftMetadataEpoch +
- ", lastUpdatedTimeMs=" + lastUpdatedTimeMs +
- ", migrationZkVersion=" + migrationZkVersion +
- ", controllerZkEpoch=" + zkControllerEpoch +
- ", controllerZkVersion=" + zkControllerEpochZkVersion +
- '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- ZkMigrationLeadershipState that = (ZkMigrationLeadershipState) o;
- return kraftControllerId == that.kraftControllerId
- && kraftControllerEpoch == that.kraftControllerEpoch
- && kraftMetadataOffset == that.kraftMetadataOffset
- && kraftMetadataEpoch == that.kraftMetadataEpoch
- && lastUpdatedTimeMs == that.lastUpdatedTimeMs
- && migrationZkVersion == that.migrationZkVersion
- && zkControllerEpoch == that.zkControllerEpoch
- && zkControllerEpochZkVersion == that.zkControllerEpochZkVersion;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(
- kraftControllerId,
- kraftControllerEpoch,
- kraftMetadataOffset,
- kraftMetadataEpoch,
- lastUpdatedTimeMs,
- migrationZkVersion,
- zkControllerEpoch,
- zkControllerEpochZkVersion);
- }
-}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java
deleted file mode 100644
index ff8ebd08b38..00000000000
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.metadata.migration;
-
-import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-
-import java.util.Optional;
-
-/**
- * The cluster-wide ZooKeeper migration state.
- *
- * An enumeration of the possible states of the ZkMigrationState field in ZkMigrationStateRecord.
- * This information is persisted in the metadata log and image.
- *
- * @see org.apache.kafka.common.metadata.ZkMigrationStateRecord
- */
-public enum ZkMigrationState {
- /**
- * The cluster was created in KRaft mode. A cluster that was created in ZK mode can never attain
- * this state; the endpoint of migration is POST_MIGRATION, instead. This value is also used as
- * the default migration state in an empty metadata log.
- */
- NONE((byte) 0),
-
- /**
- * A KRaft controller has been elected with "zookeeper.metadata.migration.enable" set to "true".
- * The controller is now awaiting the preconditions for starting the migration to KRaft. In this
- * state, the metadata log does not yet contain the cluster's data. There is a metadata quorum,
- * but it is not doing anything useful yet.
- *
- * In Kafka 3.4, PRE_MIGRATION was written out as value 1 to the log, but no MIGRATION state
- * was ever written. Since this would be an invalid log state in 3.5+, we have swapped the
- * enum values for PRE_MIGRATION and MIGRATION. This allows us to handle the upgrade case
- * from 3.4 without adding additional fields to the migration record.
- */
- PRE_MIGRATION((byte) 2),
-
- /**
- * The ZK data has been migrated, and the KRaft controller is now writing metadata to both ZK
- * and the metadata log. The controller will remain in this state until all the brokers have
- * been restarted in KRaft mode.
- */
- MIGRATION((byte) 1),
-
- /**
- * The migration from ZK has been fully completed. The cluster is running in KRaft mode. This state
- * will persist indefinitely after the migration. In operational terms, this is the same as the NONE
- * state.
- */
- POST_MIGRATION((byte) 3),
-
- /**
- * The controller is a ZK controller. No migration has been performed. This state is never persisted
- * and is only used by KafkaController in order to have a unified metric that indicates what kind of
- * metadata state the controller is in.
- */
- ZK((byte) 4);
-
- private final byte value;
-
- ZkMigrationState(byte value) {
- this.value = value;
- }
-
- public byte value() {
- return value;
- }
-
- public ApiMessageAndVersion toRecord() {
- return new ApiMessageAndVersion(
- new ZkMigrationStateRecord().setZkMigrationState(value()),
- (short) 0
- );
- }
-
- public static ZkMigrationState of(byte value) {
- return optionalOf(value)
- .orElseThrow(() -> new IllegalArgumentException(String.format("Value %s is not a valid Zk migration state", value)));
- }
-
- public static Optional optionalOf(byte value) {
- for (ZkMigrationState state : ZkMigrationState.values()) {
- if (state.value == value) {
- return Optional.of(state);
- }
- }
- return Optional.empty();
- }
-
- public boolean inProgress() {
- return this == PRE_MIGRATION || this == MIGRATION;
- }
-}
diff --git a/metadata/src/main/resources/common/metadata/ZkMigrationRecord.json b/metadata/src/main/resources/common/metadata/ZkMigrationRecord.json
index aaaed4f4a08..7d7a61626dd 100644
--- a/metadata/src/main/resources/common/metadata/ZkMigrationRecord.json
+++ b/metadata/src/main/resources/common/metadata/ZkMigrationRecord.json
@@ -23,7 +23,9 @@
// In 3.4, the defined values are: 0 (None), 1 (PreMigration), 2 (Migration), 3 (PostMigration).
// In 3.5, the values for PreMigration and Migration were swapped: 0 (None), 2 (PreMigration), 1 (Migration), 3 (PostMigration).
// This was done to work around the fact that we never wrote Migration or PostMigration records in 3.4
- //
+ // In 4.0, although migration is no longer supported and ZK has been removed from Kafka,
+ // users might migrate from ZK to KRaft in version 3.x and then perform a rolling upgrade to 4.0.
+ // Therefore, this generated code needs to be retained.
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ActivationRecordsGeneratorTest.java b/metadata/src/test/java/org/apache/kafka/controller/ActivationRecordsGeneratorTest.java
index 5f948a79885..48cdd3bdeda 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ActivationRecordsGeneratorTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ActivationRecordsGeneratorTest.java
@@ -18,16 +18,13 @@
package org.apache.kafka.controller;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
-import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
/**
* This class is for testing the log message or exception produced by ActivationRecordsGenerator. For tests that
@@ -51,140 +48,35 @@ public class ActivationRecordsGeneratorTest {
result = ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. The metadata log appears to be empty. " +
"Appending 1 bootstrap record(s) at metadata.version 3.4-IV0 from bootstrap " +
- "source 'test'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster.", logMsg),
+ "source 'test'.", logMsg),
-1L,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_4_IV0, "test"),
MetadataVersion.IBP_3_4_IV0
);
assertTrue(result.isAtomic());
- assertEquals(2, result.records().size());
+ assertEquals(1, result.records().size());
result = ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. The metadata log appears to be empty. " +
"Appending 1 bootstrap record(s) in metadata transaction at metadata.version 3.6-IV1 from bootstrap " +
- "source 'test'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster.", logMsg),
+ "source 'test'.", logMsg),
-1L,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
MetadataVersion.IBP_3_6_IV1
);
assertFalse(result.isAtomic());
- assertEquals(4, result.records().size());
+ assertEquals(3, result.records().size());
result = ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. Aborting partial bootstrap records " +
"transaction at offset 0. Re-appending 1 bootstrap record(s) in new metadata transaction at " +
- "metadata.version 3.6-IV1 from bootstrap source 'test'. Setting the ZK migration state to NONE " +
- "since this is a de-novo KRaft cluster.", logMsg),
+ "metadata.version 3.6-IV1 from bootstrap source 'test'.", logMsg),
0L,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
MetadataVersion.IBP_3_6_IV1
);
assertFalse(result.isAtomic());
- assertEquals(5, result.records().size());
- }
-
- @Test
- public void testActivationMessageForNonEmptyLogNoMigrations() {
- ControllerResult result;
-
- result = ActivationRecordsGenerator.recordsForNonEmptyLog(
- logMsg -> assertEquals("Performing controller activation. No metadata.version feature level " +
- "record was found in the log. Treating the log as version 3.0-IV1.", logMsg),
- -1L,
- ZkMigrationState.NONE,
- MetadataVersion.MINIMUM_KRAFT_VERSION
- );
- assertTrue(result.isAtomic());
- assertEquals(0, result.records().size());
-
- result = ActivationRecordsGenerator.recordsForNonEmptyLog(
- logMsg -> assertEquals("Performing controller activation.", logMsg),
- -1L,
- ZkMigrationState.NONE,
- MetadataVersion.IBP_3_3_IV0
- );
- assertTrue(result.isAtomic());
- assertEquals(0, result.records().size());
-
- result = ActivationRecordsGenerator.recordsForNonEmptyLog(
- logMsg -> assertEquals("Performing controller activation. Loaded ZK migration state of NONE. "
- + "This is expected because this is a de-novo KRaft cluster.", logMsg),
- -1L,
- ZkMigrationState.NONE,
- MetadataVersion.IBP_3_4_IV0
- );
- assertTrue(result.isAtomic());
- assertEquals(0, result.records().size());
-
- result = ActivationRecordsGenerator.recordsForNonEmptyLog(
- logMsg -> assertEquals("Performing controller activation. Aborting in-progress metadata " +
- "transaction at offset 42. Loaded ZK migration state of NONE. " +
- "This is expected because this is a de-novo KRaft cluster.", logMsg),
- 42L,
- ZkMigrationState.NONE,
- MetadataVersion.IBP_3_6_IV1
- );
- assertTrue(result.isAtomic());
- assertEquals(1, result.records().size());
-
- assertEquals(
- "Detected in-progress transaction at offset 42, but the metadata.version 3.6-IV0 does not support " +
- "transactions. Cannot continue.",
- assertThrows(RuntimeException.class, () ->
- ActivationRecordsGenerator.recordsForNonEmptyLog(
- logMsg -> fail(),
- 42L,
- ZkMigrationState.NONE,
- MetadataVersion.IBP_3_6_IV0
- )).getMessage()
- );
- }
-
- @Test
- public void testActivationMessageForNonEmptyLogWithMigrations() {
- assertEquals(
- "Should not have ZkMigrationState.MIGRATION on a cluster running metadata version 3.3-IV0.",
- assertThrows(RuntimeException.class, () ->
- ActivationRecordsGenerator.recordsForNonEmptyLog(
- logMsg -> fail(),
- -1L,
- ZkMigrationState.MIGRATION,
- MetadataVersion.IBP_3_3_IV0
- )).getMessage()
- );
-
- assertEquals(
- "Cannot load ZkMigrationState.MIGRATION because ZK migration is no longer supported.",
- assertThrows(RuntimeException.class, () ->
- ActivationRecordsGenerator.recordsForNonEmptyLog(
- logMsg -> fail(),
- -1L,
- ZkMigrationState.MIGRATION,
- MetadataVersion.IBP_3_9_IV0
- )
- ).getMessage()
- );
-
- ControllerResult result;
- result = ActivationRecordsGenerator.recordsForNonEmptyLog(
- logMsg -> assertEquals("Performing controller activation. Loaded ZK migration state of " +
- "POST_MIGRATION.", logMsg),
- -1L,
- ZkMigrationState.POST_MIGRATION,
- MetadataVersion.IBP_3_4_IV0
- );
- assertTrue(result.isAtomic());
- assertEquals(0, result.records().size());
-
- result = ActivationRecordsGenerator.recordsForNonEmptyLog(
- logMsg -> assertEquals("Performing controller activation. Aborting in-progress metadata " +
- "transaction at offset 42. Loaded ZK migration state of POST_MIGRATION.", logMsg),
- 42L,
- ZkMigrationState.POST_MIGRATION,
- MetadataVersion.IBP_3_6_IV1
- );
- assertTrue(result.isAtomic());
- assertEquals(1, result.records().size());
+ assertEquals(4, result.records().size());
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 31221da79ef..e2d3970445e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -25,8 +25,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.VersionRange;
-import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
-import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
@@ -408,40 +406,4 @@ public class FeatureControlManagerTest {
RecordTestUtils.replayAll(manager, result2.records());
assertEquals(Optional.empty(), manager.finalizedFeatures(Long.MAX_VALUE).get(Feature.TEST_VERSION.featureName()));
}
-
- @Test
- public void testNoMetadataVersionChangeDuringMigration() {
- FeatureControlManager manager = new FeatureControlManager.Builder().
- setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
- MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.IBP_3_5_IV1.featureLevel())).
- setMetadataVersion(MetadataVersion.IBP_3_4_IV0).
- build();
- BootstrapMetadata bootstrapMetadata = BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_4_IV0, "FeatureControlManagerTest");
- RecordTestUtils.replayAll(manager, bootstrapMetadata.records());
- RecordTestUtils.replayOne(manager, ZkMigrationState.PRE_MIGRATION.toRecord());
-
- assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
- "Invalid metadata.version 10. Unable to modify metadata.version while a ZK migration is in progress.")),
- manager.updateFeatures(
- singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_5_IV1.featureLevel()),
- singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
- true));
-
- assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
- "Invalid metadata.version 4. Unable to modify metadata.version while a ZK migration is in progress.")),
- manager.updateFeatures(
- singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
- singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
- true));
-
- // Complete the migration
- RecordTestUtils.replayOne(manager, ZkMigrationState.POST_MIGRATION.toRecord());
- ControllerResult result = manager.updateFeatures(
- singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_5_IV1.featureLevel()),
- singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
- false);
- assertEquals(ApiError.NONE, result.response());
- RecordTestUtils.replayAll(manager, result.records());
- assertEquals(MetadataVersion.IBP_3_5_IV1, manager.metadataVersion());
- }
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 5b8cb44f0a6..7c712c40598 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -60,7 +60,6 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointColle
import org.apache.kafka.common.metadata.RegisterControllerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
-import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AlterPartitionRequest;
@@ -99,7 +98,6 @@ import org.apache.kafka.metadata.RecordTestUtils.ImageDeltaPair;
import org.apache.kafka.metadata.RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
-import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.metadata.util.BatchFileWriter;
import org.apache.kafka.metalog.LocalLogManager;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
@@ -814,7 +812,7 @@ public class QuorumControllerTest {
BrokerRegistrationReply reply = active.registerBroker(
ANONYMOUS_CONTEXT,
request).get();
- assertTrue(reply.epoch() >= 5, "Unexpected broker epoch " + reply.epoch());
+ assertTrue(reply.epoch() >= 4, "Unexpected broker epoch " + reply.epoch());
}
}
}
@@ -841,7 +839,7 @@ public class QuorumControllerTest {
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))).
setListeners(listeners));
- assertEquals(5L, reply.get().epoch());
+ assertEquals(4L, reply.get().epoch());
CreateTopicsRequestData createTopicsRequestData =
new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(Collections.singleton(
@@ -857,7 +855,7 @@ public class QuorumControllerTest {
get().topics().find("foo").errorMessage());
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
- setWantFence(false).setBrokerEpoch(5L).setBrokerId(0).
+ setWantFence(false).setBrokerEpoch(4L).setBrokerId(0).
setCurrentMetadataOffset(100000L)).get());
assertEquals(Errors.NONE.code(), active.createTopics(ANONYMOUS_CONTEXT,
createTopicsRequestData, Collections.singleton("foo")).
@@ -983,8 +981,6 @@ public class QuorumControllerTest {
new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel()), (short) 0),
- new ApiMessageAndVersion(new ZkMigrationStateRecord().
- setZkMigrationState((byte) 0), (short) 0),
new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0),
new ApiMessageAndVersion(new RegisterControllerRecord().
setControllerId(0).
@@ -1477,10 +1473,7 @@ public class QuorumControllerTest {
appender)).getMessage());
}
- FeatureControlManager getActivationRecords(
- MetadataVersion metadataVersion,
- Optional stateInLog
- ) {
+ FeatureControlManager getActivationRecords(MetadataVersion metadataVersion) {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControlManager = new FeatureControlManager.Builder()
.setSnapshotRegistry(snapshotRegistry)
@@ -1489,10 +1482,9 @@ public class QuorumControllerTest {
ControllerResult result = ActivationRecordsGenerator.generate(
msg -> { },
- stateInLog.isEmpty(),
+ true,
-1L,
BootstrapMetadata.fromVersion(metadataVersion, "test"),
- stateInLog.orElse(ZkMigrationState.NONE),
metadataVersion);
RecordTestUtils.replayAll(featureControlManager, result.records());
return featureControlManager;
@@ -1502,34 +1494,23 @@ public class QuorumControllerTest {
public void testActivationRecords33() {
FeatureControlManager featureControl;
- featureControl = getActivationRecords(MetadataVersion.IBP_3_3_IV0, Optional.empty());
+ featureControl = getActivationRecords(MetadataVersion.IBP_3_3_IV0);
assertEquals(MetadataVersion.IBP_3_3_IV0, featureControl.metadataVersion());
- assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
-
- featureControl = getActivationRecords(MetadataVersion.IBP_3_3_IV0, Optional.of(ZkMigrationState.NONE));
- assertEquals(MetadataVersion.IBP_3_3_IV0, featureControl.metadataVersion());
- assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
}
@Test
public void testActivationRecords34() {
FeatureControlManager featureControl;
- featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.empty());
+ featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0);
assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
- assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
-
- featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.NONE));
- assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
- assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
}
@Test
public void testActivationRecordsNonEmptyLog() {
FeatureControlManager featureControl = getActivationRecords(
- MetadataVersion.IBP_3_9_IV0, Optional.empty());
+ MetadataVersion.IBP_3_9_IV0);
assertEquals(MetadataVersion.IBP_3_9_IV0, featureControl.metadataVersion());
- assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
}
@Test
@@ -1539,7 +1520,6 @@ public class QuorumControllerTest {
true,
0L,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
- ZkMigrationState.NONE,
MetadataVersion.IBP_3_6_IV1);
assertFalse(result.isAtomic());
assertTrue(RecordTestUtils.recordAtIndexAs(
@@ -1588,7 +1568,6 @@ public class QuorumControllerTest {
false,
offsetControlManager.transactionStartOffset(),
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
- ZkMigrationState.NONE,
MetadataVersion.IBP_3_6_IV1);
assertTrue(result.isAtomic());
@@ -1612,7 +1591,6 @@ public class QuorumControllerTest {
false,
offsetControlManager.transactionStartOffset(),
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV0, "test"),
- ZkMigrationState.NONE,
MetadataVersion.IBP_3_6_IV0)
);
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
index 044a1610185..2ecf2f75cfb 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
@@ -17,10 +17,6 @@
package org.apache.kafka.controller;
-import org.apache.kafka.common.Endpoint;
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-import org.apache.kafka.metadata.ControllerRegistration;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
@@ -124,56 +120,4 @@ public class QuorumFeaturesTest {
assertTrue(QUORUM_FEATURES.isControllerId(2));
assertFalse(QUORUM_FEATURES.isControllerId(3));
}
-
- @Test
- public void testZkMigrationNotReadyIfMetadataVersionTooLow() {
- assertEquals(Optional.of("The metadata.version too low at 3.0-IV1"),
- QUORUM_FEATURES.reasonAllControllersZkMigrationNotReady(
- MetadataVersion.IBP_3_0_IV1, Collections.emptyMap()));
- }
-
- @Test
- public void testZkMigrationReadyIfControllerRegistrationNotSupported() {
- assertEquals(Optional.empty(),
- QUORUM_FEATURES.reasonAllControllersZkMigrationNotReady(
- MetadataVersion.IBP_3_4_IV0, Collections.emptyMap()));
- }
-
- @Test
- public void testZkMigrationNotReadyIfNotAllControllersRegistered() {
- assertEquals(Optional.of("No registration found for controller 0"),
- QUORUM_FEATURES.reasonAllControllersZkMigrationNotReady(
- MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()));
- }
-
- @Test
- public void testZkMigrationNotReadyIfControllerNotReady() {
- assertEquals(Optional.of("Controller 0 has not enabled zookeeper.metadata.migration.enable"),
- QUORUM_FEATURES.reasonAllControllersZkMigrationNotReady(
- MetadataVersion.IBP_3_7_IV0, Collections.singletonMap(0,
- new ControllerRegistration.Builder().
- setId(0).
- setZkMigrationReady(false).
- setIncarnationId(Uuid.fromString("kCBJaDGNQk6x3y5xbtQOpg")).
- setListeners(Collections.singletonMap("CONTROLLER",
- new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "localhost", 9093))).
- build())));
- }
-
- @Test
- public void testZkMigrationReadyIfAllControllersReady() {
- Map controllers = new HashMap<>();
- QUORUM_FEATURES.quorumNodeIds().forEach(id ->
- controllers.put(id,
- new ControllerRegistration.Builder().
- setId(id).
- setZkMigrationReady(true).
- setIncarnationId(Uuid.fromString("kCBJaDGNQk6x3y5xbtQOpg")).
- setListeners(Collections.singletonMap("CONTROLLER",
- new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "localhost", 9093))).
- build())
- );
- assertEquals(Optional.empty(), QUORUM_FEATURES.reasonAllControllersZkMigrationNotReady(
- MetadataVersion.IBP_3_7_IV0, controllers));
- }
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
index a23678098fb..1df5ff65563 100644
--- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
-import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
@@ -54,7 +53,7 @@ public class FeaturesImageTest {
Map map1 = new HashMap<>();
map1.put("foo", (short) 2);
map1.put("bar", (short) 1);
- IMAGE1 = new FeaturesImage(map1, MetadataVersion.latestTesting(), ZkMigrationState.NONE);
+ IMAGE1 = new FeaturesImage(map1, MetadataVersion.latestTesting());
DELTA1_RECORDS = new ArrayList<>();
// change feature level
@@ -76,7 +75,7 @@ public class FeaturesImageTest {
Map map2 = new HashMap<>();
map2.put("foo", (short) 3);
map2.put("baz", (short) 8);
- IMAGE2 = new FeaturesImage(map2, MetadataVersion.latestTesting(), ZkMigrationState.NONE);
+ IMAGE2 = new FeaturesImage(map2, MetadataVersion.latestTesting());
DELTA2_RECORDS = new ArrayList<>();
// remove all features
@@ -95,7 +94,7 @@ public class FeaturesImageTest {
RecordTestUtils.replayAll(DELTA2, DELTA2_RECORDS);
Map map3 = Collections.singletonMap("bar", (short) 1);
- IMAGE3 = new FeaturesImage(map3, MetadataVersion.latestTesting(), ZkMigrationState.NONE);
+ IMAGE3 = new FeaturesImage(map3, MetadataVersion.latestTesting());
}
@Test
@@ -162,10 +161,9 @@ public class FeaturesImageTest {
public void testEmpty() {
assertTrue(FeaturesImage.EMPTY.isEmpty());
assertFalse(new FeaturesImage(Collections.singletonMap("foo", (short) 1),
- FeaturesImage.EMPTY.metadataVersion(), FeaturesImage.EMPTY.zkMigrationState()).isEmpty());
+ FeaturesImage.EMPTY.metadataVersion()).isEmpty());
assertFalse(new FeaturesImage(FeaturesImage.EMPTY.finalizedVersions(),
- MetadataVersion.IBP_3_3_IV0, FeaturesImage.EMPTY.zkMigrationState()).isEmpty());
- assertFalse(new FeaturesImage(FeaturesImage.EMPTY.finalizedVersions(),
- FeaturesImage.EMPTY.metadataVersion(), ZkMigrationState.MIGRATION).isEmpty());
+ MetadataVersion.IBP_3_3_IV0).isEmpty());
+ assertTrue(new FeaturesImage(FeaturesImage.EMPTY.finalizedVersions(), FeaturesImage.EMPTY.metadataVersion()).isEmpty());
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
index 68fc8053bd2..ba636f3bdba 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
-import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.image.writer.UnwritableMetadataException;
@@ -183,7 +182,6 @@ public class ImageDowngradeTest {
(short) 2)),
Arrays.asList(
metadataVersionRecord(outputMetadataVersion),
- new ApiMessageAndVersion(new ZkMigrationStateRecord(), (short) 0),
TEST_RECORDS.get(0),
new ApiMessageAndVersion(
testPartitionRecord.duplicate().setDirectories(Collections.emptyList()),
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
index b92a1876e6c..d866c1b00ba 100644
--- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
@@ -148,7 +148,7 @@ public class SnapshotEmitterTest {
assertEquals(0L, emitter.metrics().latestSnapshotGeneratedBytes());
emitter.maybeEmit(MetadataImageTest.IMAGE1);
assertEquals(0L, emitter.metrics().latestSnapshotGeneratedAgeMs());
- assertEquals(1600L, emitter.metrics().latestSnapshotGeneratedBytes());
+ assertEquals(1500L, emitter.metrics().latestSnapshotGeneratedBytes());
FakeSnapshotWriter writer = mockRaftClient.writers.get(
MetadataImageTest.IMAGE1.provenance().snapshotId());
assertNotNull(writer);
diff --git a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
index d2cf35f33a5..b30a04b6a79 100644
--- a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
+++ b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
@@ -28,7 +28,6 @@ import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
@@ -115,14 +114,6 @@ public class KRaftConfigs {
public static final String SERVER_MAX_STARTUP_TIME_MS_DOC = "The maximum number of milliseconds we will wait for the server to come up. " +
"By default there is no limit. This should be used for testing only.";
- /** ZK to KRaft Migration configs */
- public static final String MIGRATION_ENABLED_CONFIG = "zookeeper.metadata.migration.enable";
- public static final String MIGRATION_ENABLED_DOC = "Enable ZK to KRaft migration";
-
- public static final String MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG = "zookeeper.metadata.migration.min.batch.size";
- public static final int MIGRATION_METADATA_MIN_BATCH_SIZE_DEFAULT = 200;
- public static final String MIGRATION_METADATA_MIN_BATCH_SIZE_DOC = "Soft minimum batch size to use when migrating metadata from ZooKeeper to KRaft";
-
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
.define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
@@ -140,8 +131,5 @@ public class KRaftConfigs {
.define(METADATA_MAX_RETENTION_BYTES_CONFIG, LONG, METADATA_MAX_RETENTION_BYTES_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_BYTES_DOC)
.define(METADATA_MAX_RETENTION_MILLIS_CONFIG, LONG, LogConfig.DEFAULT_RETENTION_MS, null, HIGH, METADATA_MAX_RETENTION_MILLIS_DOC)
.define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC)
- .defineInternal(SERVER_MAX_STARTUP_TIME_MS_CONFIG, LONG, SERVER_MAX_STARTUP_TIME_MS_DEFAULT, atLeast(0), MEDIUM, SERVER_MAX_STARTUP_TIME_MS_DOC)
- .define(MIGRATION_ENABLED_CONFIG, BOOLEAN, false, HIGH, MIGRATION_ENABLED_DOC)
- .defineInternal(MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG, INT, MIGRATION_METADATA_MIN_BATCH_SIZE_DEFAULT, atLeast(1),
- MEDIUM, MIGRATION_METADATA_MIN_BATCH_SIZE_DOC);
+ .defineInternal(SERVER_MAX_STARTUP_TIME_MS_CONFIG, LONG, SERVER_MAX_STARTUP_TIME_MS_DEFAULT, atLeast(0), MEDIUM, SERVER_MAX_STARTUP_TIME_MS_DOC);
}