mirror of https://github.com/apache/kafka.git
				
				
				
			MINOR: Use reflection for signal handler and do not enable it for IBM JDK (#5047)
The Signal classes are not available in the compile classpath if --release is used so we use reflection as a workaround. As part of that moved the code to Java and added a simple unit test. Also disabled the signal handler if the IBM JDK is being used due to KAFKA-6918. Manually tested shutdown via ctrl+c and verified that the message is printed.
This commit is contained in:
		
							parent
							
								
									440445e7c5
								
							
						
					
					
						commit
						70f0d0bd3f
					
				|  | @ -0,0 +1,101 @@ | ||||||
|  | /* | ||||||
|  |  * Licensed to the Apache Software Foundation (ASF) under one or more | ||||||
|  |  * contributor license agreements. See the NOTICE file distributed with | ||||||
|  |  * this work for additional information regarding copyright ownership. | ||||||
|  |  * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||
|  |  * (the "License"); you may not use this file except in compliance with | ||||||
|  |  * the License. You may obtain a copy of the License at | ||||||
|  |  * | ||||||
|  |  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  * | ||||||
|  |  * Unless required by applicable law or agreed to in writing, software | ||||||
|  |  * distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  |  * See the License for the specific language governing permissions and | ||||||
|  |  * limitations under the License. | ||||||
|  |  */ | ||||||
|  | package org.apache.kafka.common.utils; | ||||||
|  | 
 | ||||||
|  | import org.slf4j.Logger; | ||||||
|  | import org.slf4j.LoggerFactory; | ||||||
|  | 
 | ||||||
|  | import java.lang.reflect.Constructor; | ||||||
|  | import java.lang.reflect.InvocationHandler; | ||||||
|  | import java.lang.reflect.Method; | ||||||
|  | import java.lang.reflect.Proxy; | ||||||
|  | import java.util.Map; | ||||||
|  | import java.util.concurrent.ConcurrentHashMap; | ||||||
|  | 
 | ||||||
|  | public class LoggingSignalHandler { | ||||||
|  | 
 | ||||||
|  |     private static final Logger log = LoggerFactory.getLogger(LoggingSignalHandler.class); | ||||||
|  | 
 | ||||||
|  |     private final Constructor<?> signalConstructor; | ||||||
|  |     private final Class<?> signalHandlerClass; | ||||||
|  |     private final Class<?> signalClass; | ||||||
|  |     private final Method signalHandleMethod; | ||||||
|  |     private final Method signalGetNameMethod; | ||||||
|  |     private final Method signalHandlerHandleMethod; | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * Create an instance of this class. | ||||||
|  |      * | ||||||
|  |      * @throws ReflectiveOperationException if the underlying API has changed in an incompatible manner. | ||||||
|  |      */ | ||||||
|  |     public LoggingSignalHandler() throws ReflectiveOperationException { | ||||||
|  |         signalClass = Class.forName("sun.misc.Signal"); | ||||||
|  |         signalConstructor = signalClass.getConstructor(String.class); | ||||||
|  |         signalHandlerClass = Class.forName("sun.misc.SignalHandler"); | ||||||
|  |         signalHandlerHandleMethod = signalHandlerClass.getMethod("handle", signalClass); | ||||||
|  |         signalHandleMethod = signalClass.getMethod("handle", signalClass, signalHandlerClass); | ||||||
|  |         signalGetNameMethod = signalClass.getMethod("getName"); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * Register signal handler to log termination due to SIGTERM, SIGHUP and SIGINT (control-c). This method | ||||||
|  |      * does not currently work on Windows. | ||||||
|  |      * | ||||||
|  |      * @implNote sun.misc.Signal and sun.misc.SignalHandler are described as "not encapsulated" in | ||||||
|  |      * http://openjdk.java.net/jeps/260. However, they are not available in the compile classpath if the `--release` | ||||||
|  |      * flag is used. As a workaround, we rely on reflection. | ||||||
|  |      */ | ||||||
|  |     public void register() throws ReflectiveOperationException { | ||||||
|  |         Map<String, Object> jvmSignalHandlers = new ConcurrentHashMap<>(); | ||||||
|  |         register("TERM", jvmSignalHandlers); | ||||||
|  |         register("INT", jvmSignalHandlers); | ||||||
|  |         register("HUP", jvmSignalHandlers); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     private Object createSignalHandler(final Map<String, Object> jvmSignalHandlers) { | ||||||
|  |         InvocationHandler invocationHandler = new InvocationHandler() { | ||||||
|  | 
 | ||||||
|  |             private String getName(Object signal) throws ReflectiveOperationException { | ||||||
|  |                 return (String) signalGetNameMethod.invoke(signal); | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             private void handle(Object signalHandler, Object signal) throws ReflectiveOperationException { | ||||||
|  |                 signalHandlerHandleMethod.invoke(signalHandler, signal); | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             @Override | ||||||
|  |             public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { | ||||||
|  |                 Object signal = args[0]; | ||||||
|  |                 log.info("Terminating process due to signal {}", signal); | ||||||
|  |                 Object handler = jvmSignalHandlers.get(getName(signal)); | ||||||
|  |                 if (handler != null) | ||||||
|  |                     handle(handler, signal); | ||||||
|  |                 return null; | ||||||
|  |             } | ||||||
|  |         }; | ||||||
|  |         return Proxy.newProxyInstance(Utils.getContextOrKafkaClassLoader(), new Class[] {signalHandlerClass}, | ||||||
|  |                 invocationHandler); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     private void register(String signalName, final Map<String, Object> jvmSignalHandlers) throws ReflectiveOperationException { | ||||||
|  |         Object signal = signalConstructor.newInstance(signalName); | ||||||
|  |         Object signalHandler = createSignalHandler(jvmSignalHandlers); | ||||||
|  |         Object oldHandler = signalHandleMethod.invoke(null, signal, signalHandler); | ||||||
|  |         if (oldHandler != null) | ||||||
|  |             jvmSignalHandlers.put(signalName, oldHandler); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | @ -0,0 +1,28 @@ | ||||||
|  | /* | ||||||
|  |  * Licensed to the Apache Software Foundation (ASF) under one or more | ||||||
|  |  * contributor license agreements. See the NOTICE file distributed with | ||||||
|  |  * this work for additional information regarding copyright ownership. | ||||||
|  |  * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||
|  |  * (the "License"); you may not use this file except in compliance with | ||||||
|  |  * the License. You may obtain a copy of the License at | ||||||
|  |  * | ||||||
|  |  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  * | ||||||
|  |  * Unless required by applicable law or agreed to in writing, software | ||||||
|  |  * distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  |  * See the License for the specific language governing permissions and | ||||||
|  |  * limitations under the License. | ||||||
|  |  */ | ||||||
|  | package org.apache.kafka.common.utils; | ||||||
|  | 
 | ||||||
|  | import org.junit.Test; | ||||||
|  | 
 | ||||||
|  | public class LoggingSignalHandlerTest { | ||||||
|  | 
 | ||||||
|  |     @Test | ||||||
|  |     public void testRegister() throws ReflectiveOperationException { | ||||||
|  |         new LoggingSignalHandler().register(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -18,14 +18,12 @@ | ||||||
| package kafka | package kafka | ||||||
| 
 | 
 | ||||||
| import java.util.Properties | import java.util.Properties | ||||||
| import java.util.concurrent.ConcurrentHashMap |  | ||||||
| 
 | 
 | ||||||
| import sun.misc.{Signal, SignalHandler} |  | ||||||
| import joptsimple.OptionParser | import joptsimple.OptionParser | ||||||
| import kafka.utils.Implicits._ | import kafka.utils.Implicits._ | ||||||
| import kafka.server.{KafkaServer, KafkaServerStartable} | import kafka.server.{KafkaServer, KafkaServerStartable} | ||||||
| import kafka.utils.{CommandLineUtils, Exit, Logging} | import kafka.utils.{CommandLineUtils, Exit, Logging} | ||||||
| import org.apache.kafka.common.utils.{OperatingSystem, Utils} | import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, OperatingSystem, Utils} | ||||||
| 
 | 
 | ||||||
| import scala.collection.JavaConverters._ | import scala.collection.JavaConverters._ | ||||||
| 
 | 
 | ||||||
|  | @ -55,34 +53,19 @@ object Kafka extends Logging { | ||||||
|     props |     props | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   private def registerLoggingSignalHandler(): Unit = { |  | ||||||
|     val jvmSignalHandlers = new ConcurrentHashMap[String, SignalHandler]().asScala |  | ||||||
|     val handler = new SignalHandler() { |  | ||||||
|       override def handle(signal: Signal) { |  | ||||||
|         info(s"Terminating process due to signal $signal") |  | ||||||
|         jvmSignalHandlers.get(signal.getName).foreach(_.handle(signal)) |  | ||||||
|       } |  | ||||||
|     } |  | ||||||
|     def registerHandler(signalName: String) { |  | ||||||
|       val oldHandler = Signal.handle(new Signal(signalName), handler) |  | ||||||
|       if (oldHandler != null) |  | ||||||
|         jvmSignalHandlers.put(signalName, oldHandler) |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     if (!OperatingSystem.IS_WINDOWS) { |  | ||||||
|       registerHandler("TERM") |  | ||||||
|       registerHandler("INT") |  | ||||||
|       registerHandler("HUP") |  | ||||||
|     } |  | ||||||
|   } |  | ||||||
| 
 |  | ||||||
|   def main(args: Array[String]): Unit = { |   def main(args: Array[String]): Unit = { | ||||||
|     try { |     try { | ||||||
|       val serverProps = getPropsFromArgs(args) |       val serverProps = getPropsFromArgs(args) | ||||||
|       val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps) |       val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps) | ||||||
| 
 | 
 | ||||||
|       // register signal handler to log termination due to SIGTERM, SIGHUP and SIGINT (control-c) |       try { | ||||||
|       registerLoggingSignalHandler() |         if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk) | ||||||
|  |           new LoggingSignalHandler().register() | ||||||
|  |       } catch { | ||||||
|  |         case e: ReflectiveOperationException => | ||||||
|  |           warn("Failed to register optional signal handler that logs a message when the process is terminated " + | ||||||
|  |             s"by a signal. Reason for registration failure is: $e", e) | ||||||
|  |       } | ||||||
| 
 | 
 | ||||||
|       // attach shutdown handler to catch terminating signals as well as normal termination |       // attach shutdown handler to catch terminating signals as well as normal termination | ||||||
|       Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") { |       Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue