diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Exit.java b/clients/src/main/java/org/apache/kafka/common/utils/Exit.java index 20a7b2035ca..976dfe4df98 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Exit.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Exit.java @@ -22,10 +22,12 @@ package org.apache.kafka.common.utils; */ public class Exit { + @FunctionalInterface public interface Procedure { void execute(int statusCode, String message); } + @FunctionalInterface public interface ShutdownHookAdder { void addShutdownHook(String name, Runnable runnable); } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index d2512c7a108..6e0688bf6dc 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -55,7 +55,7 @@ import org.apache.kafka.raft.RaftConfig import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion} import org.apache.kafka.server.common.MetadataVersion._ -import org.apache.kafka.server.fault.ProcessExitingFaultHandler +import org.apache.kafka.server.fault.ProcessTerminatingFaultHandler import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.util.KafkaScheduler @@ -396,7 +396,7 @@ class KafkaServer( metrics, threadNamePrefix, controllerQuorumVotersFuture, - fatalFaultHandler = new ProcessExitingFaultHandler() + fatalFaultHandler = new ProcessTerminatingFaultHandler.Builder().build() ) val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala val quorumControllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes) diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 9e134ec1fdb..97d4a0e470f 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -30,7 +30,7 @@ import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator} import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.raft.RaftConfig.AddressSpec import org.apache.kafka.server.common.ApiMessageAndVersion -import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessExitingFaultHandler} +import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler} import org.apache.kafka.server.metrics.KafkaYammerMetrics import java.util @@ -60,7 +60,9 @@ class StandardFaultHandlerFactory extends FaultHandlerFactory { action: Runnable ): FaultHandler = { if (fatal) { - new ProcessExitingFaultHandler(action) + new ProcessTerminatingFaultHandler.Builder() + .setAction(action) + .build() } else { new LoggingFaultHandler(name, action) } diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 896ae2407e2..e21a969a302 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -38,7 +38,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid, protocol} import org.apache.kafka.raft.errors.NotLeaderException import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, RaftConfig} import org.apache.kafka.server.common.serialization.RecordSerde -import org.apache.kafka.server.fault.ProcessExitingFaultHandler +import org.apache.kafka.server.fault.ProcessTerminatingFaultHandler import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.kafka.snapshot.SnapshotReader @@ -92,7 +92,7 @@ class TestRaftServer( metrics, Some(threadNamePrefix), CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters)), - new ProcessExitingFaultHandler() + new ProcessTerminatingFaultHandler.Builder().build() ) workloadGenerator = new RaftWorkloadGenerator( diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java deleted file mode 100644 index b67bcf3fa7d..00000000000 --- a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.server.fault; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.kafka.common.utils.Exit; - - -/** - * This is a fault handler which exits the Java process. - */ -public class ProcessExitingFaultHandler implements FaultHandler { - private static final Logger log = LoggerFactory.getLogger(ProcessExitingFaultHandler.class); - - private final Runnable action; - - public ProcessExitingFaultHandler() { - this.action = () -> { }; - } - - public ProcessExitingFaultHandler(Runnable action) { - this.action = action; - } - - @Override - public RuntimeException handleFault(String failureMessage, Throwable cause) { - if (cause == null) { - log.error("Encountered fatal fault: {}", failureMessage); - } else { - log.error("Encountered fatal fault: {}", failureMessage, cause); - } - try { - action.run(); - } catch (Throwable e) { - log.error("Failed to run ProcessExitingFaultHandler action.", e); - } - Exit.exit(1); - return null; - } -} diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandler.java new file mode 100644 index 00000000000..29ba7b84706 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandler.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.fault; + +import java.util.Objects; +import org.apache.kafka.common.utils.Exit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a fault handler which terminates the JVM process. + */ +final public class ProcessTerminatingFaultHandler implements FaultHandler { + private static final Logger log = LoggerFactory.getLogger(ProcessTerminatingFaultHandler.class); + + private final Runnable action; + private final boolean shouldHalt; + + private ProcessTerminatingFaultHandler(boolean shouldHalt, Runnable action) { + this.shouldHalt = shouldHalt; + this.action = action; + } + + @Override + public RuntimeException handleFault(String failureMessage, Throwable cause) { + if (cause == null) { + log.error("Encountered fatal fault: {}", failureMessage); + } else { + log.error("Encountered fatal fault: {}", failureMessage, cause); + } + + try { + action.run(); + } catch (Throwable e) { + log.error("Failed to run terminating action.", e); + } + + int statusCode = 1; + if (shouldHalt) { + Exit.halt(statusCode); + } else { + Exit.exit(statusCode); + } + + return null; + } + + public static final class Builder { + private boolean shouldHalt = true; + private Runnable action = () -> { }; + + /** + * Set if halt or exit should be used. + * + * When {@code value} is {@code false} {@code Exit.exit} is called, otherwise {@code Exit.halt} is + * called. The default value is {@code true}. + * + * The default implementation of {@code Exit.exit} calls {@code Runtime.exit} which + * blocks on all of the shutdown hooks executing. + * + * The default implementation of {@code Exit.halt} calls {@code Runtime.halt} which + * forcibly terminates the JVM. + */ + public Builder setShouldHalt(boolean value) { + shouldHalt = value; + return this; + } + + /** + * Set the {@code Runnable} to run when handling a fault. + */ + public Builder setAction(Runnable action) { + this.action = Objects.requireNonNull(action); + return this; + } + + public ProcessTerminatingFaultHandler build() { + return new ProcessTerminatingFaultHandler(shouldHalt, action); + } + } +} diff --git a/server-common/src/test/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandlerTest.java b/server-common/src/test/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandlerTest.java new file mode 100644 index 00000000000..3d2531d1795 --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandlerTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.fault; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Exit.Procedure; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +final public class ProcessTerminatingFaultHandlerTest { + private static Procedure terminatingProcedure(AtomicBoolean called) { + return (statusCode, message) -> { + assertEquals(1, statusCode); + assertNull(message); + called.set(true); + }; + } + + @Test + public void testExitIsCalled() { + AtomicBoolean exitCalled = new AtomicBoolean(false); + Exit.setExitProcedure(terminatingProcedure(exitCalled)); + + AtomicBoolean actionCalled = new AtomicBoolean(false); + Runnable action = () -> { + assertFalse(exitCalled.get()); + actionCalled.set(true); + }; + + try { + new ProcessTerminatingFaultHandler.Builder() + .setShouldHalt(false) + .setAction(action) + .build() + .handleFault("", null); + } finally { + Exit.resetExitProcedure(); + } + + assertTrue(exitCalled.get()); + assertTrue(actionCalled.get()); + } + + @Test + public void testHaltIsCalled() { + AtomicBoolean haltCalled = new AtomicBoolean(false); + Exit.setHaltProcedure(terminatingProcedure(haltCalled)); + + AtomicBoolean actionCalled = new AtomicBoolean(false); + Runnable action = () -> { + assertFalse(haltCalled.get()); + actionCalled.set(true); + }; + + try { + new ProcessTerminatingFaultHandler.Builder() + .setAction(action) + .build() + .handleFault("", null); + } finally { + Exit.resetHaltProcedure(); + } + + assertTrue(haltCalled.get()); + assertTrue(actionCalled.get()); + } +}