KAFKA-16349: Prevent race conditions in Exit class from stopping test JVM (#15484)

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Greg Harris 2024-03-28 20:07:42 -07:00 committed by GitHub
parent 71bcac3b6a
commit bf5e04e416
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 185 additions and 72 deletions

View File

@ -105,10 +105,10 @@
<allow pkg="org.apache.kafka.server.util.json" />
<allow class="org.apache.kafka.server.util.TopicFilter.IncludeList" />
<allow class="org.apache.kafka.test.TestUtils" />
<subpackage name="timer">
<allow class="org.apache.kafka.server.util.MockTime" />
<allow class="org.apache.kafka.server.util.ShutdownableThread" />
<allow class="org.apache.kafka.test.TestUtils" />
</subpackage>
</subpackage>
</subpackage>

View File

@ -43,6 +43,14 @@ public class Exit {
Runtime.getRuntime().addShutdownHook(new Thread(runnable));
};
private static final Procedure NOOP_HALT_PROCEDURE = (statusCode, message) -> {
throw new IllegalStateException("Halt called after resetting procedures; possible race condition present in test");
};
private static final Procedure NOOP_EXIT_PROCEDURE = (statusCode, message) -> {
throw new IllegalStateException("Exit called after resetting procedures; possible race condition present in test");
};
private volatile static Procedure exitProcedure = DEFAULT_EXIT_PROCEDURE;
private volatile static Procedure haltProcedure = DEFAULT_HALT_PROCEDURE;
private volatile static ShutdownHookAdder shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER;
@ -67,26 +75,47 @@ public class Exit {
shutdownHookAdder.addShutdownHook(name, runnable);
}
/**
* For testing only, do not call in main code.
*/
public static void setExitProcedure(Procedure procedure) {
exitProcedure = procedure;
}
/**
* For testing only, do not call in main code.
*/
public static void setHaltProcedure(Procedure procedure) {
haltProcedure = procedure;
}
/**
* For testing only, do not call in main code.
*/
public static void setShutdownHookAdder(ShutdownHookAdder shutdownHookAdder) {
Exit.shutdownHookAdder = shutdownHookAdder;
}
/**
* For testing only, do not call in main code.
* <p>Clears the procedure set in {@link #setExitProcedure(Procedure)}, but does not restore system default behavior of exiting the JVM.
*/
public static void resetExitProcedure() {
exitProcedure = DEFAULT_EXIT_PROCEDURE;
exitProcedure = NOOP_EXIT_PROCEDURE;
}
/**
* For testing only, do not call in main code.
* <p>Clears the procedure set in {@link #setHaltProcedure(Procedure)}, but does not restore system default behavior of exiting the JVM.
*/
public static void resetHaltProcedure() {
haltProcedure = DEFAULT_HALT_PROCEDURE;
haltProcedure = NOOP_HALT_PROCEDURE;
}
/**
* For testing only, do not call in main code.
* <p>Restores the system default shutdown hook behavior.
*/
public static void resetShutdownHookAdder() {
shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER;
}

View File

@ -38,22 +38,42 @@ object Exit {
JExit.addShutdownHook(name, () => shutdownHook)
}
/**
* For testing only, do not call in main code.
*/
def setExitProcedure(exitProcedure: (Int, Option[String]) => Nothing): Unit =
JExit.setExitProcedure(functionToProcedure(exitProcedure))
/**
* For testing only, do not call in main code.
*/
def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit =
JExit.setHaltProcedure(functionToProcedure(haltProcedure))
/**
* For testing only, do not call in main code.
*/
def setShutdownHookAdder(shutdownHookAdder: (String, => Unit) => Unit): Unit = {
JExit.setShutdownHookAdder((name, runnable) => shutdownHookAdder(name, runnable.run()))
}
/**
* For testing only, do not call in main code.
* <p>Clears the procedure set in [[setExitProcedure]], but does not restore system default behavior of exiting the JVM.
*/
def resetExitProcedure(): Unit =
JExit.resetExitProcedure()
/**
* For testing only, do not call in main code.
* <p>Clears the procedure set in [[setHaltProcedure]], but does not restore system default behavior of exiting the JVM.
*/
def resetHaltProcedure(): Unit =
JExit.resetHaltProcedure()
/**
* For testing only, do not call in main code.
*/
def resetShutdownHookAdder(): Unit =
JExit.resetShutdownHookAdder()

View File

@ -1,69 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.server.util.ShutdownableThread
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
class ShutdownableThreadTest {
@AfterEach
def tearDown(): Unit = Exit.resetExitProcedure()
@Test
def testShutdownWhenCalledAfterThreadStart(): Unit = {
@volatile var statusCodeOption: Option[Int] = None
Exit.setExitProcedure { (statusCode, _) =>
statusCodeOption = Some(statusCode)
// Sleep until interrupted to emulate the fact that `System.exit()` never returns
Thread.sleep(Long.MaxValue)
throw new AssertionError
}
val latch = new CountDownLatch(1)
val thread = new ShutdownableThread("shutdownable-thread-test") {
override def doWork(): Unit = {
latch.countDown()
throw new FatalExitError
}
}
thread.start()
assertTrue(latch.await(10, TimeUnit.SECONDS), "doWork was not invoked")
thread.shutdown()
TestUtils.waitUntilTrue(() => statusCodeOption.isDefined, "Status code was not set by exit procedure")
assertEquals(1, statusCodeOption.get)
}
@Test
def testIsThreadStarted(): Unit = {
val latch = new CountDownLatch(1)
val thread = new ShutdownableThread("shutdownable-thread-test") {
override def doWork(): Unit = {
latch.countDown()
}
}
assertFalse(thread.isStarted)
thread.start()
latch.await()
assertTrue(thread.isStarted)
thread.shutdown()
}
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class ShutdownableThreadTest {
@AfterEach
public void tearDown() {
Exit.resetExitProcedure();
}
@Test
public void testShutdownWhenCalledAfterThreadStart() throws InterruptedException {
AtomicReference<Optional<Integer>> statusCodeOption = new AtomicReference<>(Optional.empty());
Exit.setExitProcedure((statusCode, ignored) -> {
statusCodeOption.set(Optional.of(statusCode));
// Sleep until interrupted to emulate the fact that `System.exit()` never returns
Utils.sleep(Long.MAX_VALUE);
throw new AssertionError();
});
CountDownLatch latch = new CountDownLatch(1);
ShutdownableThread thread = new ShutdownableThread("shutdownable-thread-test") {
@Override
public void doWork() {
latch.countDown();
throw new FatalExitError();
}
};
thread.start();
assertTrue(latch.await(10, TimeUnit.SECONDS), "doWork was not invoked");
thread.shutdown();
TestUtils.waitForCondition(() -> statusCodeOption.get().isPresent(), "Status code was not set by exit procedure");
assertEquals(1, statusCodeOption.get().get());
}
@Test
public void testIsThreadStarted() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
ShutdownableThread thread = new ShutdownableThread("shutdownable-thread-test") {
@Override
public void doWork() {
latch.countDown();
}
};
assertFalse(thread.isStarted());
thread.start();
assertTrue(latch.await(10, TimeUnit.SECONDS), "doWork was not invoked");
assertTrue(thread.isStarted());
thread.shutdown();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testShutdownWhenTestTimesOut(boolean isInterruptible) {
// Mask the exit procedure so the contents of the "test" can't shut down the JVM
Exit.setExitProcedure((statusCode, ignored) -> {
throw new FatalExitError();
});
// This latch will be triggered only after the "test" finishes
CountDownLatch afterTest = new CountDownLatch(1);
try {
// This is the "test", which uses a ShutdownableThread with exit procedures masked.
CountDownLatch startupLatch = new CountDownLatch(1);
ShutdownableThread thread = new ShutdownableThread("shutdownable-thread-timeout", isInterruptible) {
@Override
public void doWork() {
// Tell the test that we finished starting, and are ready to shut down.
startupLatch.countDown();
if (isInterruptible) {
// Swallow the interruption sent by the thread
try {
afterTest.await();
} catch (InterruptedException ignored) {
}
}
// Trigger a fatal exit, after the test has completed and the exit procedure masking has been cleaned up
try {
afterTest.await();
} catch (InterruptedException ignored) {
}
throw new FatalExitError();
}
};
thread.start();
startupLatch.await();
// Interrupt ourselves, as if the test was interrupted for timing out
Thread.currentThread().interrupt();
thread.shutdown();
fail("Shutdown should have been interrupted");
} catch (InterruptedException ignored) {
// Swallow the interruption, so that the surrounding test passes
} finally {
Exit.resetExitProcedure();
}
// After the test has stopped waiting for the thread and cleaned up the Exit procedure, let the thread continue
afterTest.countDown();
}
}