KAFKA-13092: Perf regression in LISR requests (#11073)

After noticing increased LISR times, we discovered a lot of time was spent synchronously flushing the partition metadata file. This PR changes the code so we asynchronously flush the files.

We ensure files are flushed before appending, renaming or closing the log to ensure we have the partition metadata information on disk. Three new tests have been added to address these cases.

(cherry-picked from 584213ed20)

Reviewers:  Lucas Bradstreet <lucas@confluent.io>, Jun Rao <junrao@gmail.com>, David Jacot <djacot@confluent.io>
This commit is contained in:
Justine Olshan 2021-08-31 00:26:02 -07:00 committed by GitHub
parent 5bdd5bf730
commit fb47f6a9b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 360 additions and 30 deletions

View File

@ -577,14 +577,6 @@ class Log(@volatile private var _dir: File,
partitionMetadataFile = new PartitionMetadataFile(partitionMetadata, logDirFailureChannel)
}
/** Only used for ZK clusters when we update and start using topic IDs on existing topics */
def assignTopicId(topicId: Uuid): Unit = {
if (keepPartitionMetadataFile) {
partitionMetadataFile.write(topicId)
this.topicId = topicId
}
}
private def initializeLeaderEpochCache(): Unit = lock synchronized {
val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
@ -609,6 +601,29 @@ class Log(@volatile private var _dir: File,
}
}
private def maybeFlushMetadataFile(): Unit = {
partitionMetadataFile.maybeFlush()
}
/** Only used for ZK clusters when we update and start using topic IDs on existing topics */
def assignTopicId(topicId: Uuid): Unit = {
if (!this.topicId.equals(Uuid.ZERO_UUID)) {
if (!this.topicId.equals(topicId)) {
// we should never get here as the topic IDs should have been checked in becomeLeaderOrFollower
throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +
s"but log already contained topic ID ${this.topicId}")
}
}
if (keepPartitionMetadataFile) {
this.topicId = topicId
if (!partitionMetadataFile.exists()) {
partitionMetadataFile.record(topicId)
scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
}
}
}
/**
* Removes any temporary files found in log directory, and creates a list of all .swap files which could be swapped
* in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is higher than
@ -1056,6 +1071,7 @@ class Log(@volatile private var _dir: File,
def close(): Unit = {
debug("Closing log")
lock synchronized {
maybeFlushMetadataFile()
checkIfMemoryMappedBufferClosed()
producerExpireCheck.cancel(true)
maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
@ -1076,6 +1092,8 @@ class Log(@volatile private var _dir: File,
def renameDir(name: String): Unit = {
lock synchronized {
maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") {
// Flush partitionMetadata file before initializing again
maybeFlushMetadataFile()
val renamedDir = new File(dir.getParent, name)
Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
if (renamedDir != dir) {
@ -1160,6 +1178,9 @@ class Log(@volatile private var _dir: File,
validateAndAssignOffsets: Boolean,
leaderEpoch: Int,
ignoreRecordSize: Boolean): LogAppendInfo = {
// We want to ensure the partition metadata file is written to the log dir before any log data is written to disk.
// This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
maybeFlushMetadataFile()
val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)

View File

@ -24,7 +24,7 @@ import java.util.regex.Pattern
import kafka.utils.Logging
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException}
import org.apache.kafka.common.utils.Utils
@ -90,27 +90,48 @@ class PartitionMetadataFile(val file: File,
private val tempPath = Paths.get(path.toString + ".tmp")
private val lock = new Object()
private val logDir = file.getParentFile.getParent
@volatile private var dirtyTopicIdOpt : Option[Uuid] = None
def write(topicId: Uuid): Unit = {
lock synchronized {
try {
// write to temp file and then swap with the existing file
val fileOutputStream = new FileOutputStream(tempPath.toFile)
val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
try {
writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion,topicId)))
writer.flush()
fileOutputStream.getFD().sync()
} finally {
writer.close()
/**
* Records the topic ID that will be flushed to disk.
*/
def record(topicId: Uuid): Unit = {
// Topic IDs should not differ, but we defensively check here to fail earlier in the case that the IDs somehow differ.
dirtyTopicIdOpt.foreach { dirtyTopicId =>
if (dirtyTopicId != topicId)
throw new InconsistentTopicIdException(s"Tried to record topic ID $topicId to file " +
s"but had already recorded $dirtyTopicId")
}
dirtyTopicIdOpt = Some(topicId)
}
def maybeFlush(): Unit = {
// We check dirtyTopicId first to avoid having to take the lock unnecessarily in the frequently called log append path
dirtyTopicIdOpt.foreach { _ =>
// We synchronize on the actual write to disk
lock synchronized {
dirtyTopicIdOpt.foreach { topicId =>
try {
// write to temp file and then swap with the existing file
val fileOutputStream = new FileOutputStream(tempPath.toFile)
val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
try {
writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId)))
writer.flush()
fileOutputStream.getFD().sync()
} finally {
writer.close()
}
Utils.atomicMoveWithFallback(tempPath, path)
} catch {
case e: IOException =>
val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}"
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
throw new KafkaStorageException(msg, e)
}
dirtyTopicIdOpt = None
}
Utils.atomicMoveWithFallback(tempPath, path)
} catch {
case e: IOException =>
val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}"
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
throw new KafkaStorageException(msg, e)
}
}
}

