Fix perf regression on LISR requests by asynchronously flushing the partition.metadata file (#11056)

After noticing increased LISR times, we discovered a lot of time was spent synchronously flushing the partition metadata file. This PR changes the code so we asynchronously flush the files.

We ensure files are flushed before appending, renaming or closing the log to ensure we have the partition metadata information on disk. Three new tests have been added to address these cases.

Reviewers:  Lucas Bradstreet <lucas@confluent.io>, Jun Rao <junrao@gmail.com>
This commit is contained in:
Justine Olshan 2021-07-15 14:00:32 -07:00 committed by GitHub
parent e07de97a4c
commit 584213ed20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 347 additions and 31 deletions

View File

@ -914,13 +914,8 @@ public final class Utils {
*/
public static void flushDir(Path path) throws IOException {
if (path != null) {
FileChannel dir = null;
try {
dir = FileChannel.open(path, StandardOpenOption.READ);
try (FileChannel dir = FileChannel.open(path, StandardOpenOption.READ)) {
dir.force(true);
} finally {
if (dir != null)
dir.close();
}
}
}

View File

@ -235,7 +235,6 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
* @param leaderEpochCache The LeaderEpochFileCache instance (if any) containing state associated
* with the provided logStartOffset and nextOffsetMetadata
* @param producerStateManager The ProducerStateManager instance containing state associated with the provided segments
* @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle log directory failure
* @param _topicId optional Uuid to specify the topic ID for the topic if it exists. Should only be specified when
* first creating the log through Partition.makeLeader or Partition.makeFollower. When reloading a log,
* this field will be populated by reading the topic ID value from partition.metadata if it exists.
@ -321,7 +320,8 @@ class Log(@volatile var logStartOffset: Long,
}
}
} else if (keepPartitionMetadataFile) {
_topicId.foreach(partitionMetadataFile.write)
_topicId.foreach(partitionMetadataFile.record)
scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
} else {
// We want to keep the file and the in-memory topic ID in sync.
_topicId = None
@ -546,11 +546,18 @@ class Log(@volatile var logStartOffset: Long,
partitionMetadataFile = new PartitionMetadataFile(partitionMetadata, logDirFailureChannel)
}
private def maybeFlushMetadataFile(): Unit = {
partitionMetadataFile.maybeFlush()
}
/** Only used for ZK clusters when we update and start using topic IDs on existing topics */
def assignTopicId(topicId: Uuid): Unit = {
if (keepPartitionMetadataFile) {
partitionMetadataFile.write(topicId)
_topicId = Some(topicId)
if (!partitionMetadataFile.exists()) {
partitionMetadataFile.record(topicId)
scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
}
}
}
@ -628,6 +635,7 @@ class Log(@volatile var logStartOffset: Long,
def close(): Unit = {
debug("Closing log")
lock synchronized {
maybeFlushMetadataFile()
localLog.checkIfMemoryMappedBufferClosed()
producerExpireCheck.cancel(true)
maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
@ -648,6 +656,8 @@ class Log(@volatile var logStartOffset: Long,
def renameDir(name: String): Unit = {
lock synchronized {
maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") {
// Flush partitionMetadata file before initializing again
maybeFlushMetadataFile()
if (localLog.renameDir(name)) {
producerStateManager.updateParentDir(dir)
// re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
@ -731,6 +741,9 @@ class Log(@volatile var logStartOffset: Long,
leaderEpoch: Int,
requestLocal: Option[RequestLocal],
ignoreRecordSize: Boolean): LogAppendInfo = {
// We want to ensure the partition metadata file is written to the log dir before any log data is written to disk.
// This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
maybeFlushMetadataFile()
val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)

View File

@ -24,7 +24,7 @@ import java.util.regex.Pattern
import kafka.utils.Logging
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException}
import org.apache.kafka.common.utils.Utils
@ -90,27 +90,48 @@ class PartitionMetadataFile(val file: File,
private val tempPath = Paths.get(path.toString + ".tmp")
private val lock = new Object()
private val logDir = file.getParentFile.getParent
@volatile private var dirtyTopicIdOpt : Option[Uuid] = None
def write(topicId: Uuid): Unit = {
lock synchronized {
try {
// write to temp file and then swap with the existing file
val fileOutputStream = new FileOutputStream(tempPath.toFile)
val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
try {
writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion,topicId)))
writer.flush()
fileOutputStream.getFD().sync()
} finally {
writer.close()
/**
* Records the topic ID that will be flushed to disk.
*/
def record(topicId: Uuid): Unit = {
// Topic IDs should not differ, but we defensively check here to fail earlier in the case that the IDs somehow differ.
dirtyTopicIdOpt.foreach { dirtyTopicId =>
if (dirtyTopicId != topicId)
throw new InconsistentTopicIdException(s"Tried to record topic ID $topicId to file " +
s"but had already recorded $dirtyTopicId")
}
dirtyTopicIdOpt = Some(topicId)
}
def maybeFlush(): Unit = {
// We check dirtyTopicId first to avoid having to take the lock unnecessarily in the frequently called log append path
dirtyTopicIdOpt.foreach { _ =>
// We synchronize on the actual write to disk
lock synchronized {
dirtyTopicIdOpt.foreach { topicId =>
try {
// write to temp file and then swap with the existing file
val fileOutputStream = new FileOutputStream(tempPath.toFile)
val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
try {
writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId)))
writer.flush()
fileOutputStream.getFD().sync()
} finally {
writer.close()
}
Utils.atomicMoveWithFallback(tempPath, path)
} catch {
case e: IOException =>
val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}"
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
throw new KafkaStorageException(msg, e)
}
dirtyTopicIdOpt = None
}
Utils.atomicMoveWithFallback(tempPath, path)
} catch {
case e: IOException =>
val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}"
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
throw new KafkaStorageException(msg, e)
}
}
}

View File

@ -1827,13 +1827,45 @@ class LogTest {
log.appendAsFollower(second)
}
@Test
def testLogFlushesPartitionMetadataOnAppend(): Unit = {
val logConfig = LogTestUtils.createLogConfig()
val log = createLog(logDir, logConfig)
val record = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("simpleValue".getBytes))
val topicId = Uuid.randomUuid()
log.partitionMetadataFile.record(topicId)
// Should trigger a synchronous flush
log.appendAsLeader(record, leaderEpoch = 0)
assertTrue(log.partitionMetadataFile.exists())
assertEquals(topicId, log.partitionMetadataFile.read().topicId)
}
@Test
def testLogFlushesPartitionMetadataOnClose(): Unit = {
val logConfig = LogTestUtils.createLogConfig()
var log = createLog(logDir, logConfig)
val topicId = Uuid.randomUuid()
log.partitionMetadataFile.record(topicId)
// Should trigger a synchronous flush
log.close()
// We open the log again, and the partition metadata file should exist with the same ID.
log = createLog(logDir, logConfig)
assertTrue(log.partitionMetadataFile.exists())
assertEquals(topicId, log.partitionMetadataFile.read().topicId)
}
@Test
def testLogRecoversTopicId(): Unit = {
val logConfig = LogTestUtils.createLogConfig()
var log = createLog(logDir, logConfig)
val topicId = Uuid.randomUuid()
log.partitionMetadataFile.write(topicId)
log.assignTopicId(topicId)
log.close()
// test recovery case
@ -1869,7 +1901,7 @@ class LogTest {
var log = createLog(logDir, logConfig)
val topicId = Uuid.randomUuid()
log.partitionMetadataFile.write(topicId)
log.assignTopicId(topicId)
log.close()
// test creating a log with a new ID
@ -2235,6 +2267,26 @@ class LogTest {
assertEquals(topicId, log.partitionMetadataFile.read().topicId)
}
@Test
def testTopicIdFlushesBeforeDirectoryRename(): Unit = {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
val log = createLog(logDir, logConfig)
// Write a topic ID to the partition metadata file to ensure it is transferred correctly.
val topicId = Uuid.randomUuid()
log.partitionMetadataFile.record(topicId)
// Ensure that after a directory rename, the partition metadata file is written to the right location.
val tp = Log.parseTopicPartitionName(log.dir)
log.renameDir(Log.logDeleteDirName(tp))
assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
// Check the file holds the correct contents.
assertTrue(log.partitionMetadataFile.exists())
assertEquals(topicId, log.partitionMetadataFile.read().topicId)
}
@Test
def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)

View File

@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.server;
import java.util.Properties;
import kafka.cluster.Partition;
import kafka.log.CleanerConfig;
import kafka.log.Defaults;
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.server.AlterIsrManager;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ZkMetadataCache;
import kafka.server.checkpoints.OffsetCheckpoints;
import kafka.server.metadata.ConfigRepository;
import kafka.server.metadata.MockConfigRepository;
import kafka.utils.KafkaScheduler;
import kafka.utils.Scheduler;
import kafka.utils.TestUtils;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import scala.collection.JavaConverters;
import scala.Option;
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@Fork(3)
@BenchmarkMode(Mode.AverageTime)
@State(value = Scope.Benchmark)
public class PartitionCreationBench {
@Param({"false", "true"})
public boolean useTopicIds;
@Param({"2000"})
public int numPartitions;
private final String topicName = "foo";
private Option<Uuid> topicId;
private Scheduler scheduler;
private Metrics metrics;
private Time time;
private KafkaConfig brokerProperties;
private ReplicaManager replicaManager;
private QuotaFactory.QuotaManagers quotaManagers;
private LogDirFailureChannel failureChannel;
private LogManager logManager;
private AlterIsrManager alterIsrManager;
private List<TopicPartition> topicPartitions;
@SuppressWarnings("deprecation")
@Setup(Level.Invocation)
public void setup() {
if (useTopicIds)
topicId = Option.apply(Uuid.randomUuid());
else
topicId = Option.empty();
this.scheduler = new KafkaScheduler(1, "scheduler-thread", true);
this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(),
Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1,
(short) 1));
this.metrics = new Metrics();
this.time = Time.SYSTEM;
this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
final List<File> files =
JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
CleanerConfig cleanerConfig = CleanerConfig.apply(1,
4 * 1024 * 1024L, 0.9d,
1024 * 1024, 32 * 1024 * 1024,
Double.MAX_VALUE, 15 * 1000, true, "MD5");
ConfigRepository configRepository = new MockConfigRepository();
this.logManager = new LogManager(JavaConverters.asScalaIteratorConverter(files.iterator()).asScala().toSeq(),
JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
configRepository,
createLogConfig(),
cleanerConfig,
1,
1000L,
10000L,
10000L,
1000L,
60000,
scheduler,
brokerTopicStats,
failureChannel,
Time.SYSTEM,
true);
scheduler.startup();
final MetadataCache metadataCache =
new ZkMetadataCache(this.brokerProperties.brokerId());
this.quotaManagers =
QuotaFactory.instantiate(this.brokerProperties,
this.metrics,
this.time, "");
KafkaZkClient zkClient = new KafkaZkClient(null, false, Time.SYSTEM) {
@Override
public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) {
return new Properties();
}
};
this.alterIsrManager = TestUtils.createAlterIsrManager();
this.replicaManager = new ReplicaManager(
this.brokerProperties,
this.metrics,
this.time,
Option.apply(zkClient),
this.scheduler,
this.logManager,
new AtomicBoolean(false),
this.quotaManagers,
brokerTopicStats,
metadataCache,
this.failureChannel,
alterIsrManager,
Option.empty());
replicaManager.startup();
replicaManager.checkpointHighWatermarks();
}
@TearDown(Level.Invocation)
public void tearDown() throws Exception {
this.replicaManager.shutdown(false);
logManager.shutdown();
this.metrics.close();
this.scheduler.shutdown();
this.quotaManagers.shutdown();
for (File dir : JavaConverters.asJavaCollection(logManager.liveLogDirs())) {
Utils.delete(dir);
}
}
private static LogConfig createLogConfig() {
Properties logProps = new Properties();
logProps.put(LogConfig.SegmentMsProp(), Defaults.SegmentMs());
logProps.put(LogConfig.SegmentBytesProp(), Defaults.SegmentSize());
logProps.put(LogConfig.RetentionMsProp(), Defaults.RetentionMs());
logProps.put(LogConfig.RetentionBytesProp(), Defaults.RetentionSize());
logProps.put(LogConfig.SegmentJitterMsProp(), Defaults.SegmentJitterMs());
logProps.put(LogConfig.CleanupPolicyProp(), Defaults.CleanupPolicy());
logProps.put(LogConfig.MaxMessageBytesProp(), Defaults.MaxMessageSize());
logProps.put(LogConfig.IndexIntervalBytesProp(), Defaults.IndexInterval());
logProps.put(LogConfig.SegmentIndexBytesProp(), Defaults.MaxIndexSize());
logProps.put(LogConfig.MessageFormatVersionProp(), Defaults.MessageFormatVersion());
logProps.put(LogConfig.FileDeleteDelayMsProp(), Defaults.FileDeleteDelayMs());
return LogConfig.apply(logProps, new scala.collection.immutable.HashSet<>());
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void makeFollower() {
topicPartitions = new ArrayList<>();
for (int partitionNum = 0; partitionNum < numPartitions; partitionNum++) {
topicPartitions.add(new TopicPartition(topicName, partitionNum));
}
List<Integer> replicas = new ArrayList<>();
replicas.add(0);
replicas.add(1);
replicas.add(2);
OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L);
for (TopicPartition topicPartition : topicPartitions) {
final Partition partition = this.replicaManager.createPartition(topicPartition);
List<Integer> inSync = new ArrayList<>();
inSync.add(0);
inSync.add(1);
inSync.add(2);
LeaderAndIsrRequestData.LeaderAndIsrPartitionState partitionState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(0)
.setIsr(inSync)
.setZkVersion(1)
.setReplicas(replicas)
.setIsNew(true);
partition.makeFollower(partitionState, checkpoints, topicId);
}
}
}