diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml index 2c5c652e979..5af34ca4a1e 100644 --- a/checkstyle/import-control-server-common.xml +++ b/checkstyle/import-control-server-common.xml @@ -105,10 +105,10 @@ + - diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Exit.java b/clients/src/main/java/org/apache/kafka/common/utils/Exit.java index 6d503ca2ad5..1ef1ee15b5b 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Exit.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Exit.java @@ -43,6 +43,14 @@ public class Exit { Runtime.getRuntime().addShutdownHook(new Thread(runnable)); }; + private static final Procedure NOOP_HALT_PROCEDURE = (statusCode, message) -> { + throw new IllegalStateException("Halt called after resetting procedures; possible race condition present in test"); + }; + + private static final Procedure NOOP_EXIT_PROCEDURE = (statusCode, message) -> { + throw new IllegalStateException("Exit called after resetting procedures; possible race condition present in test"); + }; + private volatile static Procedure exitProcedure = DEFAULT_EXIT_PROCEDURE; private volatile static Procedure haltProcedure = DEFAULT_HALT_PROCEDURE; private volatile static ShutdownHookAdder shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER; @@ -67,26 +75,47 @@ public class Exit { shutdownHookAdder.addShutdownHook(name, runnable); } + /** + * For testing only, do not call in main code. + */ public static void setExitProcedure(Procedure procedure) { exitProcedure = procedure; } + /** + * For testing only, do not call in main code. + */ public static void setHaltProcedure(Procedure procedure) { haltProcedure = procedure; } + /** + * For testing only, do not call in main code. + */ public static void setShutdownHookAdder(ShutdownHookAdder shutdownHookAdder) { Exit.shutdownHookAdder = shutdownHookAdder; } + /** + * For testing only, do not call in main code. + *

Clears the procedure set in {@link #setExitProcedure(Procedure)}, but does not restore system default behavior of exiting the JVM. + */ public static void resetExitProcedure() { - exitProcedure = DEFAULT_EXIT_PROCEDURE; + exitProcedure = NOOP_EXIT_PROCEDURE; } + /** + * For testing only, do not call in main code. + *

Clears the procedure set in {@link #setHaltProcedure(Procedure)}, but does not restore system default behavior of exiting the JVM. + */ public static void resetHaltProcedure() { - haltProcedure = DEFAULT_HALT_PROCEDURE; + haltProcedure = NOOP_HALT_PROCEDURE; } + /** + * For testing only, do not call in main code. + *

Restores the system default shutdown hook behavior. + */ public static void resetShutdownHookAdder() { shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER; } diff --git a/core/src/main/scala/kafka/utils/Exit.scala b/core/src/main/scala/kafka/utils/Exit.scala index eddd929af55..84027100ec3 100644 --- a/core/src/main/scala/kafka/utils/Exit.scala +++ b/core/src/main/scala/kafka/utils/Exit.scala @@ -38,22 +38,42 @@ object Exit { JExit.addShutdownHook(name, () => shutdownHook) } + /** + * For testing only, do not call in main code. + */ def setExitProcedure(exitProcedure: (Int, Option[String]) => Nothing): Unit = JExit.setExitProcedure(functionToProcedure(exitProcedure)) + /** + * For testing only, do not call in main code. + */ def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit = JExit.setHaltProcedure(functionToProcedure(haltProcedure)) + /** + * For testing only, do not call in main code. + */ def setShutdownHookAdder(shutdownHookAdder: (String, => Unit) => Unit): Unit = { JExit.setShutdownHookAdder((name, runnable) => shutdownHookAdder(name, runnable.run())) } + /** + * For testing only, do not call in main code. + *

Clears the procedure set in [[setExitProcedure]], but does not restore system default behavior of exiting the JVM. + */ def resetExitProcedure(): Unit = JExit.resetExitProcedure() + /** + * For testing only, do not call in main code. + *

Clears the procedure set in [[setHaltProcedure]], but does not restore system default behavior of exiting the JVM. + */ def resetHaltProcedure(): Unit = JExit.resetHaltProcedure() + /** + * For testing only, do not call in main code. + */ def resetShutdownHookAdder(): Unit = JExit.resetShutdownHookAdder() diff --git a/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala b/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala deleted file mode 100644 index da998903d27..00000000000 --- a/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.utils - -import java.util.concurrent.{CountDownLatch, TimeUnit} -import org.apache.kafka.common.internals.FatalExitError -import org.apache.kafka.server.util.ShutdownableThread -import org.junit.jupiter.api.{AfterEach, Test} -import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} - -class ShutdownableThreadTest { - - @AfterEach - def tearDown(): Unit = Exit.resetExitProcedure() - - @Test - def testShutdownWhenCalledAfterThreadStart(): Unit = { - @volatile var statusCodeOption: Option[Int] = None - Exit.setExitProcedure { (statusCode, _) => - statusCodeOption = Some(statusCode) - // Sleep until interrupted to emulate the fact that `System.exit()` never returns - Thread.sleep(Long.MaxValue) - throw new AssertionError - } - val latch = new CountDownLatch(1) - val thread = new ShutdownableThread("shutdownable-thread-test") { - override def doWork(): Unit = { - latch.countDown() - throw new FatalExitError - } - } - thread.start() - assertTrue(latch.await(10, TimeUnit.SECONDS), "doWork was not invoked") - - thread.shutdown() - TestUtils.waitUntilTrue(() => statusCodeOption.isDefined, "Status code was not set by exit procedure") - assertEquals(1, statusCodeOption.get) - } - - @Test - def testIsThreadStarted(): Unit = { - val latch = new CountDownLatch(1) - val thread = new ShutdownableThread("shutdownable-thread-test") { - override def doWork(): Unit = { - latch.countDown() - } - } - assertFalse(thread.isStarted) - thread.start() - latch.await() - assertTrue(thread.isStarted) - - thread.shutdown() - } -} diff --git a/server-common/src/test/java/org/apache/kafka/server/util/ShutdownableThreadTest.java b/server-common/src/test/java/org/apache/kafka/server/util/ShutdownableThreadTest.java new file mode 100644 index 00000000000..e59acc9767e --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/server/util/ShutdownableThreadTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import org.apache.kafka.common.internals.FatalExitError; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ShutdownableThreadTest { + + @AfterEach + public void tearDown() { + Exit.resetExitProcedure(); + } + + @Test + public void testShutdownWhenCalledAfterThreadStart() throws InterruptedException { + AtomicReference> statusCodeOption = new AtomicReference<>(Optional.empty()); + Exit.setExitProcedure((statusCode, ignored) -> { + statusCodeOption.set(Optional.of(statusCode)); + // Sleep until interrupted to emulate the fact that `System.exit()` never returns + Utils.sleep(Long.MAX_VALUE); + throw new AssertionError(); + }); + CountDownLatch latch = new CountDownLatch(1); + ShutdownableThread thread = new ShutdownableThread("shutdownable-thread-test") { + @Override + public void doWork() { + latch.countDown(); + throw new FatalExitError(); + } + }; + thread.start(); + assertTrue(latch.await(10, TimeUnit.SECONDS), "doWork was not invoked"); + + thread.shutdown(); + TestUtils.waitForCondition(() -> statusCodeOption.get().isPresent(), "Status code was not set by exit procedure"); + assertEquals(1, statusCodeOption.get().get()); + } + + @Test + public void testIsThreadStarted() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + ShutdownableThread thread = new ShutdownableThread("shutdownable-thread-test") { + @Override + public void doWork() { + latch.countDown(); + } + }; + assertFalse(thread.isStarted()); + thread.start(); + assertTrue(latch.await(10, TimeUnit.SECONDS), "doWork was not invoked"); + assertTrue(thread.isStarted()); + + thread.shutdown(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testShutdownWhenTestTimesOut(boolean isInterruptible) { + // Mask the exit procedure so the contents of the "test" can't shut down the JVM + Exit.setExitProcedure((statusCode, ignored) -> { + throw new FatalExitError(); + }); + // This latch will be triggered only after the "test" finishes + CountDownLatch afterTest = new CountDownLatch(1); + try { + // This is the "test", which uses a ShutdownableThread with exit procedures masked. + CountDownLatch startupLatch = new CountDownLatch(1); + ShutdownableThread thread = new ShutdownableThread("shutdownable-thread-timeout", isInterruptible) { + @Override + public void doWork() { + // Tell the test that we finished starting, and are ready to shut down. + startupLatch.countDown(); + if (isInterruptible) { + // Swallow the interruption sent by the thread + try { + afterTest.await(); + } catch (InterruptedException ignored) { + } + } + // Trigger a fatal exit, after the test has completed and the exit procedure masking has been cleaned up + try { + afterTest.await(); + } catch (InterruptedException ignored) { + } + throw new FatalExitError(); + } + }; + thread.start(); + startupLatch.await(); + // Interrupt ourselves, as if the test was interrupted for timing out + Thread.currentThread().interrupt(); + thread.shutdown(); + fail("Shutdown should have been interrupted"); + } catch (InterruptedException ignored) { + // Swallow the interruption, so that the surrounding test passes + } finally { + Exit.resetExitProcedure(); + } + // After the test has stopped waiting for the thread and cleaned up the Exit procedure, let the thread continue + afterTest.countDown(); + } +}