mirror of https://github.com/apache/kafka.git
KAFKA-16082 Avoid resuming future replica if current replica is in the same directory (#15777)
It is observed that in scenario (3), i.e. when a broker crashes while it waits for the future replica to catch up for the second time and `dir1` is unavailable when the broker is restarted, the broker tries to create the partition in `dir2` according to the metadata in the controller. However, ReplicaManager also tries to resume the stale future replica that was abandoned when the broker crashed. Renaming that future replica eventually fails because the directory for the topic partition already exists in `dir2`, and the broker then marks `dir2` as offline. This PR fixes the behaviour by ignoring any future replica that is in the same directory as the current log, and it additionally marks the stale future replica for deletion.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
parent 6e998cffdd
commit 4ee66bb269
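The core of the fix, as described in the commit message above, is a directory comparison: a restarted broker must not resume a stale future replica that already sits in the directory the controller has assigned to the partition, because the recreated current log lives there too. Below is a minimal, self-contained sketch of that decision (illustrative only; the names are hypothetical and not the real LogManager API, which works with Uuid-based directory IDs and UnifiedLog instances):

    // Hypothetical sketch of the KAFKA-16082 decision, not actual Kafka code.
    object Kafka16082Sketch {
      // Directory IDs are plain strings here; Kafka uses Uuid-based directory IDs.
      final case class PartitionState(futureLogDirId: Option[String], controllerAssignedDirId: String)

      // A stale future replica is "abandoned" when it already lives in the directory the
      // controller assigned to the partition: resuming it would later collide with the
      // current log recreated in that same directory, so it is promoted (and any stale
      // current log deleted) instead.
      def shouldRecoverInsteadOfResume(state: PartitionState): Boolean =
        state.futureLogDirId.contains(state.controllerAssignedDirId)

      def main(args: Array[String]): Unit = {
        println(shouldRecoverInsteadOfResume(PartitionState(Some("dir2"), "dir2"))) // true: recover in place
        println(shouldRecoverInsteadOfResume(PartitionState(Some("dir1"), "dir2"))) // false: resume as before
      }
    }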
LogManager.scala
@@ -1175,6 +1175,36 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+      // We invoke abortAndPauseCleaning here because log cleaner runs asynchronously and replaceCurrentWithFutureLog
+      // invokes resumeCleaning which requires log cleaner's internal state to have a key for the given topic partition.
+      abortAndPauseCleaning(tp)
+
+      if (currentLog.isDefined)
+        info(s"Attempting to recover abandoned future log for $tp at $futureLog and removing ${currentLog.get}")
+      else
+        info(s"Attempting to recover abandoned future log for $tp at $futureLog")
+      replaceCurrentWithFutureLog(currentLog, futureLog)
+      info(s"Successfully recovered abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[(UnifiedLog, Option[UnifiedLog])] = {
+    futureLogs.values.flatMap { futureLog =>
+      val topicId = futureLog.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $futureLog does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = futureLog.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(futureLog.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => (futureLog, Option(currentLogs.get(futureLog.topicPartition)).filter(currentLog => currentLog.topicId.contains(topicId))))
+    }
+  }
+
   /**
    * Mark the partition directory in the source log directory for deletion and
    * rename the future log of this partition in the destination log directory to be the current log
@@ -1186,49 +1216,62 @@
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
 
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
-
-      // Now that future replica has been successfully renamed to be the current replica
-      // Update the cached map and log cleaner as appropriate.
-      futureLogs.remove(topicPartition)
-      currentLogs.put(topicPartition, destLog)
-      if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        resumeCleaning(topicPartition)
-      }
-
-      try {
-        sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true)
-        // Now that replica in source log directory has been successfully renamed for deletion.
-        // Close the log, update checkpoint files, and enqueue this log to be deleted.
-        sourceLog.close()
-        val logDir = sourceLog.parentDirFile
-        val logsToCheckpoint = logsInDir(logDir)
-        checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
-        checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
-        sourceLog.removeLogMetrics()
-        destLog.newMetrics()
-        addLogToBeDeleted(sourceLog)
-      } catch {
-        case e: KafkaStorageException =>
-          // If sourceLog's log directory is offline, we need close its handlers here.
-          // handleLogDirFailure() will not close handlers of sourceLog because it has been removed from currentLogs map
-          sourceLog.closeHandlers()
-          sourceLog.removeLogMetrics()
-          throw e
-      }
-
-      info(s"The current replica is successfully replaced with the future replica for $topicPartition")
+      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+      info(s"The current replica is successfully replaced with the future replica for $topicPartition")
+    }
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
+
+    destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
+    // the metrics tags still contain "future", so we have to remove it.
+    // we will add metrics back after sourceLog remove the metrics
+    destLog.removeLogMetrics()
+    if (updateHighWatermark && sourceLog.isDefined) {
+      destLog.updateHighWatermark(sourceLog.get.highWatermark)
+    }
+
+    // Now that future replica has been successfully renamed to be the current replica
+    // Update the cached map and log cleaner as appropriate.
+    futureLogs.remove(topicPartition)
+    currentLogs.put(topicPartition, destLog)
+    if (cleaner != null) {
+      sourceLog.foreach { srcLog =>
+        cleaner.alterCheckpointDir(topicPartition, srcLog.parentDirFile, destLog.parentDirFile)
+      }
+      resumeCleaning(topicPartition)
+    }
+
+    try {
+      sourceLog.foreach { srcLog =>
+        srcLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true)
+        // Now that replica in source log directory has been successfully renamed for deletion.
+        // Close the log, update checkpoint files, and enqueue this log to be deleted.
+        srcLog.close()
+        val logDir = srcLog.parentDirFile
+        val logsToCheckpoint = logsInDir(logDir)
+        checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
+        checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
+        srcLog.removeLogMetrics()
+        addLogToBeDeleted(srcLog)
+      }
+      destLog.newMetrics()
+    } catch {
+      case e: KafkaStorageException =>
+        // If sourceLog's log directory is offline, we need close its handlers here.
+        // handleLogDirFailure() will not close handlers of sourceLog because it has been removed from currentLogs map
+        sourceLog.foreach { srcLog =>
+          srcLog.closeHandlers()
+          srcLog.removeLogMetrics()
+        }
+        throw e
+    }
   }
 }
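The refactor above splits promotion into two entry points: the existing replaceCurrentWithFutureLog(topicPartition) keeps its behaviour by delegating to a new overload that takes the current log as an Option[UnifiedLog], which is what lets recoverAbandonedFutureLogs run the same promotion even when there is no usable current log to replace. A rough, illustrative model of that shape (not the Kafka API; Log, the printed messages, and the object name are made up for this sketch):

    // Illustrative-only model of the two call paths after the refactor.
    object PromotionSketch {
      final case class Log(name: String)

      // Mirrors the new overload's shape: the current log is optional and the high
      // watermark is only carried over when a current log actually exists.
      def replaceCurrentWithFutureLog(currentLog: Option[Log], futureLog: Log, updateHighWatermark: Boolean = false): String = {
        val hwNote = if (updateHighWatermark && currentLog.isDefined) " (high watermark carried over)" else ""
        currentLog match {
          case Some(cur) => s"promoted ${futureLog.name}, scheduled ${cur.name} for deletion$hwNote"
          case None      => s"promoted ${futureLog.name}, no current log to remove$hwNote"
        }
      }

      def main(args: Array[String]): Unit = {
        // Normal promotion: both logs exist and the high watermark is preserved.
        println(replaceCurrentWithFutureLog(Some(Log("foo-0")), Log("foo-0.future"), updateHighWatermark = true))
        // Recovery path (KAFKA-16082): the abandoned future log is promoted even though
        // no valid current log was found for the partition.
        println(replaceCurrentWithFutureLog(None, Log("foo-0.future")))
      }
    }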
BrokerMetadataPublisher.scala
@@ -294,6 +294,14 @@ class BrokerMetadataPublisher(
           isStray = log => LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
         )
 
+        // Rename all future replicas which are in the same directory as the
+        // one assigned by the controller. This can only happen due to a disk
+        // failure and broker shutdown after the directory assignment has been
+        // updated in the controller but before the future replica could be
+        // promoted.
+        // See KAFKA-16082 for details.
+        logManager.recoverAbandonedFutureLogs(brokerId, newImage.topics())
+
         // Make the LogCleaner available for reconfiguration. We can't do this prior to this
         // point because LogManager#startup creates the LogCleaner object, if
         // log.cleaner.enable is true. TODO: improve this (see KAFKA-13610)
KRaftClusterTest.scala
@@ -17,10 +17,12 @@
 
 package kafka.server
 
+import kafka.log.UnifiedLog
 import kafka.network.SocketServer
 import kafka.server.IntegrationTestUtils.connectAndReceive
 import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
+import org.apache.commons.io.FileUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
@@ -1369,6 +1371,129 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testAbandonedFutureReplicaRecovered_mainReplicaInOfflineLogDir(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
+        setBrokerNodes(3, 2).
+        setNumControllerNodes(1).build()).
+      build()
+    try {
+      cluster.format()
+      cluster.startup()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val broker0 = cluster.brokers().get(0)
+        val broker1 = cluster.brokers().get(1)
+        val foo0 = new TopicPartition("foo", 0)
+
+        admin.createTopics(Arrays.asList(
+          new NewTopic("foo", 3, 3.toShort))).all().get()
+
+        // Wait until foo-0 is created on broker0.
+        TestUtils.retry(60000) {
+          assertTrue(broker0.logManager.getLog(foo0).isDefined)
+        }
+
+        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+        broker0.shutdown()
+        TestUtils.retry(60000) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
+        }
+
+        // Modify foo-0 so that it refers to a future replica.
+        // This is equivalent to a failure during the promotion of the future replica and a restart with directory for
+        // the main replica being offline
+        val log = broker0.logManager.getLog(foo0).get
+        log.renameDir(UnifiedLog.logFutureDirName(foo0), shouldReinitialize = false)
+
+        // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
+        broker0.startup()
+        TestUtils.retry(60000) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(0, 1, 2), info.get.isr().asScala.toSet)
+          assertTrue(broker0.logManager.getLog(foo0, isFuture = true).isEmpty)
+        }
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
+        setBrokerNodes(3, 2).
+        setNumControllerNodes(1).build()).
+      build()
+    try {
+      cluster.format()
+      cluster.startup()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val broker0 = cluster.brokers().get(0)
+        val broker1 = cluster.brokers().get(1)
+        val foo0 = new TopicPartition("foo", 0)
+
+        admin.createTopics(Arrays.asList(
+          new NewTopic("foo", 3, 3.toShort))).all().get()
+
+        // Wait until foo-0 is created on broker0.
+        TestUtils.retry(60000) {
+          assertTrue(broker0.logManager.getLog(foo0).isDefined)
+        }
+
+        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+        broker0.shutdown()
+        TestUtils.retry(60000) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
+        }
+
+        val log = broker0.logManager.getLog(foo0).get
+
+        // Copy foo-0 to targetParentDir
+        // This is so that we can rename the main replica to a future down below
+        val parentDir = log.parentDir
+        val targetParentDir = broker0.config.logDirs.filter(_ != parentDir).head
+        val targetDirFile = new File(targetParentDir, log.dir.getName)
+        FileUtils.copyDirectory(log.dir, targetDirFile)
+        assertTrue(targetDirFile.exists())
+
+        // Rename original log to a future
+        // This is equivalent to a failure during the promotion of the future replica and a restart with directory for
+        // the main replica being online
+        val originalLogFile = log.dir
+        log.renameDir(UnifiedLog.logFutureDirName(foo0), shouldReinitialize = false)
+        assertFalse(originalLogFile.exists())
+
+        // Start up broker0 and wait until the ISR of foo-0 is set to [0, 1, 2]
+        broker0.startup()
+        TestUtils.retry(60000) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(0, 1, 2), info.get.isr().asScala.toSet)
+          assertTrue(broker0.logManager.getLog(foo0, isFuture = true).isEmpty)
+          assertFalse(targetDirFile.exists())
+          assertTrue(originalLogFile.exists())
+        }
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
 }
 
 class BadAuthorizer() extends Authorizer {