diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index a9e64fb967b..a16e528d797 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -32,13 +32,14 @@ import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClie import org.apache.kafka.common.KafkaException import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.Uuid +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector} import org.apache.kafka.common.protocol.ApiMessage import org.apache.kafka.common.requests.RequestHeader import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.utils.{LogContext, Time} +import org.apache.kafka.common.utils.{LogContext, Time, Utils} import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec} import org.apache.kafka.raft.{FileBasedStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, RaftClient, RaftConfig, ReplicatedLog} import org.apache.kafka.server.ProcessRole @@ -69,6 +70,51 @@ object KafkaRaftManager { lock } + + /** + * Test if the configured metadata log dir is one of the data log dirs. + */ + def hasDifferentLogDir(config: KafkaConfig): Boolean = { + !config + .logDirs + .map(Paths.get(_).toAbsolutePath) + .contains(Paths.get(config.metadataLogDir).toAbsolutePath) + } + + /** + * Obtain the file lock and delete the metadata log directory completely. + * + * This is only used by ZK brokers that are in pre-migration or hybrid mode of the ZK to KRaft migration. 
+ * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers and it + * makes recovery from a failed migration much easier. See KAFKA-16463. + * + * @param config The broker config + */ + def maybeDeleteMetadataLogDir(config: KafkaConfig): Unit = { + // These constraints are enforced in KafkaServer, but repeating them here to guard against future callers + if (config.processRoles.nonEmpty) { + throw new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode.") + } else if (!config.migrationEnabled) { + throw new RuntimeException("Not deleting metadata log dir since migrations are not enabled.") + } else { + val metadataDir = new File(config.metadataLogDir) + val logDirName = UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION) + val metadataPartitionDir = KafkaRaftManager.createLogDirectory(metadataDir, logDirName) + val deletionLock = if (hasDifferentLogDir(config)) { + Some(KafkaRaftManager.lockDataDir(metadataDir)) + } else { + None + } + + try { + Utils.delete(metadataPartitionDir) + } catch { + case t: Throwable => throw new RuntimeException("Failed to delete metadata log", t) + } finally { + deletionLock.foreach(_.destroy()) + } + } + } } trait RaftManager[T] { @@ -115,10 +161,8 @@ class KafkaRaftManager[T]( private val dataDirLock = { // Acquire the log dir lock if the metadata log dir is different from the log dirs - val differentMetadataLogDir = !config - .logDirs - .map(Paths.get(_).toAbsolutePath) - .contains(Paths.get(config.metadataLogDir).toAbsolutePath) + val differentMetadataLogDir = KafkaRaftManager.hasDifferentLogDir(config) + // Or this node is only a controller val isOnlyController = config.processRoles == Set(ProcessRole.ControllerRole) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b8da5f964de..bcfc1dba343 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ 
b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -421,6 +421,11 @@ class KafkaServer( isZkBroker = true, logManager.directoryIdsSet) + // For ZK brokers in migration mode, always delete the metadata partition on startup. + logger.info(s"Deleting local metadata log from ${config.metadataLogDir} since this is a ZK broker in migration mode.") + KafkaRaftManager.maybeDeleteMetadataLogDir(config) + logger.info("Successfully deleted local metadata log. It will be re-created.") + // If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller val controllerQuorumVotersFuture = CompletableFuture.completedFuture( RaftConfig.parseVoterConnections(config.quorumVoters)) @@ -778,7 +783,7 @@ class KafkaServer( if (config.requiresZookeeper && metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])) { info("ZkBroker currently has a KRaft controller. Controlled shutdown will be handled " + - "through broker life cycle manager") + "through broker lifecycle manager") return true } val metadataUpdater = new ManualMetadataUpdater() diff --git a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala index e0f0118ea38..d5908bfea22 100644 --- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala @@ -389,7 +389,12 @@ class NodeToControllerRequestThread( debug("Controller isn't cached, looking for local metadata changes") controllerInformation.node match { case Some(controllerNode) => - info(s"Recorded new controller, from now on will use node $controllerNode") + val controllerType = if (controllerInformation.isZkController) { + "ZK" + } else { + "KRaft" + } + info(s"Recorded new $controllerType controller, from now on will use node $controllerNode") updateControllerAddress(controllerNode) metadataUpdater.setNodes(Seq(controllerNode).asJava) 
case None => diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index 07a7bee6845..df4a4139426 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -16,7 +16,7 @@ */ package kafka.zk -import kafka.server.KafkaConfig +import kafka.server.{KRaftCachedControllerId, KafkaConfig} import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance} import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, Type} import kafka.test.junit.ClusterTestExtensions @@ -480,6 +480,87 @@ class ZkMigrationIntegrationTest { } } + @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( + new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), + new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), + new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), + new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"), + )) + def testDeleteLogOnStartup(zkCluster: ClusterInstance): Unit = { + var admin = zkCluster.createAdminClient() + try { + val newTopics = new util.ArrayList[NewTopic]() + newTopics.add(new NewTopic("testDeleteLogOnStartup", 2, 3.toShort) + .configs(Map(TopicConfig.SEGMENT_BYTES_CONFIG -> "102400", TopicConfig.SEGMENT_MS_CONFIG -> "300000").asJava)) + val createTopicResult = admin.createTopics(newTopics) + createTopicResult.all().get(60, TimeUnit.SECONDS) + } finally { + admin.close() + } + + // Bootstrap the ZK cluster ID into KRaft + val clusterId = zkCluster.clusterId() + val kraftCluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). 
+ setBootstrapMetadataVersion(MetadataVersion.IBP_3_8_IV0). + setClusterId(Uuid.fromString(clusterId)). + setNumBrokerNodes(0). + setNumControllerNodes(1).build()) + .setConfigProp(KafkaConfig.MigrationEnabledProp, "true") + .setConfigProp(ZkConfigs.ZK_CONNECT_CONFIG, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) + .build() + try { + kraftCluster.format() + kraftCluster.startup() + val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3) + + // Enable migration configs and restart brokers + log.info("Restart brokers in migration mode") + zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") + zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) + zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") + zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") + zkCluster.rollingBrokerRestart() // This would throw if authorizers weren't allowed + zkCluster.waitForReadyBrokers() + readyFuture.get(30, TimeUnit.SECONDS) + + val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient + TestUtils.waitUntilTrue( + () => zkClient.getControllerId.contains(3000), + "Timed out waiting for KRaft controller to take over", + 30000) + + def hasKRaftController: Boolean = { + zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().brokers.forall( + broker => broker.metadataCache.getControllerId match { + case Some(_: KRaftCachedControllerId) => true + case _ => false + } + ) + } + TestUtils.waitUntilTrue(() => hasKRaftController, "Timed out waiting for ZK brokers to see a KRaft controller") + + log.info("Restart brokers again") + zkCluster.rollingBrokerRestart() + zkCluster.waitForReadyBrokers() + + admin = zkCluster.createAdminClient() + try { + // List topics is served from 
local MetadataCache on brokers. For ZK brokers this cache is populated by UMR + // which won't be sent until the broker has been unfenced by the KRaft controller. So, seeing the topic in + // the brokers cache tells us it has recreated and re-replicated the metadata log + TestUtils.waitUntilTrue( + () => admin.listTopics().names().get(30, TimeUnit.SECONDS).asScala.contains("testDeleteLogOnStartup"), + "Timed out listing topics", + 30000) + } finally { + admin.close() + } + } finally { + shutdownInSequence(zkCluster, kraftCluster) + } + } + // SCRAM and Quota are intermixed. Test Quota Only here @ClusterTemplate("zkClustersForAllMigrationVersions") def testDualWrite(zkCluster: ClusterInstance): Unit = { diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala index e716bb452d9..f3158d93d7e 100644 --- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala +++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala @@ -18,8 +18,7 @@ package kafka.raft import java.nio.channels.FileChannel import java.nio.channels.OverlappingFileLockException -import java.nio.file.Path -import java.nio.file.StandardOpenOption +import java.nio.file.{Files, Path, StandardOpenOption} import java.util.Properties import java.util.concurrent.CompletableFuture import kafka.log.LogManager @@ -32,6 +31,7 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.utils.Time import org.apache.kafka.raft.RaftConfig import org.apache.kafka.server.ProcessRole +import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest @@ -39,11 +39,36 @@ import org.junit.jupiter.params.provider.ValueSource import org.apache.kafka.server.fault.FaultHandler import org.mockito.Mockito._ + class RaftManagerTest { + private def createZkBrokerConfig( + migrationEnabled: Boolean, + nodeId: Int, + logDir: 
Seq[Path], + metadataDir: Option[Path] + ): KafkaConfig = { + val props = new Properties + logDir.foreach { value => + props.setProperty(KafkaConfig.LogDirProp, value.toString) + } + if (migrationEnabled) { + metadataDir.foreach { value => + props.setProperty(KafkaConfig.MetadataLogDirProp, value.toString) + } + props.setProperty(KafkaConfig.MigrationEnabledProp, "true") + props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093") + props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL") + } + + props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + props.setProperty(KafkaConfig.BrokerIdProp, nodeId.toString) + new KafkaConfig(props) + } + private def createConfig( processRoles: Set[ProcessRole], nodeId: Int, - logDir: Option[Path], + logDir: Seq[Path], metadataDir: Option[Path] ): KafkaConfig = { val props = new Properties @@ -111,7 +136,7 @@ class RaftManagerTest { createConfig( processRolesSet, nodeId, - Some(logDir.toPath), + Seq(logDir.toPath), None ) ) @@ -123,9 +148,9 @@ class RaftManagerTest { @ValueSource(strings = Array("metadata-only", "log-only", "both")) def testLogDirLockWhenControllerOnly(dirType: String): Unit = { val logDir = if (dirType.equals("metadata-only")) { - None + Seq.empty } else { - Some(TestUtils.tempDir().toPath) + Seq(TestUtils.tempDir().toPath) } val metadataDir = if (dirType.equals("log-only")) { @@ -145,7 +170,7 @@ class RaftManagerTest { ) ) - val lockPath = metadataDir.getOrElse(logDir.get).resolve(LogManager.LockFileName) + val lockPath = metadataDir.getOrElse(logDir.head).resolve(LogManager.LockFileName) assertTrue(fileLocked(lockPath)) raftManager.shutdown() @@ -155,7 +180,7 @@ class RaftManagerTest { @Test def testLogDirLockWhenBrokerOnlyWithSeparateMetadataDir(): Unit = { - val logDir = Some(TestUtils.tempDir().toPath) + val logDir = Seq(TestUtils.tempDir().toPath) val metadataDir = Some(TestUtils.tempDir().toPath) val nodeId = 1 @@ -169,7 +194,7 @@ class RaftManagerTest { ) ) - val 
lockPath = metadataDir.getOrElse(logDir.get).resolve(LogManager.LockFileName) + val lockPath = metadataDir.getOrElse(logDir.head).resolve(LogManager.LockFileName) assertTrue(fileLocked(lockPath)) raftManager.shutdown() @@ -177,6 +202,106 @@ class RaftManagerTest { assertFalse(fileLocked(lockPath)) } + def createMetadataLog(config: KafkaConfig): Unit = { + val raftManager = createRaftManager( + new TopicPartition("__cluster_metadata", 0), + config + ) + raftManager.shutdown() + } + + def assertLogDirsExist( + logDirs: Seq[Path], + metadataLogDir: Option[Path], + expectMetadataLog: Boolean + ): Unit = { + // In all cases, the log dir and metadata log dir themselves should be untouched + assertTrue(Files.exists(metadataLogDir.get)) + logDirs.foreach { logDir => + assertTrue(Files.exists(logDir), "Should not delete log dir") + } + + if (expectMetadataLog) { + assertTrue(Files.exists(metadataLogDir.get.resolve("__cluster_metadata-0"))) + } else { + assertFalse(Files.exists(metadataLogDir.get.resolve("__cluster_metadata-0"))) + } + } + + @Test + def testMigratingZkBrokerDeletesMetadataLog(): Unit = { + val logDirs = Seq(TestUtils.tempDir().toPath) + val metadataLogDir = Some(TestUtils.tempDir().toPath) + val nodeId = 1 + val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDirs, metadataLogDir) + createMetadataLog(config) + + KafkaRaftManager.maybeDeleteMetadataLogDir(config) + assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = false) + } + + @Test + def testNonMigratingZkBrokerDoesNotDeleteMetadataLog(): Unit = { + val logDirs = Seq(TestUtils.tempDir().toPath) + val metadataLogDir = Some(TestUtils.tempDir().toPath) + val nodeId = 1 + + val config = createZkBrokerConfig(migrationEnabled = false, nodeId, logDirs, metadataLogDir) + + // Create the metadata log dir directly as if the broker was previously in migration mode. 
+ // This simulates a misconfiguration after downgrade + Files.createDirectory(metadataLogDir.get.resolve("__cluster_metadata-0")) + + val err = assertThrows(classOf[RuntimeException], () => KafkaRaftManager.maybeDeleteMetadataLogDir(config), + "Should have not deleted the metadata log") + assertEquals("Not deleting metadata log dir since migrations are not enabled.", err.getMessage) + + assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = true) + } + + @Test + def testZkBrokerDoesNotDeleteSeparateLogDirs(): Unit = { + val logDirs = Seq(TestUtils.tempDir().toPath, TestUtils.tempDir().toPath) + val metadataLogDir = Some(TestUtils.tempDir().toPath) + val nodeId = 1 + val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDirs, metadataLogDir) + createMetadataLog(config) + + KafkaRaftManager.maybeDeleteMetadataLogDir(config) + assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = false) + } + + @Test + def testZkBrokerDoesNotDeleteSameLogDir(): Unit = { + val logDirs = Seq(TestUtils.tempDir().toPath, TestUtils.tempDir().toPath) + val metadataLogDir = logDirs.headOption + val nodeId = 1 + val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDirs, metadataLogDir) + createMetadataLog(config) + + KafkaRaftManager.maybeDeleteMetadataLogDir(config) + assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = false) + } + + @Test + def testKRaftBrokerDoesNotDeleteMetadataLog(): Unit = { + val logDirs = Seq(TestUtils.tempDir().toPath) + val metadataLogDir = Some(TestUtils.tempDir().toPath) + val nodeId = 1 + val config = createConfig( + Set(ProcessRole.BrokerRole), + nodeId, + logDirs, + metadataLogDir + ) + createMetadataLog(config) + + assertThrows(classOf[RuntimeException], () => KafkaRaftManager.maybeDeleteMetadataLogDir(config), + "Should not have deleted metadata log") + assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = true) + + } + private def fileLocked(path: Path): Boolean = { 
TestUtils.resource(FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { channel => try { @@ -187,5 +312,4 @@ class RaftManagerTest { } } } - }