View File

@ -2547,7 +2547,7 @@ class LogTest {
var log = createLog(logDir, logConfig)
val topicId = Uuid.randomUuid()
log.partitionMetadataFile.write(topicId)
log.assignTopicId(topicId)
log.close()
// test recovery case
@ -2556,6 +2556,37 @@ class LogTest {
log.close()
}
def testLogFlushesPartitionMetadataOnAppend(): Unit = {
val logConfig = LogTest.createLogConfig()
val log = createLog(logDir, logConfig)
val record = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("simpleValue".getBytes))
val topicId = Uuid.randomUuid()
log.partitionMetadataFile.record(topicId)
// Should trigger a synchronous flush
log.appendAsLeader(record, leaderEpoch = 0)
assertTrue(log.partitionMetadataFile.exists())
assertEquals(topicId, log.partitionMetadataFile.read().topicId)
}
@Test
def testLogFlushesPartitionMetadataOnClose(): Unit = {
val logConfig = LogTest.createLogConfig()
var log = createLog(logDir, logConfig)
val topicId = Uuid.randomUuid()
log.partitionMetadataFile.record(topicId)
// Should trigger a synchronous flush
log.close()
// We open the log again, and the partition metadata file should exist with the same ID.
log = createLog(logDir, logConfig)
assertTrue(log.partitionMetadataFile.exists())
assertEquals(topicId, log.partitionMetadataFile.read().topicId)
}
/**
* Test building the time index on the follower by setting assignOffsets to false.
*/
@ -3117,7 +3148,7 @@ class LogTest {
// Write a topic ID to the partition metadata file to ensure it is transferred correctly.
val id = Uuid.randomUuid()
log.topicId = id
log.partitionMetadataFile.write(id)
log.assignTopicId(id)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
assertEquals(Some(5), log.latestEpoch)
@ -3135,6 +3166,26 @@ class LogTest {
assertEquals(id, log.partitionMetadataFile.read().topicId)
}
@Test
def testTopicIdFlushesBeforeDirectoryRename(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
val log = createLog(logDir, logConfig)
// Write a topic ID to the partition metadata file to ensure it is transferred correctly.
val topicId = Uuid.randomUuid()
log.partitionMetadataFile.record(topicId)
// Ensure that after a directory rename, the partition metadata file is written to the right location.
val tp = Log.parseTopicPartitionName(log.dir)
log.renameDir(Log.logDeleteDirName(tp))
assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
// Check the file holds the correct contents.
assertTrue(log.partitionMetadataFile.exists())
assertEquals(topicId, log.partitionMetadataFile.read().topicId)
}
@Test
def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)

View File

