MINOR: MiniKdc JVM shutdown hook fix (#7946)

Also made all shutdown hooks consistent and added tests

Reviewers: Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
Ron Dagostino 2020-01-24 17:21:12 -05:00 committed by Rajini Sivaram
parent 2e351e06b3
commit a3509c0870
32 changed files with 372 additions and 106 deletions

View File

@ -26,6 +26,10 @@ public class Exit {
void execute(int statusCode, String message);
}
public interface ShutdownHookAdder {
void addShutdownHook(String name, Runnable runnable);
}
private static final Procedure DEFAULT_HALT_PROCEDURE = new Procedure() {
@Override
public void execute(int statusCode, String message) {
@ -40,8 +44,19 @@ public class Exit {
}
};
private static final ShutdownHookAdder DEFAULT_SHUTDOWN_HOOK_ADDER = new ShutdownHookAdder() {
@Override
public void addShutdownHook(String name, Runnable runnable) {
if (name != null)
Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
else
Runtime.getRuntime().addShutdownHook(new Thread(runnable));
}
};
private volatile static Procedure exitProcedure = DEFAULT_EXIT_PROCEDURE;
private volatile static Procedure haltProcedure = DEFAULT_HALT_PROCEDURE;
private volatile static ShutdownHookAdder shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER;
public static void exit(int statusCode) {
exit(statusCode, null);
@ -59,6 +74,10 @@ public class Exit {
haltProcedure.execute(statusCode, message);
}
public static void addShutdownHook(String name, Runnable runnable) {
shutdownHookAdder.addShutdownHook(name, runnable);
}
public static void setExitProcedure(Procedure procedure) {
exitProcedure = procedure;
}
@ -67,6 +86,10 @@ public class Exit {
haltProcedure = procedure;
}
public static void setShutdownHookAdder(ShutdownHookAdder shutdownHookAdder) {
Exit.shutdownHookAdder = shutdownHookAdder;
}
public static void resetExitProcedure() {
exitProcedure = DEFAULT_EXIT_PROCEDURE;
}
@ -75,4 +98,7 @@ public class Exit {
haltProcedure = DEFAULT_HALT_PROCEDURE;
}
public static void resetShutdownHookAdder() {
shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER;
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.utils;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class ExitTest {
@Test
public void shouldHaltImmediately() {
List<Object> list = new ArrayList<>();
Exit.setHaltProcedure((statusCode, message) -> {
list.add(statusCode);
list.add(message);
});
try {
int statusCode = 0;
String message = "mesaage";
Exit.halt(statusCode);
Exit.halt(statusCode, message);
assertEquals(Arrays.asList(statusCode, null, statusCode, message), list);
} finally {
Exit.resetHaltProcedure();
}
}
@Test
public void shouldExitImmediately() {
List<Object> list = new ArrayList<>();
Exit.setExitProcedure((statusCode, message) -> {
list.add(statusCode);
list.add(message);
});
try {
int statusCode = 0;
String message = "mesaage";
Exit.exit(statusCode);
Exit.exit(statusCode, message);
assertEquals(Arrays.asList(statusCode, null, statusCode, message), list);
} finally {
Exit.resetExitProcedure();
}
}
@Test
public void shouldAddShutdownHookImmediately() {
List<Object> list = new ArrayList<>();
Exit.setShutdownHookAdder((name, runnable) -> {
list.add(name);
list.add(runnable);
});
try {
Runnable runnable = () -> { };
String name = "name";
Exit.addShutdownHook(name, runnable);
assertEquals(Arrays.asList(name, runnable), list);
} finally {
Exit.resetShutdownHookAdder();
}
}
@Test
public void shouldNotInvokeShutdownHookImmediately() {
List<Object> list = new ArrayList<>();
Runnable runnable = () -> list.add(this);
Exit.addShutdownHook("message", runnable);
assertEquals(0, list.size());
}
}

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -262,14 +263,11 @@ public class TestUtils {
}
file.deleteOnExit();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
Utils.delete(file);
} catch (IOException e) {
log.error("Error deleting {}", file.getAbsolutePath(), e);
}
Exit.addShutdownHook("delete-temp-file-shutdown-hook", () -> {
try {
Utils.delete(file);
} catch (IOException e) {
log.error("Error deleting {}", file.getAbsolutePath(), e);
}
});

View File

@ -164,7 +164,7 @@ public class MirrorMaker {
}
startLatch = new CountDownLatch(herders.size());
stopLatch = new CountDownLatch(herders.size());
Runtime.getRuntime().addShutdownHook(shutdownHook);
Exit.addShutdownHook("mirror-maker-shutdown-hook", shutdownHook);
for (Herder herder : herders.values()) {
try {
herder.start();

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -48,7 +49,7 @@ public class Connect {
public void start() {
try {
log.info("Kafka Connect starting");
Runtime.getRuntime().addShutdownHook(shutdownHook);
Exit.addShutdownHook("connect-shutdown-hook", shutdownHook);
herder.start();
rest.initializeResources(herder);

View File

@ -77,9 +77,7 @@ object Kafka extends Logging {
}
// attach shutdown handler to catch terminating signals as well as normal termination
Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
override def run(): Unit = kafkaServerStartable.shutdown()
})
Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown)
kafkaServerStartable.startup()
kafkaServerStartable.awaitShutdown()

View File

@ -28,7 +28,7 @@ import com.typesafe.scalalogging.LazyLogging
import joptsimple._
import kafka.common.MessageFormatter
import kafka.utils.Implicits._
import kafka.utils._
import kafka.utils.{Exit, _}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException, WakeupException}
@ -85,8 +85,7 @@ object ConsoleConsumer extends Logging {
}
def addShutdownHook(consumer: ConsumerWrapper, conf: ConsumerConfig): Unit = {
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
Exit.addShutdownHook("consumer-shutdown-hook", {
consumer.wakeup()
shutdownLatch.await()
@ -94,7 +93,6 @@ object ConsoleConsumer extends Logging {
if (conf.enableSystestEventsLogging) {
System.out.println("shutdown_complete")
}
}
})
}

View File

@ -44,11 +44,7 @@ object ConsoleProducer {
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
producer.close()
}
})
Exit.addShutdownHook("producer-shutdown-hook", producer.close)
var record: ProducerRecord[Array[Byte], Array[Byte]] = null
do {

View File

@ -515,11 +515,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
val numStreams = options.valueOf(numStreamsOpt).intValue()
Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
override def run(): Unit = {
cleanShutdown()
}
})
Exit.addShutdownHook("MirrorMakerShutdownHook", cleanShutdown())
// create producer
val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))

