KAFKA-19425: Stop the server when it fails to initialize, to avoid local segments never being deleted. (#20007)

We found that one broker's local segments on disk were never removed,
no matter how long they had been stored. Disk usage kept increasing.


![image](https://github.com/user-attachments/assets/42129bb6-7d07-481b-923f-971da3ab12da)
note: Partition 2's node is the exception node.

After troubleshooting, we found that if a broker is very slow to start
up, TopicBasedRemoteLogMetadataManager#initializeResources can sometimes
fail (this is expected, because the server is not ready in time).
However, this failure does not stop the server: it keeps running with
only some exception logs instead of shutting down. The broker then never
uploads its local segments to remote storage, so the local segments are
never deleted.

This change therefore shuts down the broker when initialization fails,
to avoid a silent critical error that causes disk usage to keep
increasing forever.

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Luke
 Chen <showuon@gmail.com>
This commit is contained in:
stroller 2025-07-28 20:17:09 +08:00 committed by GitHub
parent abbb6b3c13
commit e61c297b73
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 33 additions and 10 deletions

View File

@ -27,7 +27,7 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@ -85,7 +85,6 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final RemotePartitionMetadataStore remotePartitionMetadataStore;
private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>());
private volatile boolean initializationFailed = false;
private final Function<Integer, RemoteLogMetadataTopicPartitioner> partitionerFunction;
public TopicBasedRemoteLogMetadataManager() {
@ -325,6 +324,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
NewTopic newTopic = newRemoteLogMetadataTopic(rlmmConfig);
boolean isTopicCreated = false;
long startTimeMs = time.milliseconds();
boolean initializationFailed = false;
try (Admin admin = Admin.create(rlmmConfig.commonProperties())) {
while (!(initialized.get() || closing.get() || initializationFailed)) {
if (time.milliseconds() - startTimeMs > retryMaxTimeoutMs) {
@ -368,6 +368,11 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
} catch (KafkaException e) {
log.error("Encountered error while initializing topic-based RLMM resources", e);
initializationFailed = true;
} finally {
if (initializationFailed) {
log.error("Stopping the server as it failed to initialize topic-based RLMM resources");
Exit.exit(1);
}
}
}
@ -457,15 +462,8 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
return initialized.get();
}
boolean isInitializationFailed() {
return initializationFailed;
}
private void ensureInitializedAndNotClosed() {
if (initializationFailed) {
// If initialization is failed, shutdown the broker.
throw new FatalExitError();
}
if (closing.get() || !initialized.get()) {
throw new IllegalStateException("This instance is in invalid state, initialized: " + initialized +
" close: " + closing);

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@ -43,6 +44,9 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -335,6 +339,18 @@ public class TopicBasedRemoteLogMetadataManagerTest {
@ClusterTest
public void testInitializationFailure() throws IOException, InterruptedException {
// Set up a custom exit procedure for testing
final AtomicBoolean exitCalled = new AtomicBoolean(false);
final AtomicInteger exitCode = new AtomicInteger(-1);
final AtomicReference<String> exitMessage = new AtomicReference<>();
// Set custom exit procedure that won't actually exit the process
Exit.setExitProcedure((statusCode, message) -> {
exitCalled.set(true);
exitCode.set(statusCode);
exitMessage.set(message);
});
try (TopicBasedRemoteLogMetadataManager rlmm = new TopicBasedRemoteLogMetadataManager()) {
// configure rlmm without bootstrap servers, so it will fail to initialize admin client.
Map<String, Object> configs = Map.of(
@ -342,7 +358,16 @@ public class TopicBasedRemoteLogMetadataManagerTest {
TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID, 0
);
rlmm.configure(configs);
TestUtils.waitForCondition(rlmm::isInitializationFailed, "Initialization should fail");
// Wait for initialization failure and exit procedure to be called
TestUtils.waitForCondition(() -> exitCalled.get(),
"Exit procedure should be called due to initialization failure");
// Verify exit code
assertEquals(1, exitCode.get(), "Exit code should be 1");
} finally {
// Restore default exit procedure
Exit.resetExitProcedure();
}
}
}