KAFKA-19221 Propagate IOException on LogSegment#close (#19607)

Log segment closure results in right-sizing the segment on disk along
with the associated index files.

This is especially important for TimeIndexes where a failure to
right-size may eventually cause log roll failures leading to
under-replication and log cleaner failures.

This change uses `Utils.closeAll` which propagates exceptions, resulting
in an "unclean" shutdown. That would then cause the broker to attempt to
recover the log segment and the index on next startup, thereby avoiding
the failures described above.

Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Jun Rao
 <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Gaurav Narula 2025-06-10 18:09:52 +01:00 committed by GitHub
parent 997abe464f
commit edd0efdebf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 99 additions and 10 deletions

View File

@ -211,6 +211,10 @@ public class FileRecords extends AbstractRecords implements Closeable {
* Close this record set * Close this record set
*/ */
public void close() throws IOException { public void close() throws IOException {
if (!channel.isOpen()) {
return;
}
flush(); flush();
trim(); trim();
channel.close(); channel.close();

View File

@ -34,9 +34,11 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type; import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile; import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
@ -57,6 +59,70 @@ public class LogManagerIntegrationTest {
this.cluster = cluster; this.cluster = cluster;
} }
/**
 * Checks that a broker restarts without errors after closing a log segment failed during shutdown.
 *
 * The test produces one record to topic "foo", makes the active segment's time index file
 * read-only, and shuts the broker down. It then asserts that no clean shutdown file exists and
 * that the time index file was left at its untrimmed size. Finally it restores write permission,
 * restarts the broker, and verifies that no fatal exception was recorded, that partition 0 gets
 * a leader, and that the time index passes its sanity check.
 */
@ClusterTest(types = {Type.KRAFT})
public void testIOExceptionOnLogSegmentCloseResultsInRecovery() throws IOException, InterruptedException, ExecutionException {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic("foo", 1, (short) 1))).all().get();
}
cluster.waitForTopic("foo", 1);
// Produce some data into the topic
Map<String, Object> producerConfigs = Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(),
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
);
try (Producer<String, String> producer = new KafkaProducer<>(producerConfigs)) {
// Block on the returned future so the record is acknowledged before the broker is inspected
producer.send(new ProducerRecord<>("foo", 0, null, "bar")).get();
producer.flush();
}
var broker = cluster.brokers().get(0);
File timeIndexFile = broker.logManager()
.getLog(new TopicPartition("foo", 0), false).get()
.activeSegment()
.timeIndexFile();
// Set read only so that we throw an IOException on shutdown
assertTrue(timeIndexFile.exists());
assertTrue(timeIndexFile.setReadOnly());
broker.shutdown();
assertEquals(1, broker.config().logDirs().size());
String logDir = broker.config().logDirs().get(0);
// The failed segment close is expected to make the shutdown "unclean", so no marker file is written
CleanShutdownFileHandler cleanShutdownFileHandler = new CleanShutdownFileHandler(logDir);
assertFalse(cleanShutdownFileHandler.exists(), "Did not expect the clean shutdown file to exist");
// Ensure we have a corrupt index on broker shutdown
long maxIndexSize = broker.config().logIndexSizeMaxBytes();
// Configured maximum index size rounded down to a multiple of 12
// NOTE(review): 12 is presumably the time index entry size in bytes - confirm
long expectedIndexSize = 12 * (maxIndexSize / 12);
assertEquals(expectedIndexSize, timeIndexFile.length());
// Allow write permissions before startup
assertTrue(timeIndexFile.setWritable(true));
broker.startup();
// make sure there is no error during load logs
assertTrue(cluster.firstFatalException().isEmpty());
try (Admin admin = cluster.admin()) {
TestUtils.waitForCondition(() -> {
List<TopicPartitionInfo> partitionInfos = admin.describeTopics(List.of("foo"))
.topicNameValues().get("foo").get().partitions();
// NOTE(review): assumes the restarted broker has id 0 - confirm against the cluster setup
return partitionInfos.get(0).leader().id() == 0;
}, "Partition does not have a leader assigned");
}
// Ensure that sanity check does not fail
broker.logManager()
.getLog(new TopicPartition("foo", 0), false).get()
.activeSegment()
.timeIndex()
.sanityCheck();
}
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3) @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3)
public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException { public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException {

View File

@ -263,7 +263,9 @@ public abstract class AbstractIndex implements Closeable {
public void trimToValidSize() throws IOException { public void trimToValidSize() throws IOException {
lock.lock(); lock.lock();
try { try {
resize(entrySize() * entries); if (mmap != null) {
resize(entrySize() * entries);
}
} finally { } finally {
lock.unlock(); lock.unlock();
} }

View File

@ -751,10 +751,7 @@ public class LogSegment implements Closeable {
public void close() throws IOException { public void close() throws IOException {
if (maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN) if (maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN)
Utils.swallow(LOGGER, Level.WARN, "maybeAppend", () -> timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar(), true)); Utils.swallow(LOGGER, Level.WARN, "maybeAppend", () -> timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar(), true));
Utils.closeQuietly(lazyOffsetIndex, "offsetIndex", LOGGER); Utils.closeAll(lazyOffsetIndex, lazyTimeIndex, log, txnIndex);
Utils.closeQuietly(lazyTimeIndex, "timeIndex", LOGGER);
Utils.closeQuietly(log, "log", LOGGER);
Utils.closeQuietly(txnIndex, "txnIndex", LOGGER);
} }
/** /**

View File

@ -17,6 +17,7 @@
package org.apache.kafka.storage.internals.log; package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import java.io.Closeable; import java.io.Closeable;
import java.io.File; import java.io.File;
@ -104,8 +105,7 @@ public class LogSegments implements Closeable {
*/ */
@Override @Override
public void close() throws IOException { public void close() throws IOException {
for (LogSegment s : values()) Utils.closeAll(values().toArray(new LogSegment[0]));
s.close();
} }
/** /**

View File

@ -109,7 +109,7 @@ public class TransactionIndex implements Closeable {
public void close() throws IOException { public void close() throws IOException {
FileChannel channel = channelOrNull(); FileChannel channel = channelOrNull();
if (channel != null) if (channel != null && channel.isOpen())
channel.close(); channel.close();
maybeChannel = Optional.empty(); maybeChannel = Optional.empty();
} }

View File

@ -38,7 +38,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class LogSegmentsTest { public class LogSegmentsTest {
@ -47,7 +50,7 @@ public class LogSegmentsTest {
/* create a segment with the given base offset */ /* create a segment with the given base offset */
private static LogSegment createSegment(Long offset) throws IOException { private static LogSegment createSegment(Long offset) throws IOException {
return LogTestUtils.createSegment(offset, logDir, 10, Time.SYSTEM); return spy(LogTestUtils.createSegment(offset, logDir, 10, Time.SYSTEM));
} }
@BeforeEach @BeforeEach
@ -274,4 +277,22 @@ public class LogSegmentsTest {
} }
} }
/**
 * Closing a {@link LogSegments} must attempt to close every segment it holds, even when
 * closing one of them throws, and must still surface an {@link IOException} to the caller.
 */
@Test
public void testCloseClosesAllLogSegmentsOnExceptionWhileClosingOne() throws IOException {
    LogSegment first = createSegment(0L);
    LogSegment failing = createSegment(100L);
    LogSegment last = createSegment(200L);
    LogSegment[] allSegments = {first, failing, last};

    LogSegments segments = new LogSegments(topicPartition);
    for (LogSegment segment : allSegments) {
        segments.add(segment);
    }

    // Only the middle segment is stubbed to fail when closed.
    doThrow(new IOException("Failure")).when(failing).close();

    assertThrows(IOException.class, segments::close, "Expected IOException to be thrown");

    // Each segment, including those on either side of the failing one, must have been closed.
    for (LogSegment segment : allSegments) {
        verify(segment).close();
    }
}
} }

View File

@ -225,7 +225,6 @@ public class OffsetIndexTest {
idx.forceUnmap(); idx.forceUnmap();
// mmap should be null after unmap causing lookup to throw a NPE // mmap should be null after unmap causing lookup to throw a NPE
assertThrows(NullPointerException.class, () -> idx.lookup(1)); assertThrows(NullPointerException.class, () -> idx.lookup(1));
assertThrows(NullPointerException.class, idx::close);
} }
@Test @Test