KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance (#14242)

Any blocking operation performed while holding UnifiedLog.lock can lead to serious performance (even availability) issues, yet there are currently several paths that call fsync(2) inside the lock.
While the lock is held, all subsequent produces against the partition may block.
On poor disk performance this easily leaves every request-handler thread busy.
Even worse, when a disk experiences a glitch lasting tens of seconds (not rare with spinning drives), the broker becomes unable to process any requests while still being unfenced from the cluster (i.e. a "zombie"-like state).
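To make the failure mode concrete, here is a minimal, self-contained Java sketch (hypothetical class and method names, not Kafka code) of the pattern in question: an fsync(2) issued while holding the same lock that every produce needs, so appends stall for as long as the fsync takes.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration only, not Kafka code.
public class FsyncUnderLock {
    private final Object lock = new Object();   // stands in for UnifiedLog#lock
    private final Path checkpoint;

    FsyncUnderLock(Path checkpoint) {
        this.checkpoint = checkpoint;
    }

    // Request-handler path: every produce has to take the lock.
    void append(ByteBuffer records) {
        synchronized (lock) {
            // ... write records to the active segment (omitted) ...
        }
    }

    // Problematic path: fsync(2) runs while the lock is held, so a slow disk stalls
    // every append() above and ties up the request-handler threads for the duration.
    void checkpointUnderLock(ByteBuffer state) throws IOException {
        synchronized (lock) {
            try (FileChannel ch = FileChannel.open(checkpoint,
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
                ch.write(state);
                ch.force(true);                 // blocks for the full fsync latency
            }
        }
    }
}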
This PR gets rid of four cases of essentially unnecessary fsync(2) calls performed under the lock:
(1) ProducerStateManager.takeSnapshot at UnifiedLog.roll
I moved the fsync(2) call to the scheduler thread, as part of the existing "flush-log" job (before incrementing the recovery point); see the sketch after this list.
Since the snapshot is still guaranteed to be flushed before the recovery point is incremented, this change shouldn't cause any problems.
(2) ProducerStateManager.removeAndMarkSnapshotForDeletion as part of log segment deletion
This method calls Utils.atomicMoveWithFallback with needFlushParentDir = true internally, which calls fsync.
I changed it to call Utils.atomicMoveWithFallback with needFlushParentDir = false, which is consistent with how index files are deleted (index file deletion doesn't flush the parent dir either).
This change shouldn't cause problems either.
(3) LeaderEpochFileCache.truncateFromStart when incrementing log-start-offset
This path is called from deleteRecords on request-handler threads.
Here, we don't actually need fsync(2) either.
On unclean shutdown, a few stale leader epochs might remain in the file, but they will be handled by LogLoader on start-up, so this is not a problem.
(4) LeaderEpochFileCache.truncateFromEnd as part of log truncation
Likewise, we don't need fsync(2) here, since any epochs left untruncated by an unclean shutdown will be handled during the log loading procedure.
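
The overall fix pattern is sketched below in hypothetical Java (illustrative names, not the actual Kafka classes): the file is written under the lock with sync disabled, and the fsync is either deferred to a scheduler thread or skipped entirely where recovery on the next start-up makes it unnecessary.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical sketch of the deferral pattern, not the actual UnifiedLog code.
public class DeferredFlushSketch {
    private final Object lock = new Object();                  // stands in for UnifiedLog#lock
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();      // stands in for the "flush-log" scheduler

    // Roll path: runs while the lock is held, so it must not fsync.
    public void roll(Path snapshotFile, ByteBuffer snapshotBytes) throws IOException {
        Optional<Path> written;
        synchronized (lock) {
            written = writeSnapshot(snapshotFile, snapshotBytes, /* sync = */ false);
        }
        // The costly fsync happens off the lock and must complete before the recovery point advances.
        written.ifPresent(path -> scheduler.execute(() -> {
            try {
                flushIfExists(path);
                // ... flush segments up to the new base offset, then advance the recovery point ...
            } catch (IOException e) {
                // treat as a storage error; the recovery point must not be advanced
            }
        }));
    }

    // The sync flag mirrors the parameter plumbed through the checkpoint and snapshot writers.
    private Optional<Path> writeSnapshot(Path file, ByteBuffer bytes, boolean sync) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            ch.write(bytes);
            if (sync) {
                ch.force(true);                                 // fsync only when explicitly requested
            }
        }
        return Optional.of(file);
    }

    // Same idea as Utils.flushFileIfExists: tolerate a snapshot deleted between roll and flush.
    private void flushIfExists(Path path) throws IOException {
        if (!Files.exists(path)) {
            return;
        }
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
            ch.force(true);
        } catch (NoSuchFileException e) {
            // the file disappeared concurrently; nothing left to flush
        }
    }
}

If the deferred flush fails, the recovery point must not advance; that is what the new testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure test below verifies.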

Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Justine Olshan <jolshan@confluent.io>, Jun Rao <junrao@gmail.com>
Okada Haruki 2023-11-30 02:43:44 +09:00 committed by GitHub
parent f1819f4480
commit d71d0639d9
17 changed files with 118 additions and 49 deletions

View File

@ -1021,6 +1021,17 @@ public final class Utils {
}
}
/**
* Flushes a dirty file, swallowing {@link NoSuchFileException}
*/
public static void flushFileIfExists(Path path) throws IOException {
try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
fileChannel.force(true);
} catch (NoSuchFileException e) {
log.warn("Failed to flush file {}", path, e);
}
}
/**
* Closes all the provided closeables.
* @throws IOException if any of the close methods throws an IOException.
@ -1543,7 +1554,7 @@ public final class Utils {
* Checks if a string is null, empty or whitespace only.
* @param str a string to be checked
* @return true if the string is null, empty or whitespace only; otherwise, return false.
*/
*/
public static boolean isBlank(String str) {
return str == null || str.trim().isEmpty();
}

View File

@ -44,7 +44,7 @@ import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard}
import java.io.{File, IOException}
import java.nio.file.Files
import java.nio.file.{Files, Path}
import java.util
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.stream.Collectors
@ -1656,10 +1656,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
// we manually override the state offset here prior to taking the snapshot.
producerStateManager.updateMapEndOffset(newSegment.baseOffset)
producerStateManager.takeSnapshot()
// We avoid a potentially costly fsync call here, since we hold UnifiedLog#lock,
// which could block subsequent produces in the meantime.
// The flush is done in the scheduler thread along with the segment flushing below.
val maybeSnapshot = producerStateManager.takeSnapshot(false)
updateHighWatermarkWithLogEndOffset()
// Schedule an asynchronous flush of the old segment
scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
scheduler.scheduleOnce("flush-log", () => {
maybeSnapshot.ifPresent(f => flushProducerStateSnapshot(f.toPath))
flushUptoOffsetExclusive(newSegment.baseOffset)
})
newSegment
}
@ -1742,6 +1748,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
producerStateManager.mapEndOffset
}
private[log] def flushProducerStateSnapshot(snapshot: Path): Unit = {
maybeHandleIOException(s"Error while flushing producer state snapshot $snapshot for $topicPartition in dir ${dir.getParent}") {
Utils.flushFileIfExists(snapshot)
}
}
/**
* Truncate this log so that it ends with the greatest offset < targetOffset.
*

View File

@ -68,7 +68,7 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh
def write(offsets: Map[TopicPartition, Long]): Unit = {
val list: java.util.List[(TopicPartition, Long)] = new java.util.ArrayList[(TopicPartition, Long)](offsets.size)
offsets.foreach(x => list.add(x))
checkpoint.write(list)
checkpoint.write(list, true)
}
def read(): Map[TopicPartition, Long] = {

View File

@ -184,7 +184,7 @@ public class RemoteLogManagerTest {
List<EpochEntry> epochs = Collections.emptyList();
@Override
public void write(Collection<EpochEntry> epochs) {
public void write(Collection<EpochEntry> epochs, boolean ignored) {
this.epochs = new ArrayList<>(epochs);
}

View File

@ -404,7 +404,7 @@ class LogSegmentTest {
val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs = Seq.empty[EpochEntry]
override def write(epochs: util.Collection[EpochEntry]): Unit = {
override def write(epochs: util.Collection[EpochEntry], ignored: Boolean): Unit = {
this.epochs = epochs.asScala.toSeq
}

View File

@ -42,8 +42,8 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.anyLong
import org.mockito.Mockito.{mock, when}
import org.mockito.ArgumentMatchers.{any, anyLong}
import org.mockito.Mockito.{doThrow, mock, spy, when}
import java.io._
import java.nio.ByteBuffer
@ -3625,7 +3625,7 @@ class UnifiedLogTest {
val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
log.appendAsLeader(records, leaderEpoch = 0)
}
log.updateHighWatermark(90L)
log.maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.SegmentDeletion)
assertEquals(20, log.logStartOffset)
@ -3911,6 +3911,24 @@ class UnifiedLogTest {
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
}
@Test
def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure(): Unit = {
val logConfig = LogTestUtils.createLogConfig()
val log = spy(createLog(logDir, logConfig))
doThrow(new KafkaStorageException("Injected exception")).when(log).flushProducerStateSnapshot(any())
log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0)
try {
log.roll(Some(1L))
} catch {
case _: KafkaStorageException => // ignore
}
// check that the recovery point isn't incremented
assertEquals(0L, log.recoveryPoint)
}
@Test
def testDeletableSegmentsFilter(): Unit = {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)

View File

@ -97,7 +97,7 @@ class OffsetCheckpointFileWithFailureHandlerTest extends Logging {
val logDirFailureChannel = new LogDirFailureChannel(10)
val checkpointFile = new CheckpointFileWithFailureHandler(file, OffsetCheckpointFile.CurrentVersion + 1,
OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L))
checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L), true)
assertThrows(classOf[KafkaStorageException], () => new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read())
}

View File

@ -38,7 +38,7 @@ class LeaderEpochFileCacheTest {
val tp = new TopicPartition("TestTopic", 5)
private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs: Seq[EpochEntry] = Seq()
override def write(epochs: java.util.Collection[EpochEntry]): Unit = this.epochs = epochs.asScala.toSeq
override def write(epochs: java.util.Collection[EpochEntry], ignored: Boolean): Unit = this.epochs = epochs.asScala.toSeq
override def read(): java.util.List[EpochEntry] = this.epochs.asJava
}

View File

@ -72,7 +72,7 @@ public class CheckpointFile<T> {
tempPath = Paths.get(absolutePath + ".tmp");
}
public void write(Collection<T> entries) throws IOException {
public void write(Collection<T> entries, boolean sync) throws IOException {
synchronized (lock) {
// write to temp file and then swap with the existing file
try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile());
@ -80,10 +80,12 @@ public class CheckpointFile<T> {
CheckpointWriteBuffer<T> checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter);
checkpointWriteBuffer.write(entries);
writer.flush();
fileOutputStream.getFD().sync();
if (sync) {
fileOutputStream.getFD().sync();
}
}
Utils.atomicMoveWithFallback(tempPath, absolutePath);
Utils.atomicMoveWithFallback(tempPath, absolutePath, sync);
}
}

View File

@ -68,7 +68,7 @@ public class CommittedOffsetsFile {
}
public synchronized void writeEntries(Map<Integer, Long> committedOffsets) throws IOException {
checkpointFile.write(committedOffsets.entrySet());
checkpointFile.write(committedOffsets.entrySet(), true);
}
public synchronized Map<Integer, Long> readEntries() throws IOException {
@ -83,4 +83,4 @@ public class CommittedOffsetsFile {
return partitionToOffsets;
}
}
}

View File

@ -41,9 +41,9 @@ public class CheckpointFileWithFailureHandler<T> {
checkpointFile = new CheckpointFile<>(file, version, formatter);
}
public void write(Collection<T> entries) {
public void write(Collection<T> entries, boolean sync) {
try {
checkpointFile.write(entries);
checkpointFile.write(entries, sync);
} catch (IOException e) {
String msg = "Error while writing to checkpoint file " + file.getAbsolutePath();
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e);

View File

@ -42,7 +42,7 @@ import java.util.List;
public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {
private List<EpochEntry> epochs = Collections.emptyList();
public void write(Collection<EpochEntry> epochs) {
public void write(Collection<EpochEntry> epochs, boolean ignored) {
this.epochs = new ArrayList<>(epochs);
}
@ -60,4 +60,4 @@ public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {
return ByteBuffer.wrap(stream.toByteArray());
}
}
}

View File

@ -22,8 +22,13 @@ import java.util.Collection;
import java.util.List;
public interface LeaderEpochCheckpoint {
// In a file-backed checkpoint implementation, the content should be
// synced to the device if `sync` is true
void write(Collection<EpochEntry> epochs, boolean sync);
void write(Collection<EpochEntry> epochs);
default void write(Collection<EpochEntry> epochs) {
write(epochs, true);
}
List<EpochEntry> read();
}

View File

@ -53,7 +53,11 @@ public class LeaderEpochCheckpointFile implements LeaderEpochCheckpoint {
}
public void write(Collection<EpochEntry> epochs) {
checkpoint.write(epochs);
write(epochs, true);
}
public void write(Collection<EpochEntry> epochs, boolean sync) {
checkpoint.write(epochs, sync);
}
public List<EpochEntry> read() {
@ -75,4 +79,4 @@ public class LeaderEpochCheckpointFile implements LeaderEpochCheckpoint {
return (strings.length == 2) ? Optional.of(new EpochEntry(Integer.parseInt(strings[0]), Long.parseLong(strings[1]))) : Optional.empty();
}
}
}
}

View File

@ -73,7 +73,7 @@ public class LeaderEpochFileCache {
EpochEntry entry = new EpochEntry(epoch, startOffset);
if (assign(entry)) {
log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
flush();
writeToFile(true);
}
}
@ -83,7 +83,7 @@ public class LeaderEpochFileCache {
log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
}
});
if (!entries.isEmpty()) flush();
if (!entries.isEmpty()) writeToFile(true);
}
private boolean isUpdateNeeded(EpochEntry entry) {
@ -152,11 +152,6 @@ public class LeaderEpochFileCache {
return removedEpochs;
}
public LeaderEpochFileCache cloneWithLeaderEpochCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) {
flushTo(leaderEpochCheckpoint);
return new LeaderEpochFileCache(this.topicPartition, leaderEpochCheckpoint);
}
public boolean nonEmpty() {
lock.readLock().lock();
try {
@ -318,7 +313,14 @@ public class LeaderEpochFileCache {
if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
List<EpochEntry> removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);
flush();
// We intentionally don't force flushing the change to the device here because:
// - We want to avoid fsync latency
//   * fsync latency could be huge when a disk glitches, which is not rare with spinning drives
//   * This method is called by ReplicaFetcher threads, so high fsync latency could block replica fetching,
//     causing ISR shrink or high produce-response-time degradation at the remote scope.
// - Even if stale epochs remain in the LeaderEpoch file due to an unclean shutdown, they will be handled by
//   another truncateFromEnd call during the log loading procedure, so it won't be a problem
writeToFile(false);
log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size());
}
@ -345,7 +347,14 @@ public class LeaderEpochFileCache {
EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset);
epochs.put(updatedFirstEntry.epoch, updatedFirstEntry);
flush();
// We intentionally don't force flushing the change to the device here because:
// - We want to avoid fsync latency
//   * fsync latency could be huge when a disk glitches, which is not rare with spinning drives
//   * This method is called as part of deleteRecords while holding UnifiedLog#lock.
//     - Meanwhile, all produces against the partition are blocked, which can exhaust the request handlers
// - Even if stale epochs remain in the LeaderEpoch file due to an unclean shutdown, they will be recovered by
//   another truncateFromStart call during the log loading procedure, so it won't be a problem
writeToFile(false);
log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size());
}
@ -394,7 +403,7 @@ public class LeaderEpochFileCache {
lock.writeLock().lock();
try {
epochs.clear();
flush();
writeToFile(true);
} finally {
lock.writeLock().unlock();
}
@ -431,16 +440,12 @@ public class LeaderEpochFileCache {
}
}
private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint) {
private void writeToFile(boolean sync) {
lock.readLock().lock();
try {
leaderEpochCheckpoint.write(epochs.values());
checkpoint.write(epochs.values(), sync);
} finally {
lock.readLock().unlock();
}
}
private void flush() {
flushTo(this.checkpoint);
}
}

View File

@ -462,14 +462,21 @@ public class ProducerStateManager {
}
/**
* Take a snapshot at the current end offset if one does not already exist.
* Take a snapshot at the current end offset if one does not already exist, syncing the change to the device.
*/
public void takeSnapshot() throws IOException {
takeSnapshot(true);
}
/**
* Take a snapshot at the current end offset if one does not already exist, then return the snapshot file if taken.
*/
public Optional<File> takeSnapshot(boolean sync) throws IOException {
// If not a new offset, then it is not worth taking another snapshot
if (lastMapOffset > lastSnapOffset) {
SnapshotFile snapshotFile = new SnapshotFile(LogFileUtils.producerSnapshotFile(logDir, lastMapOffset));
long start = time.hiResClockMs();
writeSnapshot(snapshotFile.file(), producers);
writeSnapshot(snapshotFile.file(), producers, sync);
log.info("Wrote producer snapshot at offset {} with {} producer ids in {} ms.", lastMapOffset,
producers.size(), time.hiResClockMs() - start);
@ -477,7 +484,10 @@ public class ProducerStateManager {
// Update the last snap offset according to the serialized map
lastSnapOffset = lastMapOffset;
return Optional.of(snapshotFile.file());
}
return Optional.empty();
}
/**
@ -635,7 +645,7 @@ public class ProducerStateManager {
// deletion, so ignoring the exception here just means that the intended operation was
// already completed.
try {
snapshotFile.renameTo(LogFileUtils.DELETED_FILE_SUFFIX);
snapshotFile.renameToDelete();
return Optional.of(snapshotFile);
} catch (NoSuchFileException ex) {
log.info("Failed to rename producer state snapshot {} with deletion suffix because it was already deleted", snapshotFile.file().getAbsoluteFile());
@ -684,7 +694,7 @@ public class ProducerStateManager {
}
}
private static void writeSnapshot(File file, Map<Long, ProducerStateEntry> entries) throws IOException {
private static void writeSnapshot(File file, Map<Long, ProducerStateEntry> entries, boolean sync) throws IOException {
Struct struct = new Struct(PID_SNAPSHOT_MAP_SCHEMA);
struct.set(VERSION_FIELD, PRODUCER_SNAPSHOT_VERSION);
struct.set(CRC_FIELD, 0L); // we'll fill this after writing the entries
@ -716,7 +726,9 @@ public class ProducerStateManager {
try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
fileChannel.write(buffer);
fileChannel.force(true);
if (sync) {
fileChannel.force(true);
}
}
}

View File

@ -60,10 +60,10 @@ public class SnapshotFile {
return file;
}
public void renameTo(String newSuffix) throws IOException {
File renamed = new File(Utils.replaceSuffix(file.getPath(), "", newSuffix));
public void renameToDelete() throws IOException {
File renamed = new File(Utils.replaceSuffix(file.getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX));
try {
Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath());
Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath(), false);
} finally {
file = renamed;
}
@ -76,4 +76,4 @@ public class SnapshotFile {
", file=" + file +
')';
}
}
}