View File

@ -200,11 +200,9 @@ object ReplicaVerificationTool extends Logging {
fetcherId = counter.incrementAndGet())
}
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
Exit.addShutdownHook("ReplicaVerificationToolShutdownHook", {
info("Stopping all fetchers")
fetcherThreads.foreach(_.shutdown())
}
})
fetcherThreads.foreach(_.start())
println(s"${ReplicaVerificationTool.getCurrentTimeString()}: verification process is started.")

View File

@ -34,20 +34,30 @@ object Exit {
throw new AssertionError("halt should not return, but it did.")
}
def addShutdownHook(name: String, shutdownHook: => Unit): Unit = {
JExit.addShutdownHook(name, () => shutdownHook)
}
def setExitProcedure(exitProcedure: (Int, Option[String]) => Nothing): Unit =
JExit.setExitProcedure(functionToProcedure(exitProcedure))
def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit =
JExit.setHaltProcedure(functionToProcedure(haltProcedure))
def setShutdownHookAdder(shutdownHookAdder: (String, => Unit) => Unit): Unit = {
JExit.setShutdownHookAdder((name, runnable) => shutdownHookAdder(name, runnable.run))
}
def resetExitProcedure(): Unit =
JExit.resetExitProcedure()
def resetHaltProcedure(): Unit =
JExit.resetHaltProcedure()
def resetShutdownHookAdder(): Unit =
JExit.resetShutdownHookAdder()
private def functionToProcedure(procedure: (Int, Option[String]) => Nothing) = new JExit.Procedure {
def execute(statusCode: Int, message: String): Unit = procedure(statusCode, Option(message))
}
}

View File

