From edd0efdebfe45a472d7f5765d79b1ae54d71e4ae Mon Sep 17 00:00:00 2001 From: Gaurav Narula Date: Tue, 10 Jun 2025 18:09:52 +0100 Subject: [PATCH] KAFKA-19221 Propagate IOException on LogSegment#close (#19607) Log segment closure results in right sizing the segment on disk along with the associated index files. This is specially important for TimeIndexes where a failure to right size may eventually cause log roll failures leading to under replication and log cleaner failures. This change uses `Utils.closeAll` which propagates exceptions, resulting in an "unclean" shutdown. That would then cause the broker to attempt to recover the log segment and the index on next startup, thereby avoiding the failures described above. Reviewers: Omnia Ibrahim , Jun Rao , Chia-Ping Tsai --- .../kafka/common/record/FileRecords.java | 4 ++ .../server/LogManagerIntegrationTest.java | 66 +++++++++++++++++++ .../storage/internals/log/AbstractIndex.java | 4 +- .../storage/internals/log/LogSegment.java | 5 +- .../storage/internals/log/LogSegments.java | 4 +- .../internals/log/TransactionIndex.java | 2 +- .../internals/log/LogSegmentsTest.java | 23 ++++++- .../internals/log/OffsetIndexTest.java | 1 - 8 files changed, 99 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 4ec7db604bc..2f5e2e50dde 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -211,6 +211,10 @@ public class FileRecords extends AbstractRecords implements Closeable { * Close this record set */ public void close() throws IOException { + if (!channel.isOpen()) { + return; + } + flush(); trim(); channel.close(); diff --git a/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java index 6a5e49825d7..15a57bbe22d 100644 --- a/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java @@ -34,9 +34,11 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile; import org.apache.kafka.test.TestUtils; +import java.io.File; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -57,6 +59,70 @@ public class LogManagerIntegrationTest { this.cluster = cluster; } + @ClusterTest(types = {Type.KRAFT}) + public void testIOExceptionOnLogSegmentCloseResultsInRecovery() throws IOException, InterruptedException, ExecutionException { + try (Admin admin = cluster.admin()) { + admin.createTopics(List.of(new NewTopic("foo", 1, (short) 1))).all().get(); + } + cluster.waitForTopic("foo", 1); + + // Produce some data into the topic + Map producerConfigs = Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(), + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName() + ); + + try (Producer producer = new KafkaProducer<>(producerConfigs)) { + producer.send(new ProducerRecord<>("foo", 0, null, "bar")).get(); + producer.flush(); + } + + var broker = cluster.brokers().get(0); + + File timeIndexFile = broker.logManager() + .getLog(new TopicPartition("foo", 0), false).get() + .activeSegment() + .timeIndexFile(); + + // Set read only so that we throw an IOException on shutdown + assertTrue(timeIndexFile.exists()); + assertTrue(timeIndexFile.setReadOnly()); + + broker.shutdown(); + + assertEquals(1, broker.config().logDirs().size()); + String logDir = broker.config().logDirs().get(0); + CleanShutdownFileHandler cleanShutdownFileHandler = new CleanShutdownFileHandler(logDir); + assertFalse(cleanShutdownFileHandler.exists(), "Did not expect the clean shutdown file to exist"); + + // Ensure we have a corrupt index on broker shutdown + long maxIndexSize = broker.config().logIndexSizeMaxBytes(); + long expectedIndexSize = 12 * (maxIndexSize / 12); + assertEquals(expectedIndexSize, timeIndexFile.length()); + + // Allow write permissions before startup + assertTrue(timeIndexFile.setWritable(true)); + + broker.startup(); + // make sure there is no error during load logs + assertTrue(cluster.firstFatalException().isEmpty()); + try (Admin admin = cluster.admin()) { + TestUtils.waitForCondition(() -> { + List partitionInfos = admin.describeTopics(List.of("foo")) + .topicNameValues().get("foo").get().partitions(); + return partitionInfos.get(0).leader().id() == 0; + }, "Partition does not have a leader assigned"); + } + + // Ensure that sanity check does not fail + broker.logManager() + .getLog(new TopicPartition("foo", 0), false).get() + .activeSegment() + .timeIndex() + .sanityCheck(); + } + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3) public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException { diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java index bf1bd802f5e..f2e7c9830bd 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java @@ -263,7 +263,9 @@ public abstract class AbstractIndex implements Closeable { public void trimToValidSize() throws IOException { lock.lock(); try { - resize(entrySize() * entries); + if (mmap != null) { + resize(entrySize() * entries); + } } finally { lock.unlock(); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java index cead2434087..9a16eaa4a18 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java @@ -751,10 +751,7 @@ public class LogSegment implements Closeable { public void close() throws IOException { if (maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN) Utils.swallow(LOGGER, Level.WARN, "maybeAppend", () -> timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar(), true)); - Utils.closeQuietly(lazyOffsetIndex, "offsetIndex", LOGGER); - Utils.closeQuietly(lazyTimeIndex, "timeIndex", LOGGER); - Utils.closeQuietly(log, "log", LOGGER); - Utils.closeQuietly(txnIndex, "txnIndex", LOGGER); + Utils.closeAll(lazyOffsetIndex, lazyTimeIndex, log, txnIndex); } /** diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java index a79602d56d1..4b89d7115a4 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java @@ -17,6 +17,7 @@ package org.apache.kafka.storage.internals.log; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; import java.io.Closeable; import java.io.File; @@ -104,8 +105,7 @@ public class LogSegments implements Closeable { */ @Override public void close() throws IOException { - for (LogSegment s : values()) - s.close(); + Utils.closeAll(values().toArray(new LogSegment[0])); } /** diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java index 9e28c253a5e..076dfa0627c 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java @@ -109,7 +109,7 @@ public class TransactionIndex implements Closeable { public void close() throws IOException { FileChannel channel = channelOrNull(); - if (channel != null) + if (channel != null && channel.isOpen()) channel.close(); maybeChannel = Optional.empty(); } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java index fcf02cc7b76..4136a76995a 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java @@ -38,7 +38,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class LogSegmentsTest { @@ -47,7 +50,7 @@ public class LogSegmentsTest { /* create a segment with the given base offset */ private static LogSegment createSegment(Long offset) throws IOException { - return LogTestUtils.createSegment(offset, logDir, 10, Time.SYSTEM); + return spy(LogTestUtils.createSegment(offset, logDir, 10, Time.SYSTEM)); } @BeforeEach @@ -274,4 +277,22 @@ public class LogSegmentsTest { } } + @Test + public void testCloseClosesAllLogSegmentsOnExceptionWhileClosingOne() throws IOException { + LogSegment seg1 = createSegment(0L); + LogSegment seg2 = createSegment(100L); + LogSegment seg3 = createSegment(200L); + LogSegments segments = new LogSegments(topicPartition); + segments.add(seg1); + segments.add(seg2); + segments.add(seg3); + + doThrow(new IOException("Failure")).when(seg2).close(); + + assertThrows(IOException.class, segments::close, "Expected IOException to be thrown"); + verify(seg1).close(); + verify(seg2).close(); + verify(seg3).close(); + } + } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java index 918e9dd409c..ad7fa590852 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java @@ -225,7 +225,6 @@ public class OffsetIndexTest { idx.forceUnmap(); // mmap should be null after unmap causing lookup to throw a NPE assertThrows(NullPointerException.class, () -> idx.lookup(1)); - assertThrows(NullPointerException.class, idx::close); } @Test