diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 6b5389fbe32..f32f23d3475 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -21,8 +21,8 @@ import java.util.Properties import joptsimple.OptionParser import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server} import kafka.utils.Implicits._ -import kafka.utils.{Exit, Logging} -import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, OperatingSystem, Time, Utils} +import kafka.utils.Logging +import org.apache.kafka.common.utils.{Exit, Java, LoggingSignalHandler, OperatingSystem, Time, Utils} import org.apache.kafka.server.util.CommandLineUtils object Kafka extends Logging { @@ -99,7 +99,7 @@ object Kafka extends Logging { } // attach shutdown handler to catch terminating signals as well as normal termination - Exit.addShutdownHook("kafka-shutdown-hook", { + Exit.addShutdownHook("kafka-shutdown-hook", () => { try server.shutdown() catch { case _: Throwable => diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 4e99871a9dc..ccc2450594e 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -29,7 +29,7 @@ import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils} +import org.apache.kafka.common.utils.{Exit, Utils, SecurityUtils => JSecurityUtils} import org.apache.kafka.security.authorizer.{AclEntry, AuthorizerUtils} import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.config.ZkConfigs diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 04dce7a0255..8f84e309562 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -23,7 +23,7 @@ import java.util.{Collections, Optional, Properties} import joptsimple._ import kafka.server.{DynamicBrokerConfig, DynamicConfig, KafkaConfig} import kafka.utils.Implicits._ -import kafka.utils.{Exit, Logging} +import kafka.utils.Logging import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism} import org.apache.kafka.common.config.{ConfigResource, TopicConfig} @@ -33,7 +33,7 @@ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism} -import org.apache.kafka.common.utils.{Sanitizer, Time, Utils} +import org.apache.kafka.common.utils.{Exit, Sanitizer, Time, Utils} import org.apache.kafka.server.config.{ConfigType, QuotaConfigs, ZkConfigs, ZooKeeperInternals} import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs} import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index b5c54596b21..33fec627486 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -19,11 +19,11 @@ package kafka.admin import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder} import kafka.server.KafkaConfig -import kafka.utils.{Exit, Logging, ToolsUtils} import kafka.utils.Implicits._ +import kafka.utils.{Logging, ToolsUtils} import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils} import org.apache.kafka.common.security.JaasUtils -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.{Exit, Time, Utils} import org.apache.kafka.server.config.ZkConfigs import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback} diff --git a/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala b/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala index 8db498db127..361eb2954a2 100644 --- a/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala +++ b/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala @@ -18,10 +18,11 @@ package kafka.docker import kafka.Kafka import kafka.tools.StorageTool -import kafka.utils.{Exit, Logging} +import kafka.utils.Logging import net.sourceforge.argparse4j.ArgumentParsers import net.sourceforge.argparse4j.impl.Arguments.store import net.sourceforge.argparse4j.inf.Namespace +import org.apache.kafka.common.utils.Exit import java.nio.charset.StandardCharsets import java.nio.file.{Files, Path, Paths, StandardCopyOption, StandardOpenOption} @@ -41,7 +42,7 @@ object KafkaDockerWrapper extends Logging { case e: Throwable => val errMsg = s"error while preparing configs: ${e.getMessage}" System.err.println(errMsg) - Exit.exit(1, Some(errMsg)) + Exit.exit(1, errMsg) } val formatCmd = formatStorageCmd(finalConfigsPath, envVars) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 7c04dee1f78..a254c74cb3f 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -27,7 +27,7 @@ import kafka.server._ import kafka.server.metadata.BrokerMetadataPublisher.info import kafka.utils._ import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid} -import org.apache.kafka.common.utils.{KafkaThread, Time, Utils} +import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils} import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException} import scala.jdk.CollectionConverters._ diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 6ef9f93180f..454552a08e4 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -18,14 +18,14 @@ package kafka.server import kafka.network.RequestChannel -import kafka.utils.{Exit, Logging, Pool} +import kafka.utils.{Logging, Pool} import kafka.server.KafkaRequestHandler.{threadCurrentRequest, threadRequestChannel} import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicInteger import com.yammer.metrics.core.Meter import org.apache.kafka.common.internals.FatalExitError -import org.apache.kafka.common.utils.{KafkaThread, Time} +import org.apache.kafka.common.utils.{Exit, KafkaThread, Time} import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 11f2e4ff03c..a3834ba3580 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -51,7 +51,7 @@ import org.apache.kafka.common.replica._ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{Exit, Time} import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta} import org.apache.kafka.metadata.LeaderConstants.NO_LEADER diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 0f12a49b00a..43b37feb9d6 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -24,12 +24,12 @@ import java.util.regex.Pattern import joptsimple.{OptionException, OptionParser, OptionSet, OptionSpec} import kafka.common.MessageReader import kafka.utils.Implicits._ -import kafka.utils.{Exit, Logging, ToolsUtils} +import kafka.utils.{Logging, ToolsUtils} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.KafkaException import org.apache.kafka.common.record.CompressionType -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Exit, Utils} import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.kafka.tools.api.RecordReader diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index d7332c58270..bdb35f395ef 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -30,7 +30,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce import org.apache.kafka.common.errors.{TimeoutException, WakeupException} import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.{Exit, Time, Utils} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} @@ -527,7 +527,7 @@ object MirrorMaker extends Logging { offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() val numStreams = options.valueOf(numStreamsOpt).intValue() - Exit.addShutdownHook("MirrorMakerShutdownHook", cleanShutdown()) + Exit.addShutdownHook("MirrorMakerShutdownHook", () => cleanShutdown()) // create producer val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala index 4ac918a63c1..cd840c88a69 100755 --- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala +++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala @@ -23,11 +23,12 @@ import scala.util.matching.Regex import collection.mutable import java.util.Date import java.text.SimpleDateFormat -import kafka.utils.{CoreUtils, Exit, Logging} +import kafka.utils.{CoreUtils, Logging} import java.io.{BufferedOutputStream, OutputStream} import java.nio.charset.StandardCharsets import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.utils.Exit import org.apache.kafka.server.util.CommandLineUtils /** diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 43eb6579765..91699d31df9 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -21,13 +21,13 @@ import kafka.server.KafkaConfig import java.io.PrintStream import java.nio.file.{Files, Paths} -import kafka.utils.{Exit, Logging} +import kafka.utils.Logging import net.sourceforge.argparse4j.ArgumentParsers import net.sourceforge.argparse4j.impl.Arguments.{append, store, storeTrue} import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace} import net.sourceforge.argparse4j.internal.HelpScreenException import org.apache.kafka.common.Uuid -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Exit, Utils} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils} import org.apache.kafka.metadata.storage.{Formatter, FormatterException} @@ -54,7 +54,7 @@ object StorageTool extends Logging { message = Some(e.getMessage) } message.foreach(System.err.println) - Exit.exit(exitCode, message) + Exit.exit(exitCode, message.orNull) } /** diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 06812f27953..1b6f2260002 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -24,7 +24,7 @@ import joptsimple.{OptionException, OptionSpec} import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.raft.{KafkaRaftManager, RaftManager} import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, SimpleApiVersionManager} -import kafka.utils.{CoreUtils, Exit, Logging} +import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.errors.InvalidConfigurationException import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.metrics.Metrics @@ -33,7 +33,7 @@ import org.apache.kafka.common.metrics.stats.{Meter, Percentile, Percentiles} import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable} import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.{Exit, Time, Utils} import org.apache.kafka.common.{TopicPartition, Uuid, protocol} import org.apache.kafka.raft.errors.NotLeaderException import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, RaftClient, QuorumConfig} @@ -479,7 +479,7 @@ object TestRaftServer extends Logging { val recordSize = opts.options.valueOf(opts.recordSizeOpt) val server = new TestRaftServer(config, Uuid.fromString(directoryIdAsString), throughput, recordSize) - Exit.addShutdownHook("raft-shutdown-hook", server.shutdown()) + Exit.addShutdownHook("raft-shutdown-hook", () => server.shutdown()) server.startup() server.awaitShutdown() diff --git a/core/src/main/scala/kafka/utils/Exit.scala b/core/src/main/scala/kafka/utils/Exit.scala deleted file mode 100644 index 84027100ec3..00000000000 --- a/core/src/main/scala/kafka/utils/Exit.scala +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.utils - -import org.apache.kafka.common.utils.{Exit => JExit} - -/** - * Internal class that should be used instead of `System.exit()` and `Runtime.getRuntime().halt()` so that tests can - * easily change the behaviour. - */ -object Exit { - - def exit(statusCode: Int, message: Option[String] = None): Nothing = { - JExit.exit(statusCode, message.orNull) - throw new AssertionError("exit should not return, but it did.") - } - - def halt(statusCode: Int, message: Option[String] = None): Nothing = { - JExit.halt(statusCode, message.orNull) - throw new AssertionError("halt should not return, but it did.") - } - - def addShutdownHook(name: String, shutdownHook: => Unit): Unit = { - JExit.addShutdownHook(name, () => shutdownHook) - } - - /** - * For testing only, do not call in main code. - */ - def setExitProcedure(exitProcedure: (Int, Option[String]) => Nothing): Unit = - JExit.setExitProcedure(functionToProcedure(exitProcedure)) - - /** - * For testing only, do not call in main code. - */ - def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit = - JExit.setHaltProcedure(functionToProcedure(haltProcedure)) - - /** - * For testing only, do not call in main code. - */ - def setShutdownHookAdder(shutdownHookAdder: (String, => Unit) => Unit): Unit = { - JExit.setShutdownHookAdder((name, runnable) => shutdownHookAdder(name, runnable.run())) - } - - /** - * For testing only, do not call in main code. - *
Clears the procedure set in [[setExitProcedure]], but does not restore system default behavior of exiting the JVM. - */ - def resetExitProcedure(): Unit = - JExit.resetExitProcedure() - - /** - * For testing only, do not call in main code. - *
Clears the procedure set in [[setHaltProcedure]], but does not restore system default behavior of exiting the JVM. - */ - def resetHaltProcedure(): Unit = - JExit.resetHaltProcedure() - - /** - * For testing only, do not call in main code. - */ - def resetShutdownHookAdder(): Unit = - JExit.resetShutdownHookAdder() - - private def functionToProcedure(procedure: (Int, Option[String]) => Nothing) = new JExit.Procedure { - def execute(statusCode: Int, message: String): Unit = procedure(statusCode, Option(message)) - } -} diff --git a/core/src/test/java/kafka/admin/AclCommandTest.java b/core/src/test/java/kafka/admin/AclCommandTest.java index a1909b558f7..cd40e1a0e6f 100644 --- a/core/src/test/java/kafka/admin/AclCommandTest.java +++ b/core/src/test/java/kafka/admin/AclCommandTest.java @@ -26,7 +26,6 @@ import kafka.test.annotation.ClusterTests; import kafka.test.annotation.Type; import kafka.test.junit.ClusterTestExtensions; import kafka.test.junit.ZkClusterInvocationContext; -import kafka.utils.Exit; import kafka.utils.TestUtils; import org.apache.kafka.common.acl.AccessControlEntry; @@ -38,6 +37,7 @@ import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.SecurityUtils; import org.apache.kafka.metadata.authorizer.StandardAuthorizer; diff --git a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java index 771369e2419..342fb8cb35f 100644 --- a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java +++ b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java @@ -19,8 +19,8 @@ package kafka.admin; import kafka.test.ClusterInstance; import kafka.test.annotation.ClusterTest; import kafka.test.junit.ClusterTestExtensions; -import kafka.utils.Exit; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.test.NoRetryException; import org.apache.kafka.test.TestUtils; diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala index 4308aebd713..1ef07a2c800 100644 --- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala +++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala @@ -25,7 +25,7 @@ import java.nio.file.Files import java.text.MessageFormat import java.util.{Locale, Properties, UUID} -import kafka.utils.{CoreUtils, Exit, Logging} +import kafka.utils.{CoreUtils, Logging} import scala.jdk.CollectionConverters._ import org.apache.commons.lang.text.StrSubstitutor @@ -49,7 +49,7 @@ import org.apache.directory.server.kerberos.shared.keytab.{Keytab, KeytabEntry} import org.apache.directory.server.protocol.shared.transport.{TcpTransport, UdpTransport} import org.apache.directory.server.xdbm.Index import org.apache.directory.shared.kerberos.KerberosTime -import org.apache.kafka.common.utils.{Java, Utils} +import org.apache.kafka.common.utils.{Exit, Java, Utils} /** * Mini KDC based on Apache Directory Server that can be embedded in tests or used from command line as a standalone @@ -397,7 +397,7 @@ object MiniKdc { | """.stripMargin println(infoMessage) - Exit.addShutdownHook("minikdc-shutdown-hook", miniKdc.stop()) + Exit.addShutdownHook("minikdc-shutdown-hook", () => miniKdc.stop()) miniKdc } diff --git a/core/src/test/scala/kafka/tools/LogCompactionTester.scala b/core/src/test/scala/kafka/tools/LogCompactionTester.scala index 616e0d0b57d..2ea6c3aae6c 100755 --- a/core/src/test/scala/kafka/tools/LogCompactionTester.scala +++ b/core/src/test/scala/kafka/tools/LogCompactionTester.scala @@ -32,7 +32,7 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsume import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeserializer} -import org.apache.kafka.common.utils.{AbstractIterator, Utils} +import org.apache.kafka.common.utils.{Exit, AbstractIterator, Utils} import org.apache.kafka.server.util.CommandLineUtils import scala.jdk.CollectionConverters._ diff --git a/core/src/test/scala/kafka/utils/ExitTest.scala b/core/src/test/scala/kafka/utils/ExitTest.scala deleted file mode 100644 index fcb2e9aaded..00000000000 --- a/core/src/test/scala/kafka/utils/ExitTest.scala +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -import java.io.IOException - -import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} -import org.junit.jupiter.api.Test - -class ExitTest { - @Test - def shouldHaltImmediately(): Unit = { - val array:Array[Any] = Array("a", "b") - def haltProcedure(exitStatus: Int, message: Option[String]) : Nothing = { - array(0) = exitStatus - array(1) = message - throw new IOException() - } - Exit.setHaltProcedure(haltProcedure) - val statusCode = 0 - val message = Some("message") - try { - assertThrows(classOf[IOException], () => Exit.halt(statusCode)) - assertEquals(statusCode, array(0)) - assertEquals(None, array(1)) - - assertThrows(classOf[IOException], () => Exit.halt(statusCode, message)) - assertEquals(statusCode, array(0)) - assertEquals(message, array(1)) - } finally { - Exit.resetHaltProcedure() - } - } - - @Test - def shouldExitImmediately(): Unit = { - val array:Array[Any] = Array("a", "b") - def exitProcedure(exitStatus: Int, message: Option[String]) : Nothing = { - array(0) = exitStatus - array(1) = message - throw new IOException() - } - Exit.setExitProcedure(exitProcedure) - val statusCode = 0 - val message = Some("message") - try { - assertThrows(classOf[IOException], () => Exit.exit(statusCode)) - assertEquals(statusCode, array(0)) - assertEquals(None, array(1)) - - assertThrows(classOf[IOException], () => Exit.exit(statusCode, message)) - assertEquals(statusCode, array(0)) - assertEquals(message, array(1)) - } finally { - Exit.resetExitProcedure() - } - } - - @Test - def shouldAddShutdownHookImmediately(): Unit = { - val name = "name" - val array:Array[Any] = Array("", 0) - // immediately invoke the shutdown hook to mutate the data when a hook is added - def shutdownHookAdder(name: String, shutdownHook: => Unit) : Unit = { - // mutate the first element - array(0) = array(0).toString + name - // invoke the shutdown hook (see below, it mutates the second element) - shutdownHook - } - Exit.setShutdownHookAdder(shutdownHookAdder) - def sideEffect(): Unit = { - // mutate the second element - array(1) = array(1).asInstanceOf[Int] + 1 - } - try { - Exit.addShutdownHook(name, sideEffect()) // by-name parameter, only invoked due to above shutdownHookAdder - assertEquals(1, array(1)) - assertEquals(name * array(1).asInstanceOf[Int], array(0).toString) - Exit.addShutdownHook(name, array(1) = array(1).asInstanceOf[Int] + 1) // by-name parameter, only invoked due to above shutdownHookAdder - assertEquals(2, array(1)) - assertEquals(name * array(1).asInstanceOf[Int], array(0).toString) - } finally { - Exit.resetShutdownHookAdder() - } - } - - @Test - def shouldNotInvokeShutdownHookImmediately(): Unit = { - val name = "name" - val array:Array[String] = Array(name) - - def sideEffect(): Unit = { - // mutate the first element - array(0) = array(0) + name - } - Exit.addShutdownHook(name, sideEffect()) // by-name parameter, not invoked - // make sure the first element wasn't mutated - assertEquals(name, array(0)) - Exit.addShutdownHook(name, sideEffect()) // by-name parameter, not invoked - // again make sure the first element wasn't mutated - assertEquals(name, array(0)) - Exit.addShutdownHook(name, array(0) = array(0) + name) // by-name parameter, not invoked - // again make sure the first element wasn't mutated - assertEquals(name, array(0)) - } -} diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index b21a4190a58..da3a680ef89 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -25,7 +25,7 @@ import kafka.utils._ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record.FileRecords -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Exit, Utils} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig} @@ -63,7 +63,7 @@ object StressTestLog { val reader = new ReaderThread(log) reader.start() - Exit.addShutdownHook("stress-test-shutdown-hook", { + Exit.addShutdownHook("stress-test-shutdown-hook", () => { running.set(false) writer.join() reader.join() diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index b973bac55a6..9dc8a16774b 100755 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -25,11 +25,10 @@ import java.util.{Properties, Random} import joptsimple._ import kafka.log._ import kafka.server.BrokerTopicStats -import kafka.utils._ import org.apache.kafka.common.compress.{Compression, GzipCompression, Lz4Compression, ZstdCompression} import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record._ -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.{Exit, Time, Utils} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.server.util.{KafkaScheduler, Scheduler} import org.apache.kafka.server.util.CommandLineUtils diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala index 3a1fc2e4bda..f3e750d4659 100644 --- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala @@ -20,12 +20,13 @@ import java.nio.file.Files import java.util import java.util.Properties import kafka.server.KafkaConfig -import kafka.utils.{Exit, TestUtils} +import kafka.utils.TestUtils import kafka.utils.TestUtils.assertBadConfigContainingMessage import org.apache.kafka.common.config.SslConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.internals.FatalExitError +import org.apache.kafka.common.utils.Exit import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.server.config.{KRaftConfigs, ZkConfigs} diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala index a8b7a280def..5f885af596e 100644 --- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala @@ -22,12 +22,12 @@ import java.util.concurrent.{ExecutionException, TimeUnit} import kafka.api.IntegrationTestHarness import kafka.controller.{OfflineReplica, PartitionAndReplica} import kafka.utils.TestUtils.{Checkpoint, LogDirFailureType, Roll, waitUntilTrue} -import kafka.utils.{CoreUtils, Exit, TestUtils} +import kafka.utils.{CoreUtils, TestUtils} import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderOrFollowerException} -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Exit, Utils} import org.apache.kafka.metadata.BrokerState import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.junit.jupiter.api.Assertions._ diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index b1fffa88518..51ca4b927ce 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -29,7 +29,7 @@ import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile} import kafka.server.epoch.util.MockBlockingSender import kafka.utils.TestUtils.waitUntilTrue -import kafka.utils.{Exit, Pool, TestUtils} +import kafka.utils.{Pool, TestUtils} import kafka.zk.KafkaZkClient import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} @@ -52,7 +52,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.utils.{LogContext, Time, Utils} +import org.apache.kafka.common.utils.{Exit, LogContext, Time, Utils} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.image._ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER @@ -6698,10 +6698,7 @@ class ReplicaManagerTest { logManager.startup(Set.empty[String]) replicaManager.startup() - def haltProcedure(exitStatus: Int, message: Option[String]): Nothing = { - fail("Test failure, broker should not have halted") - } - Exit.setHaltProcedure(haltProcedure) + Exit.setHaltProcedure((_, _) => fail("Test failure, broker should not have halted")) // When logDirFailureChannel.maybeAddOfflineLogDir(logDirFiles.head.getAbsolutePath, "test failure", null) diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 802b206e14d..c10a945f8d2 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -16,7 +16,7 @@ */ package kafka.server -import kafka.utils.{CoreUtils, Exit, TestUtils} +import kafka.utils.{CoreUtils, TestUtils} import java.io.{DataInputStream, File} import java.net.ServerSocket @@ -36,7 +36,7 @@ import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.LeaderAndIsrRequest import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer} -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{Exit, Time} import org.apache.kafka.metadata.BrokerState import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs, ZkConfigs} import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout} diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala index 0c232d19f31..82d031b79a3 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala @@ -18,10 +18,10 @@ package kafka.tools import kafka.common.MessageReader - import kafka.tools.ConsoleProducer.LineMessageReader -import kafka.utils.{Exit, TestUtils} +import kafka.utils.TestUtils import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.utils.Exit import org.apache.kafka.tools.api.RecordReader import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.Test @@ -133,7 +133,7 @@ class ConsoleProducerTest { @Test def testInvalidConfigs(): Unit = { - Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) + Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message)) try assertThrows(classOf[IllegalArgumentException], () => new ConsoleProducer.ProducerConfig(invalidArgs)) finally Exit.resetExitProcedure() } diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 977f9dcb3cb..0655d5c965e 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -28,7 +28,7 @@ import kafka.log.{LogTestUtils, UnifiedLog} import kafka.raft.{KafkaMetadataLog, MetadataLogConfig} import kafka.server.{BrokerTopicStats, KafkaRaftServer} import kafka.tools.DumpLogSegments.{OffsetsMessageParser, TimeIndexDumpErrors} -import kafka.utils.{Exit, TestUtils, VerifiableProperties} +import kafka.utils.{TestUtils, VerifiableProperties} import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.{Assignment, Subscription} import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} @@ -37,7 +37,7 @@ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord} import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache} import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordBatch, RecordVersion, SimpleRecord} -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Exit, Utils} import org.apache.kafka.coordinator.group.{CoordinatorRecord, CoordinatorRecordSerde} import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs @@ -451,7 +451,7 @@ class DumpLogSegmentsTest { @Test def testDumpRemoteLogMetadataNoFilesFlag(): Unit = { - Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) + Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message)) try { val thrown = assertThrows(classOf[IllegalArgumentException], () => runDumpLogSegments(Array("--remote-log-metadata-decoder"))) assertTrue(thrown.getMessage.equals("Missing required argument \"[files]\"")) diff --git a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java index 83ec0a38075..804c70fd07c 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.tools; -import kafka.utils.Exit; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientTestUtils; @@ -27,6 +26,7 @@ import org.apache.kafka.clients.admin.DescribeConfigsResult; import org.apache.kafka.clients.admin.ListClientMetricsResourcesResult; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Exit; import org.junit.jupiter.api.Test; diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java index d47e113c211..e6399a87072 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.tools; -import kafka.utils.Exit; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientTestUtils; @@ -33,6 +32,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.server.common.AdminCommandFailedException; import org.apache.kafka.server.common.AdminOperationException; diff --git a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java index 1fea9fe0a19..cd3cb496906 100644 --- a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java +++ b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java @@ -22,7 +22,6 @@ import kafka.server.KafkaServer; import kafka.server.QuorumTestHarness; import kafka.server.QuotaType; import kafka.utils.EmptyTestInfo; -import kafka.utils.Exit; import kafka.utils.TestUtils; import org.apache.kafka.clients.admin.Admin; @@ -35,6 +34,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.tools.reassign.ReassignPartitionsCommand; @@ -123,7 +123,7 @@ public class ReplicationQuotasTestRig { experiments.forEach(def -> run(def, journal, displayChartsOnScreen)); if (!displayChartsOnScreen) - Exit.exit(0, Option.empty()); + Exit.exit(0); } static void run(ExperimentDef config, Journal journal, boolean displayChartsOnScreen) {