From 38ebb8f48a8e580740ce98d152227489c76734ca Mon Sep 17 00:00:00 2001 From: TengYao Chi Date: Mon, 12 Aug 2024 07:42:28 +0800 Subject: [PATCH] KAFKA-17270 Let test code use Exit.java rather than Exit.scala (#16821) Exit.scala is a wrapper around Exit.java, and its main benefit is that it allows methods to throw exceptions by returning Nothing. This benefit is minimal, and since we are planning to phase out Scala code, it would be better to use Exit.java directly and remove Exit.scala. Reviewers: Chia-Ping Tsai --- core/src/main/scala/kafka/Kafka.scala | 6 +- .../main/scala/kafka/admin/AclCommand.scala | 2 +- .../scala/kafka/admin/ConfigCommand.scala | 4 +- .../kafka/admin/ZkSecurityMigrator.scala | 4 +- .../kafka/docker/KafkaDockerWrapper.scala | 5 +- .../src/main/scala/kafka/log/LogManager.scala | 2 +- .../kafka/server/KafkaRequestHandler.scala | 4 +- .../scala/kafka/server/ReplicaManager.scala | 2 +- .../scala/kafka/tools/ConsoleProducer.scala | 4 +- .../main/scala/kafka/tools/MirrorMaker.scala | 4 +- .../kafka/tools/StateChangeLogMerger.scala | 3 +- .../main/scala/kafka/tools/StorageTool.scala | 6 +- .../scala/kafka/tools/TestRaftServer.scala | 6 +- core/src/main/scala/kafka/utils/Exit.scala | 83 ------------ .../test/java/kafka/admin/AclCommandTest.java | 2 +- .../UserScramCredentialsCommandTest.java | 2 +- .../kafka/security/minikdc/MiniKdc.scala | 6 +- .../kafka/tools/LogCompactionTester.scala | 2 +- .../src/test/scala/kafka/utils/ExitTest.scala | 121 ------------------ .../scala/other/kafka/StressTestLog.scala | 4 +- .../other/kafka/TestLinearWriteSpeed.scala | 3 +- .../scala/unit/kafka/KafkaConfigTest.scala | 3 +- .../unit/kafka/server/LogDirFailureTest.scala | 4 +- .../kafka/server/ReplicaManagerTest.scala | 9 +- .../kafka/server/ServerShutdownTest.scala | 4 +- .../kafka/tools/ConsoleProducerTest.scala | 6 +- .../kafka/tools/DumpLogSegmentsTest.scala | 6 +- .../kafka/tools/ClientMetricsCommandTest.java | 2 +- .../apache/kafka/tools/TopicCommandTest.java | 2 +- .../tools/other/ReplicationQuotasTestRig.java | 4 +- 30 files changed, 55 insertions(+), 260 deletions(-) delete mode 100644 core/src/main/scala/kafka/utils/Exit.scala delete mode 100644 core/src/test/scala/kafka/utils/ExitTest.scala diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 6b5389fbe32..f32f23d3475 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -21,8 +21,8 @@ import java.util.Properties import joptsimple.OptionParser import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server} import kafka.utils.Implicits._ -import kafka.utils.{Exit, Logging} -import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, OperatingSystem, Time, Utils} +import kafka.utils.Logging +import org.apache.kafka.common.utils.{Exit, Java, LoggingSignalHandler, OperatingSystem, Time, Utils} import org.apache.kafka.server.util.CommandLineUtils object Kafka extends Logging { @@ -99,7 +99,7 @@ object Kafka extends Logging { } // attach shutdown handler to catch terminating signals as well as normal termination - Exit.addShutdownHook("kafka-shutdown-hook", { + Exit.addShutdownHook("kafka-shutdown-hook", () => { try server.shutdown() catch { case _: Throwable => diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 4e99871a9dc..ccc2450594e 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -29,7 +29,7 @@ import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils} +import org.apache.kafka.common.utils.{Exit, Utils, SecurityUtils => JSecurityUtils} import org.apache.kafka.security.authorizer.{AclEntry, AuthorizerUtils} import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.config.ZkConfigs diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 04dce7a0255..8f84e309562 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -23,7 +23,7 @@ import java.util.{Collections, Optional, Properties} import joptsimple._ import kafka.server.{DynamicBrokerConfig, DynamicConfig, KafkaConfig} import kafka.utils.Implicits._ -import kafka.utils.{Exit, Logging} +import kafka.utils.Logging import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism} import org.apache.kafka.common.config.{ConfigResource, TopicConfig} @@ -33,7 +33,7 @@ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism} -import org.apache.kafka.common.utils.{Sanitizer, Time, Utils} +import org.apache.kafka.common.utils.{Exit, Sanitizer, Time, Utils} import org.apache.kafka.server.config.{ConfigType, QuotaConfigs, ZkConfigs, ZooKeeperInternals} import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs} import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index b5c54596b21..33fec627486 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -19,11 +19,11 @@ package kafka.admin import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder} import kafka.server.KafkaConfig -import kafka.utils.{Exit, Logging, ToolsUtils} import kafka.utils.Implicits._ +import kafka.utils.{Logging, ToolsUtils} import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils} import org.apache.kafka.common.security.JaasUtils -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.{Exit, Time, Utils} import org.apache.kafka.server.config.ZkConfigs import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback} diff --git a/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala b/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala index 8db498db127..361eb2954a2 100644 --- a/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala +++ b/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala @@ -18,10 +18,11 @@ package kafka.docker import kafka.Kafka import kafka.tools.StorageTool -import kafka.utils.{Exit, Logging} +import kafka.utils.Logging import net.sourceforge.argparse4j.ArgumentParsers import net.sourceforge.argparse4j.impl.Arguments.store import net.sourceforge.argparse4j.inf.Namespace +import org.apache.kafka.common.utils.Exit import java.nio.charset.StandardCharsets import java.nio.file.{Files, Path, Paths, StandardCopyOption, StandardOpenOption} @@ -41,7 +42,7 @@ object KafkaDockerWrapper extends Logging { case e: Throwable => val errMsg = s"error while preparing configs: ${e.getMessage}" System.err.println(errMsg) - Exit.exit(1, Some(errMsg)) + Exit.exit(1, errMsg) } val formatCmd = formatStorageCmd(finalConfigsPath, envVars) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 7c04dee1f78..a254c74cb3f 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -27,7 +27,7 @@ import kafka.server._ import kafka.server.metadata.BrokerMetadataPublisher.info import kafka.utils._ import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid} -import org.apache.kafka.common.utils.{KafkaThread, Time, Utils} +import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils} import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException} import scala.jdk.CollectionConverters._ diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 6ef9f93180f..454552a08e4 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -18,14 +18,14 @@ package kafka.server import kafka.network.RequestChannel -import kafka.utils.{Exit, Logging, Pool} +import kafka.utils.{Logging, Pool} import kafka.server.KafkaRequestHandler.{threadCurrentRequest, threadRequestChannel} import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicInteger import com.yammer.metrics.core.Meter import org.apache.kafka.common.internals.FatalExitError -import org.apache.kafka.common.utils.{KafkaThread, Time} +import org.apache.kafka.common.utils.{Exit, KafkaThread, Time} import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 11f2e4ff03c..a3834ba3580 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -51,7 +51,7 @@ import org.apache.kafka.common.replica._ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{Exit, Time} import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta} import org.apache.kafka.metadata.LeaderConstants.NO_LEADER diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 0f12a49b00a..43b37feb9d6 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -24,12 +24,12 @@ import java.util.regex.Pattern import joptsimple.{OptionException, OptionParser, OptionSet, OptionSpec} import kafka.common.MessageReader import kafka.utils.Implicits._ -import kafka.utils.{Exit, Logging, ToolsUtils} +import kafka.utils.{Logging, ToolsUtils} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.KafkaException import org.apache.kafka.common.record.CompressionType -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Exit, Utils} import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.kafka.tools.api.RecordReader diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index d7332c58270..bdb35f395ef 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -30,7 +30,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce import org.apache.kafka.common.errors.{TimeoutException, WakeupException} import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.{Exit, Time, Utils} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} @@ -527,7 +527,7 @@ object MirrorMaker extends Logging { offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() val numStreams = options.valueOf(numStreamsOpt).intValue() - Exit.addShutdownHook("MirrorMakerShutdownHook", cleanShutdown()) + Exit.addShutdownHook("MirrorMakerShutdownHook", () => cleanShutdown()) // create producer val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala index 4ac918a63c1..cd840c88a69 100755 --- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala +++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala @@ -23,11 +23,12 @@ import scala.util.matching.Regex import collection.mutable import java.util.Date import java.text.SimpleDateFormat -import kafka.utils.{CoreUtils, Exit, Logging} +import kafka.utils.{CoreUtils, Logging} import java.io.{BufferedOutputStream, OutputStream} import java.nio.charset.StandardCharsets import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.utils.Exit import org.apache.kafka.server.util.CommandLineUtils /** diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 43eb6579765..91699d31df9 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -21,13 +21,13 @@ import kafka.server.KafkaConfig import java.io.PrintStream import java.nio.file.{Files, Paths} -import kafka.utils.{Exit, Logging} +import kafka.utils.Logging import net.sourceforge.argparse4j.ArgumentParsers import net.sourceforge.argparse4j.impl.Arguments.{append, store, storeTrue} import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace} import net.sourceforge.argparse4j.internal.HelpScreenException import org.apache.kafka.common.Uuid -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Exit, Utils} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils} import org.apache.kafka.metadata.storage.{Formatter, FormatterException} @@ -54,7 +54,7 @@ object StorageTool extends Logging { message = Some(e.getMessage) } message.foreach(System.err.println) - Exit.exit(exitCode, message) + Exit.exit(exitCode, message.orNull) } /** diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 06812f27953..1b6f2260002 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -24,7 +24,7 @@ import joptsimple.{OptionException, OptionSpec} import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.raft.{KafkaRaftManager, RaftManager} import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, SimpleApiVersionManager} -import kafka.utils.{CoreUtils, Exit, Logging} +import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.errors.InvalidConfigurationException import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.metrics.Metrics @@ -33,7 +33,7 @@ import org.apache.kafka.common.metrics.stats.{Meter, Percentile, Percentiles} import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable} import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.{Exit, Time, Utils} import org.apache.kafka.common.{TopicPartition, Uuid, protocol} import org.apache.kafka.raft.errors.NotLeaderException import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, RaftClient, QuorumConfig} @@ -479,7 +479,7 @@ object TestRaftServer extends Logging { val recordSize = opts.options.valueOf(opts.recordSizeOpt) val server = new TestRaftServer(config, Uuid.fromString(directoryIdAsString), throughput, recordSize) - Exit.addShutdownHook("raft-shutdown-hook", server.shutdown()) + Exit.addShutdownHook("raft-shutdown-hook", () => server.shutdown()) server.startup() server.awaitShutdown() diff --git a/core/src/main/scala/kafka/utils/Exit.scala b/core/src/main/scala/kafka/utils/Exit.scala deleted file mode 100644 index 84027100ec3..00000000000 --- a/core/src/main/scala/kafka/utils/Exit.scala +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.utils - -import org.apache.kafka.common.utils.{Exit => JExit} - -/** - * Internal class that should be used instead of `System.exit()` and `Runtime.getRuntime().halt()` so that tests can - * easily change the behaviour. - */ -object Exit { - - def exit(statusCode: Int, message: Option[String] = None): Nothing = { - JExit.exit(statusCode, message.orNull) - throw new AssertionError("exit should not return, but it did.") - } - - def halt(statusCode: Int, message: Option[String] = None): Nothing = { - JExit.halt(statusCode, message.orNull) - throw new AssertionError("halt should not return, but it did.") - } - - def addShutdownHook(name: String, shutdownHook: => Unit): Unit = { - JExit.addShutdownHook(name, () => shutdownHook) - } - - /** - * For testing only, do not call in main code. - */ - def setExitProcedure(exitProcedure: (Int, Option[String]) => Nothing): Unit = - JExit.setExitProcedure(functionToProcedure(exitProcedure)) - - /** - * For testing only, do not call in main code. - */ - def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit = - JExit.setHaltProcedure(functionToProcedure(haltProcedure)) - - /** - * For testing only, do not call in main code. - */ - def setShutdownHookAdder(shutdownHookAdder: (String, => Unit) => Unit): Unit = { - JExit.setShutdownHookAdder((name, runnable) => shutdownHookAdder(name, runnable.run())) - } - - /** - * For testing only, do not call in main code. - *

Clears the procedure set in [[setExitProcedure]], but does not restore system default behavior of exiting the JVM. - */ - def resetExitProcedure(): Unit = - JExit.resetExitProcedure() - - /** - * For testing only, do not call in main code. - *

Clears the procedure set in [[setHaltProcedure]], but does not restore system default behavior of exiting the JVM. - */ - def resetHaltProcedure(): Unit = - JExit.resetHaltProcedure() - - /** - * For testing only, do not call in main code. - */ - def resetShutdownHookAdder(): Unit = - JExit.resetShutdownHookAdder() - - private def functionToProcedure(procedure: (Int, Option[String]) => Nothing) = new JExit.Procedure { - def execute(statusCode: Int, message: String): Unit = procedure(statusCode, Option(message)) - } -} diff --git a/core/src/test/java/kafka/admin/AclCommandTest.java b/core/src/test/java/kafka/admin/AclCommandTest.java index a1909b558f7..cd40e1a0e6f 100644 --- a/core/src/test/java/kafka/admin/AclCommandTest.java +++ b/core/src/test/java/kafka/admin/AclCommandTest.java @@ -26,7 +26,6 @@ import kafka.test.annotation.ClusterTests; import kafka.test.annotation.Type; import kafka.test.junit.ClusterTestExtensions; import kafka.test.junit.ZkClusterInvocationContext; -import kafka.utils.Exit; import kafka.utils.TestUtils; import org.apache.kafka.common.acl.AccessControlEntry; @@ -38,6 +37,7 @@ import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.SecurityUtils; import org.apache.kafka.metadata.authorizer.StandardAuthorizer; diff --git a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java index 771369e2419..342fb8cb35f 100644 --- a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java +++ b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java @@ -19,8 +19,8 @@ package kafka.admin; import kafka.test.ClusterInstance; import kafka.test.annotation.ClusterTest; import kafka.test.junit.ClusterTestExtensions; -import kafka.utils.Exit; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.test.NoRetryException; import org.apache.kafka.test.TestUtils; diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala index 4308aebd713..1ef07a2c800 100644 --- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala +++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala @@ -25,7 +25,7 @@ import java.nio.file.Files import java.text.MessageFormat import java.util.{Locale, Properties, UUID} -import kafka.utils.{CoreUtils, Exit, Logging} +import kafka.utils.{CoreUtils, Logging} import scala.jdk.CollectionConverters._ import org.apache.commons.lang.text.StrSubstitutor @@ -49,7 +49,7 @@ import org.apache.directory.server.kerberos.shared.keytab.{Keytab, KeytabEntry} import org.apache.directory.server.protocol.shared.transport.{TcpTransport, UdpTransport} import org.apache.directory.server.xdbm.Index import org.apache.directory.shared.kerberos.KerberosTime -import org.apache.kafka.common.utils.{Java, Utils} +import org.apache.kafka.common.utils.{Exit, Java, Utils} /** * Mini KDC based on Apache Directory Server that can be embedded in tests or used from command line as a standalone @@ -397,7 +397,7 @@ object MiniKdc { | """.stripMargin println(infoMessage) - Exit.addShutdownHook("minikdc-shutdown-hook", miniKdc.stop()) + Exit.addShutdownHook("minikdc-shutdown-hook", () => miniKdc.stop()) miniKdc } diff --git a/core/src/test/scala/kafka/tools/LogCompactionTester.scala b/core/src/test/scala/kafka/tools/LogCompactionTester.scala index 616e0d0b57d..2ea6c3aae6c 100755 --- a/core/src/test/scala/kafka/tools/LogCompactionTester.scala +++ b/core/src/test/scala/kafka/tools/LogCompactionTester.scala @@ -32,7 +32,7 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsume import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeserializer} -import org.apache.kafka.common.utils.{AbstractIterator, Utils} +import org.apache.kafka.common.utils.{Exit, AbstractIterator, Utils} import org.apache.kafka.server.util.CommandLineUtils import scala.jdk.CollectionConverters._ diff --git a/core/src/test/scala/kafka/utils/ExitTest.scala b/core/src/test/scala/kafka/utils/ExitTest.scala deleted file mode 100644 index fcb2e9aaded..00000000000 --- a/core/src/test/scala/kafka/utils/ExitTest.scala +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -import java.io.IOException - -import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} -import org.junit.jupiter.api.Test - -class ExitTest { - @Test - def shouldHaltImmediately(): Unit = { - val array:Array[Any] = Array("a", "b") - def haltProcedure(exitStatus: Int, message: Option[String]) : Nothing = { - array(0) = exitStatus - array(1) = message - throw new IOException() - } - Exit.setHaltProcedure(haltProcedure) - val statusCode = 0 - val message = Some("message") - try { - assertThrows(classOf[IOException], () => Exit.halt(statusCode)) - assertEquals(statusCode, array(0)) - assertEquals(None, array(1)) - - assertThrows(classOf[IOException], () => Exit.halt(statusCode, message)) - assertEquals(statusCode, array(0)) - assertEquals(message, array(1)) - } finally { - Exit.resetHaltProcedure() - } - } - - @Test - def shouldExitImmediately(): Unit = { - val array:Array[Any] = Array("a", "b") - def exitProcedure(exitStatus: Int, message: Option[String]) : Nothing = { - array(0) = exitStatus - array(1) = message - throw new IOException() - } - Exit.setExitProcedure(exitProcedure) - val statusCode = 0 - val message = Some("message") - try { - assertThrows(classOf[IOException], () => Exit.exit(statusCode)) - assertEquals(statusCode, array(0)) - assertEquals(None, array(1)) - - assertThrows(classOf[IOException], () => Exit.exit(statusCode, message)) - assertEquals(statusCode, array(0)) - assertEquals(message, array(1)) - } finally { - Exit.resetExitProcedure() - } - } - - @Test - def shouldAddShutdownHookImmediately(): Unit = { - val name = "name" - val array:Array[Any] = Array("", 0) - // immediately invoke the shutdown hook to mutate the data when a hook is added - def shutdownHookAdder(name: String, shutdownHook: => Unit) : Unit = { - // mutate the first element - array(0) = array(0).toString + name - // invoke the shutdown hook (see below, it mutates the second element) - shutdownHook - } - Exit.setShutdownHookAdder(shutdownHookAdder) - def sideEffect(): Unit = { - // mutate the second element - array(1) = array(1).asInstanceOf[Int] + 1 - } - try { - Exit.addShutdownHook(name, sideEffect()) // by-name parameter, only invoked due to above shutdownHookAdder - assertEquals(1, array(1)) - assertEquals(name * array(1).asInstanceOf[Int], array(0).toString) - Exit.addShutdownHook(name, array(1) = array(1).asInstanceOf[Int] + 1) // by-name parameter, only invoked due to above shutdownHookAdder - assertEquals(2, array(1)) - assertEquals(name * array(1).asInstanceOf[Int], array(0).toString) - } finally { - Exit.resetShutdownHookAdder() - } - } - - @Test - def shouldNotInvokeShutdownHookImmediately(): Unit = { - val name = "name" - val array:Array[String] = Array(name) - - def sideEffect(): Unit = { - // mutate the first element - array(0) = array(0) + name - } - Exit.addShutdownHook(name, sideEffect()) // by-name parameter, not invoked - // make sure the first element wasn't mutated - assertEquals(name, array(0)) - Exit.addShutdownHook(name, sideEffect()) // by-name parameter, not invoked - // again make sure the first element wasn't mutated - assertEquals(name, array(0)) - Exit.addShutdownHook(name, array(0) = array(0) + name) // by-name parameter, not invoked - // again make sure the first element wasn't mutated - assertEquals(name, array(0)) - } -} diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index b21a4190a58..da3a680ef89 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -25,7 +25,7 @@ import kafka.utils._ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record.FileRecords -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Exit, Utils} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig} @@ -63,7 +63,7 @@ object StressTestLog { val reader = new ReaderThread(log) reader.start() - Exit.addShutdownHook("stress-test-shutdown-hook", { + Exit.addShutdownHook("stress-test-shutdown-hook", () => { running.set(false) writer.join() reader.join() diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index b973bac55a6..9dc8a16774b 100755 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -25,11 +25,10 @@ import java.util.{Properties, Random} import joptsimple._ import kafka.log._ import kafka.server.BrokerTopicStats -import kafka.utils._ import org.apache.kafka.common.compress.{Compression, GzipCompression, Lz4Compression, ZstdCompression} import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record._ -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.{Exit, Time, Utils} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.server.util.{KafkaScheduler, Scheduler} import org.apache.kafka.server.util.CommandLineUtils diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala index 3a1fc2e4bda..f3e750d4659 100644 --- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala @@ -20,12 +20,13 @@ import java.nio.file.Files import java.util import java.util.Properties import kafka.server.KafkaConfig -import kafka.utils.{Exit, TestUtils} +import kafka.utils.TestUtils import kafka.utils.TestUtils.assertBadConfigContainingMessage import org.apache.kafka.common.config.SslConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.internals.FatalExitError +import org.apache.kafka.common.utils.Exit import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.server.config.{KRaftConfigs, ZkConfigs} diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala index a8b7a280def..5f885af596e 100644 --- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala @@ -22,12 +22,12 @@ import java.util.concurrent.{ExecutionException, TimeUnit} import kafka.api.IntegrationTestHarness import kafka.controller.{OfflineReplica, PartitionAndReplica} import kafka.utils.TestUtils.{Checkpoint, LogDirFailureType, Roll, waitUntilTrue} -import kafka.utils.{CoreUtils, Exit, TestUtils} +import kafka.utils.{CoreUtils, TestUtils} import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderOrFollowerException} -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Exit, Utils} import org.apache.kafka.metadata.BrokerState import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.junit.jupiter.api.Assertions._ diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index b1fffa88518..51ca4b927ce 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -29,7 +29,7 @@ import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile} import kafka.server.epoch.util.MockBlockingSender import kafka.utils.TestUtils.waitUntilTrue -import kafka.utils.{Exit, Pool, TestUtils} +import kafka.utils.{Pool, TestUtils} import kafka.zk.KafkaZkClient import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} @@ -52,7 +52,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.utils.{LogContext, Time, Utils} +import org.apache.kafka.common.utils.{Exit, LogContext, Time, Utils} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.image._ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER @@ -6698,10 +6698,7 @@ class ReplicaManagerTest { logManager.startup(Set.empty[String]) replicaManager.startup() - def haltProcedure(exitStatus: Int, message: Option[String]): Nothing = { - fail("Test failure, broker should not have halted") - } - Exit.setHaltProcedure(haltProcedure) + Exit.setHaltProcedure((_, _) => fail("Test failure, broker should not have halted")) // When logDirFailureChannel.maybeAddOfflineLogDir(logDirFiles.head.getAbsolutePath, "test failure", null) diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 802b206e14d..c10a945f8d2 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -16,7 +16,7 @@ */ package kafka.server -import kafka.utils.{CoreUtils, Exit, TestUtils} +import kafka.utils.{CoreUtils, TestUtils} import java.io.{DataInputStream, File} import java.net.ServerSocket @@ -36,7 +36,7 @@ import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.LeaderAndIsrRequest import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer} -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{Exit, Time} import org.apache.kafka.metadata.BrokerState import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs, ZkConfigs} import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout} diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala index 0c232d19f31..82d031b79a3 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala @@ -18,10 +18,10 @@ package kafka.tools import kafka.common.MessageReader - import kafka.tools.ConsoleProducer.LineMessageReader -import kafka.utils.{Exit, TestUtils} +import kafka.utils.TestUtils import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.utils.Exit import org.apache.kafka.tools.api.RecordReader import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.Test @@ -133,7 +133,7 @@ class ConsoleProducerTest { @Test def testInvalidConfigs(): Unit = { - Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) + Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message)) try assertThrows(classOf[IllegalArgumentException], () => new ConsoleProducer.ProducerConfig(invalidArgs)) finally Exit.resetExitProcedure() } diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 977f9dcb3cb..0655d5c965e 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -28,7 +28,7 @@ import kafka.log.{LogTestUtils, UnifiedLog} import kafka.raft.{KafkaMetadataLog, MetadataLogConfig} import kafka.server.{BrokerTopicStats, KafkaRaftServer} import kafka.tools.DumpLogSegments.{OffsetsMessageParser, TimeIndexDumpErrors} -import kafka.utils.{Exit, TestUtils, VerifiableProperties} +import kafka.utils.{TestUtils, VerifiableProperties} import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.{Assignment, Subscription} import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} @@ -37,7 +37,7 @@ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord} import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache} import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordBatch, RecordVersion, SimpleRecord} -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Exit, Utils} import org.apache.kafka.coordinator.group.{CoordinatorRecord, CoordinatorRecordSerde} import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs @@ -451,7 +451,7 @@ class DumpLogSegmentsTest { @Test def testDumpRemoteLogMetadataNoFilesFlag(): Unit = { - Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) + Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message)) try { val thrown = assertThrows(classOf[IllegalArgumentException], () => runDumpLogSegments(Array("--remote-log-metadata-decoder"))) assertTrue(thrown.getMessage.equals("Missing required argument \"[files]\"")) diff --git a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java index 83ec0a38075..804c70fd07c 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.tools; -import kafka.utils.Exit; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientTestUtils; @@ -27,6 +26,7 @@ import org.apache.kafka.clients.admin.DescribeConfigsResult; import org.apache.kafka.clients.admin.ListClientMetricsResourcesResult; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Exit; import org.junit.jupiter.api.Test; diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java index d47e113c211..e6399a87072 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.tools; -import kafka.utils.Exit; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientTestUtils; @@ -33,6 +32,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.server.common.AdminCommandFailedException; import org.apache.kafka.server.common.AdminOperationException; diff --git a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java index 1fea9fe0a19..cd3cb496906 100644 --- a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java +++ b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java @@ -22,7 +22,6 @@ import kafka.server.KafkaServer; import kafka.server.QuorumTestHarness; import kafka.server.QuotaType; import kafka.utils.EmptyTestInfo; -import kafka.utils.Exit; import kafka.utils.TestUtils; import org.apache.kafka.clients.admin.Admin; @@ -35,6 +34,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.tools.reassign.ReassignPartitionsCommand; @@ -123,7 +123,7 @@ public class ReplicationQuotasTestRig { experiments.forEach(def -> run(def, journal, displayChartsOnScreen)); if (!displayChartsOnScreen) - Exit.exit(0, Option.empty()); + Exit.exit(0); } static void run(ExperimentDef config, Journal journal, boolean displayChartsOnScreen) {