@ -49,7 +49,7 @@ import org.apache.directory.server.kerberos.shared.keytab.{Keytab, KeytabEntry}
import org.apache.directory.server.protocol.shared.transport.{TcpTransport, UdpTransport}
import org.apache.directory.server.xdbm.Index
import org.apache.directory.shared.kerberos.KerberosTime
import org.apache.kafka.common.utils.{Java, KafkaThread, Utils}
import org.apache.kafka.common.utils.{Java, Utils}
/**
* Mini KDC based on Apache Directory Server that can be embedded in tests or used from command line as a standalone
@ -370,7 +370,7 @@ object MiniKdc {
}
}
private def start(workDir: File, config: Properties, keytabFile: File, principals: Seq[String]): Unit = {
private[minikdc] def start(workDir: File, config: Properties, keytabFile: File, principals: Seq[String]): MiniKdc = {
val miniKdc = new MiniKdc(config, workDir)
miniKdc.start()
miniKdc.createPrincipal(keytabFile, principals: _*)
@ -390,9 +390,8 @@ object MiniKdc {
|
""".stripMargin
println(infoMessage)
Runtime.getRuntime.addShutdownHook(new KafkaThread("minikdc-shutdown-hook", false) {
miniKdc.stop()
})
Exit.addShutdownHook("minikdc-shutdown-hook", miniKdc.stop)
miniKdc
}
val OrgName = "org.name"

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.minikdc
import java.util.Properties
import kafka.utils.TestUtils
import org.junit.Test
import org.junit.Assert._
class MiniKdcTest {
@Test
def shouldNotStopImmediatelyWhenStarted(): Unit = {
val config = new Properties()
config.setProperty("kdc.bind.address", "0.0.0.0")
config.setProperty("transport", "TCP");
config.setProperty("max.ticket.lifetime", "86400000")
config.setProperty("org.name", "Example")
config.setProperty("kdc.port", "0")
config.setProperty("org.domain", "COM")
config.setProperty("max.renewable.lifetime", "604800000")
config.setProperty("instance", "DefaultKrbServer")
val minikdc = MiniKdc.start(TestUtils.tempDir(), config, TestUtils.tempFile(), List("foo"))
val running = System.getProperty(MiniKdc.JavaSecurityKrb5Conf) != null
try {
assertTrue("MiniKdc stopped immediately; it should not have", running)
} finally {
if (running) minikdc.stop()
}
}
}

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.io.IOException
import org.junit.Assert.assertEquals
import org.junit.Test
import org.scalatest.Assertions.intercept
class ExitTest {
@Test
def shouldHaltImmediately(): Unit = {
val array:Array[Any] = Array("a", "b")
def haltProcedure(exitStatus: Int, message: Option[String]) : Nothing = {
array(0) = exitStatus
array(1) = message
throw new IOException()
}
Exit.setHaltProcedure(haltProcedure)
val statusCode = 0
val message = Some("message")
try {
intercept[IOException] {Exit.halt(statusCode)}
assertEquals(statusCode, array(0))
assertEquals(None, array(1))
intercept[IOException] {Exit.halt(statusCode, message)}
assertEquals(statusCode, array(0))
assertEquals(message, array(1))
} finally {
Exit.resetHaltProcedure()
}
}
@Test
def shouldExitImmediately(): Unit = {
val array:Array[Any] = Array("a", "b")
def exitProcedure(exitStatus: Int, message: Option[String]) : Nothing = {
array(0) = exitStatus
array(1) = message
throw new IOException()
}
Exit.setExitProcedure(exitProcedure)
val statusCode = 0
val message = Some("message")
try {
intercept[IOException] {Exit.exit(statusCode)}
assertEquals(statusCode, array(0))
assertEquals(None, array(1))
intercept[IOException] {Exit.exit(statusCode, message)}
assertEquals(statusCode, array(0))
assertEquals(message, array(1))
} finally {
Exit.resetExitProcedure()
}
}
@Test
def shouldAddShutdownHookImmediately(): Unit = {
val name = "name"
val array:Array[Any] = Array("", 0)
// immediately invoke the shutdown hook to mutate the data when a hook is added
def shutdownHookAdder(name: String, shutdownHook: => Unit) : Unit = {
// mutate the first element
array(0) = array(0).toString + name
// invoke the shutdown hook (see below, it mutates the second element)
shutdownHook
}
Exit.setShutdownHookAdder(shutdownHookAdder)
def sideEffect(): Unit = {
// mutate the second element
array(1) = array(1).asInstanceOf[Int] + 1
}
try {
Exit.addShutdownHook(name, sideEffect) // by-name parameter, only invoked due to above shutdownHookAdder
assertEquals(1, array(1))
assertEquals(name * array(1).asInstanceOf[Int], array(0).toString)
Exit.addShutdownHook(name, array(1) = array(1).asInstanceOf[Int] + 1) // by-name parameter, only invoked due to above shutdownHookAdder
assertEquals(2, array(1))
assertEquals(name * array(1).asInstanceOf[Int], array(0).toString)
} finally {
Exit.resetShutdownHookAdder()
}
}
@Test
def shouldNotInvokeShutdownHookImmediately(): Unit = {
val name = "name"
val array:Array[String] = Array(name)
def sideEffect(): Unit = {
// mutate the first element
array(0) = array(0) + name
}
Exit.addShutdownHook(name, sideEffect) // by-name parameter, not invoked
// make sure the first element wasn't mutated
assertEquals(name, array(0))
Exit.addShutdownHook(name, sideEffect()) // by-name parameter, not invoked
// again make sure the first element wasn't mutated
assertEquals(name, array(0))
Exit.addShutdownHook(name, array(0) = array(0) + name) // by-name parameter, not invoked
// again make sure the first element wasn't mutated
assertEquals(name, array(0))
}
}

View File

@ -57,13 +57,11 @@ object StressTestLog {
val reader = new ReaderThread(log)
reader.start()
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {
Exit.addShutdownHook("stress-test-shutdown-hook", {
running.set(false)
writer.join()
reader.join()
Utils.delete(dir)
}
})
while(running.get) {

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.tests;
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@ -53,25 +54,21 @@ public class EosTestClient extends SmokeTestUtil {
private volatile boolean isRunning = true;
public void start() {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
isRunning = false;
streams.close(Duration.ofSeconds(300));
Exit.addShutdownHook("streams-shutdown-hook", () -> {
isRunning = false;
streams.close(Duration.ofSeconds(300));
// need to wait for callback to avoid race condition
// -> make sure the callback printout to stdout is there as it is expected test output
waitForStateTransitionCallback();
// do not remove these printouts since they are needed for health scripts
if (!uncaughtException) {
System.out.println(System.currentTimeMillis());
System.out.println("EOS-TEST-CLIENT-CLOSED");
System.out.flush();
}
// need to wait for callback to avoid race condition
// -> make sure the callback printout to stdout is there as it is expected test output
waitForStateTransitionCallback();
// do not remove these printouts since they are needed for health scripts
if (!uncaughtException) {
System.out.println(System.currentTimeMillis());
System.out.println("EOS-TEST-CLIENT-CLOSED");
System.out.flush();
}
}));
});
while (isRunning) {
if (streams == null) {

View File

@ -69,11 +69,11 @@ public class EosTestDriver extends SmokeTestUtil {
static void generate(final String kafka) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Exit.addShutdownHook("streams-eos-test-driver-shutdown-hook", () -> {
System.out.println("Terminating");
System.out.flush();
isRunning = false;
}));
});
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest");

View File

@ -62,12 +62,7 @@ public class ShutdownDeadlockTest {
}
});
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
streams.close(Duration.ofSeconds(5));
}
}));
Exit.addShutdownHook("streams-shutdown-hook", () -> streams.close(Duration.ofSeconds(5)));
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");

View File

@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -76,7 +77,7 @@ public class SmokeTestClient extends SmokeTestUtil {
e.printStackTrace();
});
Runtime.getRuntime().addShutdownHook(new Thread(this::close));
Exit.addShutdownHook("streams-shutdown-hook", () -> close());
thread = new Thread(() -> streams.start());
thread.start();

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.tests;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@ -70,15 +71,12 @@ public class StaticMemberTestClient {
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("closing Kafka Streams instance");
System.out.flush();
streams.close();
System.out.println("Static membership test closed");
System.out.flush();
}
Exit.addShutdownHook("streams-shutdown-hook", () -> {
System.out.println("closing Kafka Streams instance");
System.out.flush();
streams.close();
System.out.println("Static membership test closed");
System.out.flush();
});
}
}

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KafkaStreams;
@ -114,12 +115,11 @@ public class StreamsBrokerDownResilienceTest {
System.out.println("Start Kafka Streams");
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Exit.addShutdownHook("streams-shutdown-hook", () -> {
streams.close(Duration.ofSeconds(30));
System.out.println("Complete shutdown of streams resilience test app now");
System.out.flush();
}
));
});
}
private static boolean confirmCorrectConfigs(final Properties properties) {

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.tests;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
@ -110,12 +111,12 @@ public class StreamsNamedRepartitionTest {
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Exit.addShutdownHook("streams-shutdown-hook", () -> {
System.out.println("closing Kafka Streams instance");
System.out.flush();
streams.close(Duration.ofMillis(5000));
System.out.println("NAMED_REPARTITION_TEST Streams Stopped");
System.out.flush();
}));
});
}
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.tests;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
@ -131,15 +132,12 @@ public class StreamsOptimizedTest {
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("closing Kafka Streams instance");
System.out.flush();
streams.close(Duration.ofMillis(5000));
System.out.println("OPTIMIZE_TEST Streams Stopped");
System.out.flush();
}
Exit.addShutdownHook("streams-shutdown-hook", () -> {
System.out.println("closing Kafka Streams instance");
System.out.flush();
streams.close(Duration.ofMillis(5000));
System.out.println("OPTIMIZE_TEST Streams Stopped");
System.out.flush();
});
}

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@ -147,10 +148,10 @@ public class StreamsStandByReplicaTest {
System.out.println("Start Kafka Streams");
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Exit.addShutdownHook("streams-shutdown-hook", () -> {
shutdown(streams);
System.out.println("Shut down streams now");
}));
});
}
private static void shutdown(final KafkaStreams streams) {

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
@ -81,13 +82,13 @@ public class StreamsUpgradeTest {
final KafkaStreams streams = buildStreams(streamsProperties);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Exit.addShutdownHook("streams-shutdown-hook", () -> {
System.out.println("closing Kafka Streams instance");
System.out.flush();
streams.close();
System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
System.out.flush();
}));
});
}
public static KafkaStreams buildStreams(final Properties streamsProperties) {

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.tests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
@ -108,11 +109,11 @@ public class StreamsUpgradeToCooperativeRebalanceTest {
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Exit.addShutdownHook("streams-shutdown-hook", () -> {
streams.close();
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
System.out.flush();
}));
});
}
private static void addTasksToBuilder(final List<String> tasks, final StringBuilder builder) {

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.utils.Exit;
import java.io.IOException;
import java.time.Duration;
@ -264,7 +265,7 @@ public class TransactionalMessageCopier {
final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
final AtomicLong remainingMessages = new AtomicLong(maxMessages);
final AtomicLong numMessagesProcessed = new AtomicLong(0);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Exit.addShutdownHook("transactional-message-copier-shutdown-hook", () -> {
isShuttingDown.set(true);
// Flush any remaining messages
producer.close();
@ -272,7 +273,7 @@ public class TransactionalMessageCopier {
consumer.close();
}
System.out.println(shutDownString(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
}));
});
try {
Random random = new Random();

View File

@ -644,7 +644,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
try {
final VerifiableConsumer consumer = createFromArgs(parser, args);
Runtime.getRuntime().addShutdownHook(new Thread(() -> consumer.close()));
Exit.addShutdownHook("verifiable-consumer-shutdown-hook", () -> consumer.close());
consumer.run();
} catch (ArgumentParserException e) {

View File

@ -241,10 +241,8 @@ public class VerifiableLog4jAppender {
final VerifiableLog4jAppender appender = createFromArgs(args);
boolean infinite = appender.maxMessages < 0;
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Trigger main thread to stop producing messages
appender.stopLogging = true;
}));
// Trigger main thread to stop producing messages when shutting down
Exit.addShutdownHook("verifiable-log4j-appender-shutdown-hook", () -> appender.stopLogging = true);
long maxMessages = infinite ? Long.MAX_VALUE : appender.maxMessages;
for (long i = 0; i < maxMessages; i++) {

View File

@ -517,7 +517,7 @@ public class VerifiableProducer implements AutoCloseable {
final long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Exit.addShutdownHook("verifiable-producer-shutdown-hook", () -> {
// Trigger main thread to stop producing messages
producer.stopProducing = true;
@ -529,7 +529,7 @@ public class VerifiableProducer implements AutoCloseable {
double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
}));
});
producer.run(throttler);
} catch (ArgumentParserException e) {

View File

@ -249,7 +249,7 @@ public final class Agent {
log.info("Starting agent process.");
final Agent agent = new Agent(platform, Scheduler.SYSTEM, restServer, resource);
restServer.start(resource);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Exit.addShutdownHook("agent-shutdown-hook", () -> {
log.warn("Running agent shutdown hook.");
try {
agent.beginShutdown();
@ -257,7 +257,7 @@ public final class Agent {
} catch (Exception e) {
log.error("Got exception while running agent shutdown hook.", e);
}
}));
});
if (taskSpec != null) {
TaskSpec spec = null;
try {

View File

@ -171,7 +171,7 @@ public final class Coordinator {
final Coordinator coordinator = new Coordinator(platform, Scheduler.SYSTEM,
restServer, resource, ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
restServer.start(resource);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Exit.addShutdownHook("coordinator-shutdown-hook", () -> {
log.warn("Running coordinator shutdown hook.");
try {
coordinator.beginShutdown(false);
@ -179,7 +179,7 @@ public final class Coordinator {
} catch (Exception e) {
log.error("Got exception while running coordinator shutdown hook.", e);
}
}));
});
coordinator.waitForShutdown();
}
};