@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.server;
import java.util.Properties;
import kafka.cluster.Partition;
import kafka.log.CleanerConfig;
import kafka.log.Defaults;
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.server.AlterIsrManager;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ZkMetadataCache;
import kafka.server.checkpoints.OffsetCheckpoints;
import kafka.server.metadata.CachedConfigRepository;
import kafka.server.metadata.ConfigRepository;
import kafka.utils.KafkaScheduler;
import kafka.utils.Scheduler;
import kafka.utils.TestUtils;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import scala.collection.JavaConverters;
import scala.Option;
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@Fork(3)
@BenchmarkMode(Mode.AverageTime)
@State(value = Scope.Benchmark)
public class PartitionCreationBench {
@Param({"false", "true"})
public boolean useTopicIds;
@Param({"2000"})
public int numPartitions;
private final String topicName = "foo";
private Option<Uuid> topicId;
private Scheduler scheduler;
private Metrics metrics;
private Time time;
private KafkaConfig brokerProperties;
private ReplicaManager replicaManager;
private QuotaFactory.QuotaManagers quotaManagers;
private LogDirFailureChannel failureChannel;
private LogManager logManager;
private AlterIsrManager alterIsrManager;
private List<TopicPartition> topicPartitions;
@SuppressWarnings("deprecation")
@Setup(Level.Invocation)
public void setup() {
if (useTopicIds)
topicId = Option.apply(Uuid.randomUuid());
else
topicId = Option.empty();
this.scheduler = new KafkaScheduler(1, "scheduler-thread", true);
this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(),
Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1,
(short) 1));
this.metrics = new Metrics();
this.time = Time.SYSTEM;
this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
final List<File> files =
JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
CleanerConfig cleanerConfig = CleanerConfig.apply(1,
4 * 1024 * 1024L, 0.9d,
1024 * 1024, 32 * 1024 * 1024,
Double.MAX_VALUE, 15 * 1000, true, "MD5");
ConfigRepository configRepository = new CachedConfigRepository();
this.logManager = new LogManager(JavaConverters.asScalaIteratorConverter(files.iterator()).asScala().toSeq(),
JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
configRepository,
createLogConfig(),
cleanerConfig,
1,
1000L,
10000L,
10000L,
1000L,
60000,
scheduler,
brokerTopicStats,
failureChannel,
Time.SYSTEM,
true);
scheduler.startup();
final MetadataCache metadataCache =
new ZkMetadataCache(this.brokerProperties.brokerId());
this.quotaManagers =
QuotaFactory.instantiate(this.brokerProperties,
this.metrics,
this.time, "");
KafkaZkClient zkClient = new KafkaZkClient(null, false, Time.SYSTEM) {
@Override
public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) {
return new Properties();
}
};
this.alterIsrManager = TestUtils.createAlterIsrManager();
this.replicaManager = new ReplicaManager(
this.brokerProperties,
this.metrics,
this.time,
Option.apply(zkClient),
this.scheduler,
this.logManager,
new AtomicBoolean(false),
this.quotaManagers,
brokerTopicStats,
metadataCache,
this.failureChannel,
alterIsrManager,
configRepository,
Option.empty());
replicaManager.startup();
replicaManager.checkpointHighWatermarks();
}
@TearDown(Level.Invocation)
public void tearDown() throws Exception {
this.replicaManager.shutdown(false);
logManager.shutdown();
this.metrics.close();
this.scheduler.shutdown();
this.quotaManagers.shutdown();
for (File dir : JavaConverters.asJavaCollection(logManager.liveLogDirs())) {
Utils.delete(dir);
}
}
private static LogConfig createLogConfig() {
Properties logProps = new Properties();
logProps.put(LogConfig.SegmentMsProp(), Defaults.SegmentMs());
logProps.put(LogConfig.SegmentBytesProp(), Defaults.SegmentSize());
logProps.put(LogConfig.RetentionMsProp(), Defaults.RetentionMs());
logProps.put(LogConfig.RetentionBytesProp(), Defaults.RetentionSize());
logProps.put(LogConfig.SegmentJitterMsProp(), Defaults.SegmentJitterMs());
logProps.put(LogConfig.CleanupPolicyProp(), Defaults.CleanupPolicy());
logProps.put(LogConfig.MaxMessageBytesProp(), Defaults.MaxMessageSize());
logProps.put(LogConfig.IndexIntervalBytesProp(), Defaults.IndexInterval());
logProps.put(LogConfig.SegmentIndexBytesProp(), Defaults.MaxIndexSize());
logProps.put(LogConfig.MessageFormatVersionProp(), Defaults.MessageFormatVersion());
logProps.put(LogConfig.FileDeleteDelayMsProp(), Defaults.FileDeleteDelayMs());
return LogConfig.apply(logProps, new scala.collection.immutable.HashSet<>());
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void makeFollower() {
topicPartitions = new ArrayList<>();
for (int partitionNum = 0; partitionNum < numPartitions; partitionNum++) {
topicPartitions.add(new TopicPartition(topicName, partitionNum));
}
List<Integer> replicas = new ArrayList<>();
replicas.add(0);
replicas.add(1);
replicas.add(2);
OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L);
for (TopicPartition topicPartition : topicPartitions) {
final Partition partition = this.replicaManager.createPartition(topicPartition);
List<Integer> inSync = new ArrayList<>();
inSync.add(0);
inSync.add(1);
inSync.add(2);
LeaderAndIsrRequestData.LeaderAndIsrPartitionState partitionState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(0)
.setIsr(inSync)
.setZkVersion(1)
.setReplicas(replicas)
.setIsNew(true);
partition.makeFollower(partitionState, checkpoints);
topicId.foreach(partition::checkOrSetTopicId);
}
}
}