KAFKA-16463 Delete metadata log on ZK broker startup (#15648)

This patch changes the behavior of the migrating ZK broker to always delete the local metadata log
during startup. This deletion is done immediately before creating the RaftManager which will
re-create the log directory and let the broker re-replicate the log from the active controller.

This new behavior is only present for ZK brokers that have migrations enabled. KRaft brokers,
even those with migrations enabled, will not delete their local metadata log. KRaft controllers are
not impacted by this change.
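
In short, the log is deleted only when the node is a ZK broker (no process.roles) with migration enabled. A condensed sketch of that guard (hypothetical helper; the authoritative checks live in KafkaRaftManager.maybeDeleteMetadataLogDir in the diff below):

import org.apache.kafka.server.ProcessRole

// Hypothetical condensed form of the guard this patch enforces.
// processRoles is empty on a pure ZK broker; non-empty means KRaft mode.
def shouldDeleteMetadataLog(processRoles: Set[ProcessRole], migrationEnabled: Boolean): Boolean =
  processRoles.isEmpty && migrationEnabled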

The rationale for this change is to make it easier for operators to re-attempt a ZK to KRaft
migration after having reverted back to ZK mode. After such a revert, an invalid metadata log
remains on the disk of each broker. Before the migration can be re-attempted, this log must be
deleted. Doing that manually is burdensome for the operator of a large cluster, especially since
the deletion must be done while each broker is offline.
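
For comparison, the manual cleanup this automates amounts to removing the __cluster_metadata-0 directory under each broker's metadata log dir while the broker is stopped. A rough stand-alone sketch of such a cleanup (illustrative only; the default path and argument handling are assumptions):

import java.nio.file.{Files, Path, Paths}
import java.util.Comparator

// Illustrative: roughly what an operator previously had to run on each
// stopped broker before re-attempting the migration.
object ManualMetadataCleanup extends App {
  val metadataLogDir = Paths.get(args.headOption.getOrElse("/var/kafka-logs"))
  val partitionDir = metadataLogDir.resolve("__cluster_metadata-0")
  if (Files.exists(partitionDir)) {
    // Delete files before their parent directories (reverse depth order)
    Files.walk(partitionDir).sorted(Comparator.reverseOrder[Path]()).forEach(p => Files.delete(p))
  }
}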

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Igor Soarez <soarez@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>
David Arthur authored 2024-04-12 13:21:30 -04:00, committed by GitHub
commit e02ffd852f (parent 61baa7ac6b)
5 changed files with 277 additions and 18 deletions

core/src/main/scala/kafka/raft/RaftManager.scala

@@ -32,13 +32,14 @@ import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClie
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, RaftClient, RaftConfig, ReplicatedLog}
import org.apache.kafka.server.ProcessRole
@@ -69,6 +70,51 @@ object KafkaRaftManager {
    lock
  }

  /**
   * Test if the configured metadata log dir is different from the data log dirs.
   */
  def hasDifferentLogDir(config: KafkaConfig): Boolean = {
    !config
      .logDirs
      .map(Paths.get(_).toAbsolutePath)
      .contains(Paths.get(config.metadataLogDir).toAbsolutePath)
  }

  /**
   * Obtain the file lock and delete the metadata log directory completely.
   *
   * This is only used by ZK brokers that are in pre-migration or hybrid mode of the ZK to KRaft migration.
   * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers and it
   * makes recovery from a failed migration much easier. See KAFKA-16463.
   *
   * @param config The broker config
   */
  def maybeDeleteMetadataLogDir(config: KafkaConfig): Unit = {
    // These constraints are enforced in KafkaServer, but repeating them here to guard against future callers
    if (config.processRoles.nonEmpty) {
      throw new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode.")
    } else if (!config.migrationEnabled) {
      throw new RuntimeException("Not deleting metadata log dir since migrations are not enabled.")
    } else {
      val metadataDir = new File(config.metadataLogDir)
      val logDirName = UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION)
      val metadataPartitionDir = KafkaRaftManager.createLogDirectory(metadataDir, logDirName)
      val deletionLock = if (hasDifferentLogDir(config)) {
        Some(KafkaRaftManager.lockDataDir(metadataDir))
      } else {
        None
      }
      try {
        Utils.delete(metadataPartitionDir)
      } catch {
        case t: Throwable => throw new RuntimeException("Failed to delete metadata log", t)
      } finally {
        deletionLock.foreach(_.destroy())
      }
    }
  }
}

trait RaftManager[T] {
@@ -115,10 +161,8 @@ class KafkaRaftManager[T](
  private val dataDirLock = {
    // Acquire the log dir lock if the metadata log dir is different from the log dirs
-   val differentMetadataLogDir = !config
-     .logDirs
-     .map(Paths.get(_).toAbsolutePath)
-     .contains(Paths.get(config.metadataLogDir).toAbsolutePath)
+   val differentMetadataLogDir = KafkaRaftManager.hasDifferentLogDir(config)
    // Or this node is only a controller
    val isOnlyController = config.processRoles == Set(ProcessRole.ControllerRole)
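
Note the locking discipline above: when the metadata log dir is separate from the data dirs, deletion first takes the same <dir>/.lock file that LogManager uses, so a concurrently running process cannot use the directory mid-delete. A minimal sketch of that pattern (illustrative, using java.nio directly rather than Kafka's FileLock helper):

import java.io.File
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

// Illustrative: hold an exclusive lock on <dir>/.lock for the duration of
// `body`, failing fast if another process already holds it.
def withDirLock[T](dir: File)(body: => T): T = {
  val channel = FileChannel.open(new File(dir, ".lock").toPath,
    StandardOpenOption.CREATE, StandardOpenOption.WRITE)
  try {
    val lock = channel.tryLock() // null when another process holds the lock
    if (lock == null) throw new IllegalStateException(s"$dir is locked by another process")
    try body finally lock.release()
  } finally channel.close()
}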

core/src/main/scala/kafka/server/KafkaServer.scala

@@ -421,6 +421,11 @@ class KafkaServer(
        isZkBroker = true,
        logManager.directoryIdsSet)

      // For ZK brokers in migration mode, always delete the metadata partition on startup.
      logger.info(s"Deleting local metadata log from ${config.metadataLogDir} since this is a ZK broker in migration mode.")
      KafkaRaftManager.maybeDeleteMetadataLogDir(config)
      logger.info("Successfully deleted local metadata log. It will be re-created.")

      // If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
      val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
        RaftConfig.parseVoterConnections(config.quorumVoters))
@@ -778,7 +783,7 @@ class KafkaServer(
    if (config.requiresZookeeper &&
        metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])) {
      info("ZkBroker currently has a KRaft controller. Controlled shutdown will be handled " +
-       "through broker life cycle manager")
+       "through broker lifecycle manager")
      return true
    }
    val metadataUpdater = new ManualMetadataUpdater()

core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala

@@ -389,7 +389,12 @@ class NodeToControllerRequestThread(
      debug("Controller isn't cached, looking for local metadata changes")
      controllerInformation.node match {
        case Some(controllerNode) =>
-         info(s"Recorded new controller, from now on will use node $controllerNode")
+         val controllerType = if (controllerInformation.isZkController) {
+           "ZK"
+         } else {
+           "KRaft"
+         }
+         info(s"Recorded new $controllerType controller, from now on will use node $controllerNode")
          updateControllerAddress(controllerNode)
          metadataUpdater.setNodes(Seq(controllerNode).asJava)
        case None =>
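
With this change the message identifies the controller type; for example, the line would now read something like (endpoint illustrative):

Recorded new KRaft controller, from now on will use node localhost:9093 (id: 3000 rack: null)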

core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala

@@ -16,7 +16,7 @@
 */
package kafka.zk

-import kafka.server.KafkaConfig
+import kafka.server.{KRaftCachedControllerId, KafkaConfig}
import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
@@ -480,6 +480,87 @@ class ZkMigrationIntegrationTest {
    }
  }

  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
    new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
    new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
    new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
    new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
  ))
  def testDeleteLogOnStartup(zkCluster: ClusterInstance): Unit = {
    var admin = zkCluster.createAdminClient()
    try {
      val newTopics = new util.ArrayList[NewTopic]()
      newTopics.add(new NewTopic("testDeleteLogOnStartup", 2, 3.toShort)
        .configs(Map(TopicConfig.SEGMENT_BYTES_CONFIG -> "102400", TopicConfig.SEGMENT_MS_CONFIG -> "300000").asJava))
      val createTopicResult = admin.createTopics(newTopics)
      createTopicResult.all().get(60, TimeUnit.SECONDS)
    } finally {
      admin.close()
    }

    // Bootstrap the ZK cluster ID into KRaft
    val clusterId = zkCluster.clusterId()
    val kraftCluster = new KafkaClusterTestKit.Builder(
      new TestKitNodes.Builder().
        setBootstrapMetadataVersion(MetadataVersion.IBP_3_8_IV0).
        setClusterId(Uuid.fromString(clusterId)).
        setNumBrokerNodes(0).
        setNumControllerNodes(1).build())
      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
      .setConfigProp(ZkConfigs.ZK_CONNECT_CONFIG, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
      .build()
    try {
      kraftCluster.format()
      kraftCluster.startup()
      val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)

      // Enable migration configs and restart brokers
      log.info("Restart brokers in migration mode")
      zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
      zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
      zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
      zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
      zkCluster.rollingBrokerRestart() // This would throw if authorizers weren't allowed
      zkCluster.waitForReadyBrokers()
      readyFuture.get(30, TimeUnit.SECONDS)

      val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
      TestUtils.waitUntilTrue(
        () => zkClient.getControllerId.contains(3000),
        "Timed out waiting for KRaft controller to take over",
        30000)

      def hasKRaftController: Boolean = {
        zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().brokers.forall(
          broker => broker.metadataCache.getControllerId match {
            case Some(_: KRaftCachedControllerId) => true
            case _ => false
          }
        )
      }
      TestUtils.waitUntilTrue(() => hasKRaftController, "Timed out waiting for ZK brokers to see a KRaft controller")

      log.info("Restart brokers again")
      zkCluster.rollingBrokerRestart()
      zkCluster.waitForReadyBrokers()

      admin = zkCluster.createAdminClient()
      try {
        // List topics is served from the local MetadataCache on brokers. For ZK brokers this cache is populated by
        // UMR (UpdateMetadataRequest), which won't be sent until the broker has been unfenced by the KRaft controller.
        // So, seeing the topic in the broker's cache tells us it has re-created and re-replicated the metadata log.
        TestUtils.waitUntilTrue(
          () => admin.listTopics().names().get(30, TimeUnit.SECONDS).asScala.contains("testDeleteLogOnStartup"),
          "Timed out listing topics",
          30000)
      } finally {
        admin.close()
      }
    } finally {
      shutdownInSequence(zkCluster, kraftCluster)
    }
  }

  // SCRAM and Quota are intermixed. Test Quota Only here
  @ClusterTemplate("zkClustersForAllMigrationVersions")
  def testDualWrite(zkCluster: ClusterInstance): Unit = {

core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala

@@ -18,8 +18,7 @@ package kafka.raft
import java.nio.channels.FileChannel
import java.nio.channels.OverlappingFileLockException
-import java.nio.file.Path
-import java.nio.file.StandardOpenOption
+import java.nio.file.{Files, Path, StandardOpenOption}
import java.util.Properties
import java.util.concurrent.CompletableFuture
import kafka.log.LogManager
@@ -32,6 +31,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
@@ -39,11 +39,36 @@ import org.junit.jupiter.params.provider.ValueSource
import org.apache.kafka.server.fault.FaultHandler
import org.mockito.Mockito._
class RaftManagerTest {

  private def createZkBrokerConfig(
    migrationEnabled: Boolean,
    nodeId: Int,
    logDir: Seq[Path],
    metadataDir: Option[Path]
  ): KafkaConfig = {
    val props = new Properties
    logDir.foreach { value =>
      props.setProperty(KafkaConfig.LogDirProp, value.toString)
    }
    if (migrationEnabled) {
      metadataDir.foreach { value =>
        props.setProperty(KafkaConfig.MetadataLogDirProp, value.toString)
      }
      props.setProperty(KafkaConfig.MigrationEnabledProp, "true")
      props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")
      props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
    }
    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
    props.setProperty(KafkaConfig.BrokerIdProp, nodeId.toString)
    new KafkaConfig(props)
  }
  private def createConfig(
    processRoles: Set[ProcessRole],
    nodeId: Int,
-   logDir: Option[Path],
+   logDir: Seq[Path],
    metadataDir: Option[Path]
  ): KafkaConfig = {
    val props = new Properties
@@ -111,7 +136,7 @@ class RaftManagerTest {
      createConfig(
        processRolesSet,
        nodeId,
-       Some(logDir.toPath),
+       Seq(logDir.toPath),
        None
      )
    )
@@ -123,9 +148,9 @@
  @ValueSource(strings = Array("metadata-only", "log-only", "both"))
  def testLogDirLockWhenControllerOnly(dirType: String): Unit = {
    val logDir = if (dirType.equals("metadata-only")) {
-     None
+     Seq.empty
    } else {
-     Some(TestUtils.tempDir().toPath)
+     Seq(TestUtils.tempDir().toPath)
    }

    val metadataDir = if (dirType.equals("log-only")) {
@@ -145,7 +170,7 @@
      )
    )

-   val lockPath = metadataDir.getOrElse(logDir.get).resolve(LogManager.LockFileName)
+   val lockPath = metadataDir.getOrElse(logDir.head).resolve(LogManager.LockFileName)
    assertTrue(fileLocked(lockPath))

    raftManager.shutdown()
@@ -155,7 +180,7 @@
  @Test
  def testLogDirLockWhenBrokerOnlyWithSeparateMetadataDir(): Unit = {
-   val logDir = Some(TestUtils.tempDir().toPath)
+   val logDir = Seq(TestUtils.tempDir().toPath)
    val metadataDir = Some(TestUtils.tempDir().toPath)
    val nodeId = 1
@@ -169,7 +194,7 @@
      )
    )

-   val lockPath = metadataDir.getOrElse(logDir.get).resolve(LogManager.LockFileName)
+   val lockPath = metadataDir.getOrElse(logDir.head).resolve(LogManager.LockFileName)
    assertTrue(fileLocked(lockPath))

    raftManager.shutdown()
@@ -177,6 +202,106 @@
    assertFalse(fileLocked(lockPath))
  }

  def createMetadataLog(config: KafkaConfig): Unit = {
    val raftManager = createRaftManager(
      new TopicPartition("__cluster_metadata", 0),
      config
    )
    raftManager.shutdown()
  }
  def assertLogDirsExist(
    logDirs: Seq[Path],
    metadataLogDir: Option[Path],
    expectMetadataLog: Boolean
  ): Unit = {
    // In all cases, the log dir and metadata log dir themselves should be untouched
    assertTrue(Files.exists(metadataLogDir.get))
    logDirs.foreach { logDir =>
      assertTrue(Files.exists(logDir), "Should not delete log dir")
    }

    if (expectMetadataLog) {
      assertTrue(Files.exists(metadataLogDir.get.resolve("__cluster_metadata-0")))
    } else {
      assertFalse(Files.exists(metadataLogDir.get.resolve("__cluster_metadata-0")))
    }
  }
  @Test
  def testMigratingZkBrokerDeletesMetadataLog(): Unit = {
    val logDirs = Seq(TestUtils.tempDir().toPath)
    val metadataLogDir = Some(TestUtils.tempDir().toPath)
    val nodeId = 1
    val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDirs, metadataLogDir)
    createMetadataLog(config)

    KafkaRaftManager.maybeDeleteMetadataLogDir(config)
    assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = false)
  }

  @Test
  def testNonMigratingZkBrokerDoesNotDeleteMetadataLog(): Unit = {
    val logDirs = Seq(TestUtils.tempDir().toPath)
    val metadataLogDir = Some(TestUtils.tempDir().toPath)
    val nodeId = 1
    val config = createZkBrokerConfig(migrationEnabled = false, nodeId, logDirs, metadataLogDir)

    // Create the metadata log dir directly as if the broker was previously in migration mode.
    // This simulates a misconfiguration after downgrade.
    Files.createDirectory(metadataLogDir.get.resolve("__cluster_metadata-0"))

    val err = assertThrows(classOf[RuntimeException], () => KafkaRaftManager.maybeDeleteMetadataLogDir(config),
      "Should not have deleted the metadata log")
    assertEquals("Not deleting metadata log dir since migrations are not enabled.", err.getMessage)
    assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = true)
  }

  @Test
  def testZkBrokerDoesNotDeleteSeparateLogDirs(): Unit = {
    val logDirs = Seq(TestUtils.tempDir().toPath, TestUtils.tempDir().toPath)
    val metadataLogDir = Some(TestUtils.tempDir().toPath)
    val nodeId = 1
    val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDirs, metadataLogDir)
    createMetadataLog(config)

    KafkaRaftManager.maybeDeleteMetadataLogDir(config)
    assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = false)
  }

  @Test
  def testZkBrokerDoesNotDeleteSameLogDir(): Unit = {
    val logDirs = Seq(TestUtils.tempDir().toPath, TestUtils.tempDir().toPath)
    val metadataLogDir = logDirs.headOption
    val nodeId = 1
    val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDirs, metadataLogDir)
    createMetadataLog(config)

    KafkaRaftManager.maybeDeleteMetadataLogDir(config)
    assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = false)
  }

  @Test
  def testKRaftBrokerDoesNotDeleteMetadataLog(): Unit = {
    val logDirs = Seq(TestUtils.tempDir().toPath)
    val metadataLogDir = Some(TestUtils.tempDir().toPath)
    val nodeId = 1
    val config = createConfig(
      Set(ProcessRole.BrokerRole),
      nodeId,
      logDirs,
      metadataLogDir
    )
    createMetadataLog(config)

    assertThrows(classOf[RuntimeException], () => KafkaRaftManager.maybeDeleteMetadataLogDir(config),
      "Should not have deleted metadata log")
    assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = true)
  }
  private def fileLocked(path: Path): Boolean = {
    TestUtils.resource(FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { channel =>
      try {
@@ -187,5 +312,4 @@ class RaftManagerTest {
      }
    }
  }
}