diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 4ec7db604bc..2f5e2e50dde 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -211,6 +211,10 @@ public class FileRecords extends AbstractRecords implements Closeable { * Close this record set */ public void close() throws IOException { + if (!channel.isOpen()) { + return; + } + flush(); trim(); channel.close(); diff --git a/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java index 6a5e49825d7..15a57bbe22d 100644 --- a/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/LogManagerIntegrationTest.java @@ -34,9 +34,11 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile; import org.apache.kafka.test.TestUtils; +import java.io.File; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -57,6 +59,70 @@ public class LogManagerIntegrationTest { this.cluster = cluster; } + @ClusterTest(types = {Type.KRAFT}) + public void testIOExceptionOnLogSegmentCloseResultsInRecovery() throws IOException, InterruptedException, ExecutionException { + try (Admin admin = cluster.admin()) { + admin.createTopics(List.of(new NewTopic("foo", 1, (short) 1))).all().get(); + } + cluster.waitForTopic("foo", 1); + + // Produce some data into the topic + Map producerConfigs = Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(), + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName() + ); + + try (Producer producer = new KafkaProducer<>(producerConfigs)) { + producer.send(new ProducerRecord<>("foo", 0, null, "bar")).get(); + producer.flush(); + } + + var broker = cluster.brokers().get(0); + + File timeIndexFile = broker.logManager() + .getLog(new TopicPartition("foo", 0), false).get() + .activeSegment() + .timeIndexFile(); + + // Set read only so that we throw an IOException on shutdown + assertTrue(timeIndexFile.exists()); + assertTrue(timeIndexFile.setReadOnly()); + + broker.shutdown(); + + assertEquals(1, broker.config().logDirs().size()); + String logDir = broker.config().logDirs().get(0); + CleanShutdownFileHandler cleanShutdownFileHandler = new CleanShutdownFileHandler(logDir); + assertFalse(cleanShutdownFileHandler.exists(), "Did not expect the clean shutdown file to exist"); + + // Ensure we have a corrupt index on broker shutdown + long maxIndexSize = broker.config().logIndexSizeMaxBytes(); + long expectedIndexSize = 12 * (maxIndexSize / 12); + assertEquals(expectedIndexSize, timeIndexFile.length()); + + // Allow write permissions before startup + assertTrue(timeIndexFile.setWritable(true)); + + broker.startup(); + // make sure there is no error during load logs + assertTrue(cluster.firstFatalException().isEmpty()); + try (Admin admin = cluster.admin()) { + TestUtils.waitForCondition(() -> { + List partitionInfos = admin.describeTopics(List.of("foo")) + .topicNameValues().get("foo").get().partitions(); + return partitionInfos.get(0).leader().id() == 0; + }, "Partition does not have a leader assigned"); + } + + // Ensure that sanity check does not fail + broker.logManager() + .getLog(new TopicPartition("foo", 0), false).get() + .activeSegment() + .timeIndex() + .sanityCheck(); + } + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3) public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException { diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java index bf1bd802f5e..f2e7c9830bd 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java @@ -263,7 +263,9 @@ public abstract class AbstractIndex implements Closeable { public void trimToValidSize() throws IOException { lock.lock(); try { - resize(entrySize() * entries); + if (mmap != null) { + resize(entrySize() * entries); + } } finally { lock.unlock(); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java index cead2434087..9a16eaa4a18 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java @@ -751,10 +751,7 @@ public class LogSegment implements Closeable { public void close() throws IOException { if (maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN) Utils.swallow(LOGGER, Level.WARN, "maybeAppend", () -> timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar(), true)); - Utils.closeQuietly(lazyOffsetIndex, "offsetIndex", LOGGER); - Utils.closeQuietly(lazyTimeIndex, "timeIndex", LOGGER); - Utils.closeQuietly(log, "log", LOGGER); - Utils.closeQuietly(txnIndex, "txnIndex", LOGGER); + Utils.closeAll(lazyOffsetIndex, lazyTimeIndex, log, txnIndex); } /** diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java index a79602d56d1..4b89d7115a4 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java @@ -17,6 +17,7 @@ package org.apache.kafka.storage.internals.log; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; import java.io.Closeable; import java.io.File; @@ -104,8 +105,7 @@ public class LogSegments implements Closeable { */ @Override public void close() throws IOException { - for (LogSegment s : values()) - s.close(); + Utils.closeAll(values().toArray(new LogSegment[0])); } /** diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java index 9e28c253a5e..076dfa0627c 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java @@ -109,7 +109,7 @@ public class TransactionIndex implements Closeable { public void close() throws IOException { FileChannel channel = channelOrNull(); - if (channel != null) + if (channel != null && channel.isOpen()) channel.close(); maybeChannel = Optional.empty(); } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java index fcf02cc7b76..4136a76995a 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentsTest.java @@ -38,7 +38,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class LogSegmentsTest { @@ -47,7 +50,7 @@ public class LogSegmentsTest { /* create a segment with the given base offset */ private static LogSegment createSegment(Long offset) throws IOException { - return LogTestUtils.createSegment(offset, logDir, 10, Time.SYSTEM); + return spy(LogTestUtils.createSegment(offset, logDir, 10, Time.SYSTEM)); } @BeforeEach @@ -274,4 +277,22 @@ public class LogSegmentsTest { } } + @Test + public void testCloseClosesAllLogSegmentsOnExceptionWhileClosingOne() throws IOException { + LogSegment seg1 = createSegment(0L); + LogSegment seg2 = createSegment(100L); + LogSegment seg3 = createSegment(200L); + LogSegments segments = new LogSegments(topicPartition); + segments.add(seg1); + segments.add(seg2); + segments.add(seg3); + + doThrow(new IOException("Failure")).when(seg2).close(); + + assertThrows(IOException.class, segments::close, "Expected IOException to be thrown"); + verify(seg1).close(); + verify(seg2).close(); + verify(seg3).close(); + } + } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java index 918e9dd409c..ad7fa590852 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java @@ -225,7 +225,6 @@ public class OffsetIndexTest { idx.forceUnmap(); // mmap should be null after unmap causing lookup to throw a NPE assertThrows(NullPointerException.class, () -> idx.lookup(1)); - assertThrows(NullPointerException.class, idx::close); } @Test