KAFKA-17270 Let test code use Exit.java rather than Exit.scala (#16821)

Exit.scala is a thin wrapper around Exit.java. Its main benefit is that its exit and halt methods are declared to return Nothing, so the Scala compiler treats calls to them as non-returning and they type-check in any expression position. This benefit is minimal, and since we are planning to phase out the Scala code, it is better to use Exit.java directly and remove Exit.scala.
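
For context, a minimal sketch of the difference (the helper names fatalScala and fatalJava are illustrative, not from the commit): the Scala wrapper's exit type-checks anywhere because of its Nothing return type, while the Java version returns void and takes a plain nullable String rather than an Option[String]:

// Scala wrapper (kafka.utils.Exit, deleted below): Nothing-typed, Option[String] message
def fatalScala(msg: String): Nothing = kafka.utils.Exit.exit(1, Some(msg))

// Java class (org.apache.kafka.common.utils.Exit): void-typed, plain String message
def fatalJava(msg: String): Unit = org.apache.kafka.common.utils.Exit.exit(1, msg)

// Shutdown hooks likewise change from a by-name block to an explicit Runnable
org.apache.kafka.common.utils.Exit.addShutdownHook("example-hook", () => println("shutting down"))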

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
TengYao Chi 2024-08-12 07:42:28 +08:00 committed by GitHub
parent 126b25b51d
commit 38ebb8f48a
30 changed files with 55 additions and 260 deletions

View File

@@ -21,8 +21,8 @@ import java.util.Properties
import joptsimple.OptionParser
import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server}
import kafka.utils.Implicits._
import kafka.utils.{Exit, Logging}
import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, OperatingSystem, Time, Utils}
import kafka.utils.Logging
import org.apache.kafka.common.utils.{Exit, Java, LoggingSignalHandler, OperatingSystem, Time, Utils}
import org.apache.kafka.server.util.CommandLineUtils
object Kafka extends Logging {
@@ -99,7 +99,7 @@ object Kafka extends Logging {
}
// attach shutdown handler to catch terminating signals as well as normal termination
Exit.addShutdownHook("kafka-shutdown-hook", {
Exit.addShutdownHook("kafka-shutdown-hook", () => {
try server.shutdown()
catch {
case _: Throwable =>

View File

@@ -29,7 +29,7 @@ import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils}
import org.apache.kafka.common.utils.{Exit, Utils, SecurityUtils => JSecurityUtils}
import org.apache.kafka.security.authorizer.{AclEntry, AuthorizerUtils}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ZkConfigs

View File

@@ -23,7 +23,7 @@ import java.util.{Collections, Optional, Properties}
import joptsimple._
import kafka.server.{DynamicBrokerConfig, DynamicConfig, KafkaConfig}
import kafka.utils.Implicits._
import kafka.utils.{Exit, Logging}
import kafka.utils.Logging
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
@@ -33,7 +33,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
import org.apache.kafka.common.utils.{Exit, Sanitizer, Time, Utils}
import org.apache.kafka.server.config.{ConfigType, QuotaConfigs, ZkConfigs, ZooKeeperInternals}
import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}

View File

@@ -19,11 +19,11 @@ package kafka.admin
import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder}
import kafka.server.KafkaConfig
import kafka.utils.{Exit, Logging, ToolsUtils}
import kafka.utils.Implicits._
import kafka.utils.{Logging, ToolsUtils}
import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}

View File

@@ -18,10 +18,11 @@ package kafka.docker
import kafka.Kafka
import kafka.tools.StorageTool
import kafka.utils.{Exit, Logging}
import kafka.utils.Logging
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.store
import net.sourceforge.argparse4j.inf.Namespace
import org.apache.kafka.common.utils.Exit
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths, StandardCopyOption, StandardOpenOption}
@@ -41,7 +42,7 @@ object KafkaDockerWrapper extends Logging {
case e: Throwable =>
val errMsg = s"error while preparing configs: ${e.getMessage}"
System.err.println(errMsg)
Exit.exit(1, Some(errMsg))
Exit.exit(1, errMsg)
}
val formatCmd = formatStorageCmd(finalConfigsPath, envVars)

View File

@@ -27,7 +27,7 @@ import kafka.server._
import kafka.server.metadata.BrokerMetadataPublisher.info
import kafka.utils._
import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils}
import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException}
import scala.jdk.CollectionConverters._

View File

@@ -18,14 +18,14 @@
package kafka.server
import kafka.network.RequestChannel
import kafka.utils.{Exit, Logging, Pool}
import kafka.utils.{Logging, Pool}
import kafka.server.KafkaRequestHandler.{threadCurrentRequest, threadRequestChannel}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import com.yammer.metrics.core.Meter
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.{KafkaThread, Time}
import org.apache.kafka.common.utils.{Exit, KafkaThread, Time}
import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics

View File

@@ -51,7 +51,7 @@ import org.apache.kafka.common.replica._
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{Exit, Time}
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER

View File

@@ -24,12 +24,12 @@ import java.util.regex.Pattern
import joptsimple.{OptionException, OptionParser, OptionSet, OptionSpec}
import kafka.common.MessageReader
import kafka.utils.Implicits._
import kafka.utils.{Exit, Logging, ToolsUtils}
import kafka.utils.{Logging, ToolsUtils}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.tools.api.RecordReader

View File

@@ -30,7 +30,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
import org.apache.kafka.common.errors.{TimeoutException, WakeupException}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
@@ -527,7 +527,7 @@ object MirrorMaker extends Logging {
offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
val numStreams = options.valueOf(numStreamsOpt).intValue()
Exit.addShutdownHook("MirrorMakerShutdownHook", cleanShutdown())
Exit.addShutdownHook("MirrorMakerShutdownHook", () => cleanShutdown())
// create producer
val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))

View File

@@ -23,11 +23,12 @@ import scala.util.matching.Regex
import collection.mutable
import java.util.Date
import java.text.SimpleDateFormat
import kafka.utils.{CoreUtils, Exit, Logging}
import kafka.utils.{CoreUtils, Logging}
import java.io.{BufferedOutputStream, OutputStream}
import java.nio.charset.StandardCharsets
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.server.util.CommandLineUtils
/**

View File

@@ -21,13 +21,13 @@ import kafka.server.KafkaConfig
import java.io.PrintStream
import java.nio.file.{Files, Paths}
import kafka.utils.{Exit, Logging}
import kafka.utils.Logging
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{append, store, storeTrue}
import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace}
import net.sourceforge.argparse4j.internal.HelpScreenException
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.metadata.storage.{Formatter, FormatterException}
@@ -54,7 +54,7 @@ object StorageTool extends Logging {
message = Some(e.getMessage)
}
message.foreach(System.err.println)
Exit.exit(exitCode, message)
Exit.exit(exitCode, message.orNull)
}
/**

View File

@@ -24,7 +24,7 @@ import joptsimple.{OptionException, OptionSpec}
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.{KafkaRaftManager, RaftManager}
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, SimpleApiVersionManager}
import kafka.utils.{CoreUtils, Exit, Logging}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
@@ -33,7 +33,7 @@ import org.apache.kafka.common.metrics.stats.{Meter, Percentile, Percentiles}
import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, RaftClient, QuorumConfig}
@@ -479,7 +479,7 @@ object TestRaftServer extends Logging {
val recordSize = opts.options.valueOf(opts.recordSizeOpt)
val server = new TestRaftServer(config, Uuid.fromString(directoryIdAsString), throughput, recordSize)
Exit.addShutdownHook("raft-shutdown-hook", server.shutdown())
Exit.addShutdownHook("raft-shutdown-hook", () => server.shutdown())
server.startup()
server.awaitShutdown()

View File

@@ -1,83 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils

import org.apache.kafka.common.utils.{Exit => JExit}

/**
 * Internal class that should be used instead of `System.exit()` and `Runtime.getRuntime().halt()` so that tests can
 * easily change the behaviour.
 */
object Exit {

  def exit(statusCode: Int, message: Option[String] = None): Nothing = {
    JExit.exit(statusCode, message.orNull)
    throw new AssertionError("exit should not return, but it did.")
  }

  def halt(statusCode: Int, message: Option[String] = None): Nothing = {
    JExit.halt(statusCode, message.orNull)
    throw new AssertionError("halt should not return, but it did.")
  }

  def addShutdownHook(name: String, shutdownHook: => Unit): Unit = {
    JExit.addShutdownHook(name, () => shutdownHook)
  }

  /**
   * For testing only, do not call in main code.
   */
  def setExitProcedure(exitProcedure: (Int, Option[String]) => Nothing): Unit =
    JExit.setExitProcedure(functionToProcedure(exitProcedure))

  /**
   * For testing only, do not call in main code.
   */
  def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit =
    JExit.setHaltProcedure(functionToProcedure(haltProcedure))

  /**
   * For testing only, do not call in main code.
   */
  def setShutdownHookAdder(shutdownHookAdder: (String, => Unit) => Unit): Unit = {
    JExit.setShutdownHookAdder((name, runnable) => shutdownHookAdder(name, runnable.run()))
  }

  /**
   * For testing only, do not call in main code.
   * <p>Clears the procedure set in [[setExitProcedure]], but does not restore system default behavior of exiting the JVM.
   */
  def resetExitProcedure(): Unit =
    JExit.resetExitProcedure()

  /**
   * For testing only, do not call in main code.
   * <p>Clears the procedure set in [[setHaltProcedure]], but does not restore system default behavior of exiting the JVM.
   */
  def resetHaltProcedure(): Unit =
    JExit.resetHaltProcedure()

  /**
   * For testing only, do not call in main code.
   */
  def resetShutdownHookAdder(): Unit =
    JExit.resetShutdownHookAdder()

  private def functionToProcedure(procedure: (Int, Option[String]) => Nothing) = new JExit.Procedure {
    def execute(statusCode: Int, message: String): Unit = procedure(statusCode, Option(message))
  }
}
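
With the wrapper gone, the call-site rewrites in this commit all follow the same small pattern. A self-contained sketch of that pattern (the object name and messages are illustrative), using only the org.apache.kafka.common.utils.Exit API exercised in the hunks above:

import org.apache.kafka.common.utils.Exit

object ExitMigrationExample {
  def main(args: Array[String]): Unit = {
    // Install a test exit procedure so the example does not terminate the JVM.
    // The procedure now receives a plain String message (null when absent).
    Exit.setExitProcedure((statusCode, message) =>
      throw new IllegalStateException(s"exit($statusCode, $message)"))
    try {
      // Before: Exit.exit(1, Some(errMsg))   -- Option[String] message
      // After:  unwrap the Option with orNull, as StorageTool does above
      val message: Option[String] = Some("error while preparing configs")
      try Exit.exit(1, message.orNull)
      catch { case e: IllegalStateException => println(e.getMessage) }
    } finally Exit.resetExitProcedure()

    // Before: Exit.addShutdownHook("hook", server.shutdown())   -- by-name block
    // After:  pass an explicit Runnable lambda
    Exit.addShutdownHook("example-shutdown-hook", () => println("shutting down"))
  }
}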

View File

@@ -26,7 +26,6 @@ import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.test.junit.ZkClusterInvocationContext;
import kafka.utils.Exit;
import kafka.utils.TestUtils;
import org.apache.kafka.common.acl.AccessControlEntry;
@@ -38,6 +37,7 @@ import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;

View File

@@ -19,8 +19,8 @@ package kafka.admin;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.junit.ClusterTestExtensions;
import kafka.utils.Exit;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;

View File

@@ -25,7 +25,7 @@ import java.nio.file.Files
import java.text.MessageFormat
import java.util.{Locale, Properties, UUID}
import kafka.utils.{CoreUtils, Exit, Logging}
import kafka.utils.{CoreUtils, Logging}
import scala.jdk.CollectionConverters._
import org.apache.commons.lang.text.StrSubstitutor
@@ -49,7 +49,7 @@ import org.apache.directory.server.kerberos.shared.keytab.{Keytab, KeytabEntry}
import org.apache.directory.server.protocol.shared.transport.{TcpTransport, UdpTransport}
import org.apache.directory.server.xdbm.Index
import org.apache.directory.shared.kerberos.KerberosTime
import org.apache.kafka.common.utils.{Java, Utils}
import org.apache.kafka.common.utils.{Exit, Java, Utils}
/**
* Mini KDC based on Apache Directory Server that can be embedded in tests or used from command line as a standalone
@@ -397,7 +397,7 @@ object MiniKdc {
|
""".stripMargin
println(infoMessage)
Exit.addShutdownHook("minikdc-shutdown-hook", miniKdc.stop())
Exit.addShutdownHook("minikdc-shutdown-hook", () => miniKdc.stop())
miniKdc
}

View File

@@ -32,7 +32,7 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsume
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeserializer}
import org.apache.kafka.common.utils.{AbstractIterator, Utils}
import org.apache.kafka.common.utils.{Exit, AbstractIterator, Utils}
import org.apache.kafka.server.util.CommandLineUtils
import scala.jdk.CollectionConverters._

View File

@@ -1,121 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils

import java.io.IOException

import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test

class ExitTest {

  @Test
  def shouldHaltImmediately(): Unit = {
    val array: Array[Any] = Array("a", "b")

    def haltProcedure(exitStatus: Int, message: Option[String]): Nothing = {
      array(0) = exitStatus
      array(1) = message
      throw new IOException()
    }

    Exit.setHaltProcedure(haltProcedure)
    val statusCode = 0
    val message = Some("message")
    try {
      assertThrows(classOf[IOException], () => Exit.halt(statusCode))
      assertEquals(statusCode, array(0))
      assertEquals(None, array(1))

      assertThrows(classOf[IOException], () => Exit.halt(statusCode, message))
      assertEquals(statusCode, array(0))
      assertEquals(message, array(1))
    } finally {
      Exit.resetHaltProcedure()
    }
  }

  @Test
  def shouldExitImmediately(): Unit = {
    val array: Array[Any] = Array("a", "b")

    def exitProcedure(exitStatus: Int, message: Option[String]): Nothing = {
      array(0) = exitStatus
      array(1) = message
      throw new IOException()
    }

    Exit.setExitProcedure(exitProcedure)
    val statusCode = 0
    val message = Some("message")
    try {
      assertThrows(classOf[IOException], () => Exit.exit(statusCode))
      assertEquals(statusCode, array(0))
      assertEquals(None, array(1))

      assertThrows(classOf[IOException], () => Exit.exit(statusCode, message))
      assertEquals(statusCode, array(0))
      assertEquals(message, array(1))
    } finally {
      Exit.resetExitProcedure()
    }
  }

  @Test
  def shouldAddShutdownHookImmediately(): Unit = {
    val name = "name"
    val array: Array[Any] = Array("", 0)

    // immediately invoke the shutdown hook to mutate the data when a hook is added
    def shutdownHookAdder(name: String, shutdownHook: => Unit): Unit = {
      // mutate the first element
      array(0) = array(0).toString + name
      // invoke the shutdown hook (see below, it mutates the second element)
      shutdownHook
    }

    Exit.setShutdownHookAdder(shutdownHookAdder)

    def sideEffect(): Unit = {
      // mutate the second element
      array(1) = array(1).asInstanceOf[Int] + 1
    }

    try {
      Exit.addShutdownHook(name, sideEffect()) // by-name parameter, only invoked due to above shutdownHookAdder
      assertEquals(1, array(1))
      assertEquals(name * array(1).asInstanceOf[Int], array(0).toString)

      Exit.addShutdownHook(name, array(1) = array(1).asInstanceOf[Int] + 1) // by-name parameter, only invoked due to above shutdownHookAdder
      assertEquals(2, array(1))
      assertEquals(name * array(1).asInstanceOf[Int], array(0).toString)
    } finally {
      Exit.resetShutdownHookAdder()
    }
  }

  @Test
  def shouldNotInvokeShutdownHookImmediately(): Unit = {
    val name = "name"
    val array: Array[String] = Array(name)

    def sideEffect(): Unit = {
      // mutate the first element
      array(0) = array(0) + name
    }

    Exit.addShutdownHook(name, sideEffect()) // by-name parameter, not invoked
    // make sure the first element wasn't mutated
    assertEquals(name, array(0))

    Exit.addShutdownHook(name, sideEffect()) // by-name parameter, not invoked
    // again make sure the first element wasn't mutated
    assertEquals(name, array(0))

    Exit.addShutdownHook(name, array(0) = array(0) + name) // by-name parameter, not invoked
    // again make sure the first element wasn't mutated
    assertEquals(name, array(0))
  }
}
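
Tests that previously stubbed the Scala wrapper now stub Exit.java directly. A sketch of the equivalent pattern (the test class below is illustrative, not part of the commit); note the procedure sees a nullable String where the deleted test above saw an Option[String]:

import java.io.IOException
import java.util.concurrent.atomic.AtomicReference

import org.apache.kafka.common.utils.Exit
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test

class ExitJavaStubSketch {
  @Test
  def shouldCaptureExitStatusAndMessage(): Unit = {
    val captured = new AtomicReference[(Int, String)]()
    Exit.setExitProcedure { (statusCode, message) =>
      captured.set((statusCode, message))
      throw new IOException() // keep the stubbed exit from falling through
    }
    try {
      assertThrows(classOf[IOException], () => Exit.exit(0, "message"))
      assertEquals((0, "message"), captured.get)
      assertThrows(classOf[IOException], () => Exit.exit(1)) // message defaults to null
      assertEquals((1, null), captured.get)
    } finally Exit.resetExitProcedure()
  }
}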

View File

@@ -25,7 +25,7 @@ import kafka.utils._
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.FileRecords
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
@@ -63,7 +63,7 @@ object StressTestLog {
val reader = new ReaderThread(log)
reader.start()
Exit.addShutdownHook("stress-test-shutdown-hook", {
Exit.addShutdownHook("stress-test-shutdown-hook", () => {
running.set(false)
writer.join()
reader.join()

View File

@@ -25,11 +25,10 @@ import java.util.{Properties, Random}
import joptsimple._
import kafka.log._
import kafka.server.BrokerTopicStats
import kafka.utils._
import org.apache.kafka.common.compress.{Compression, GzipCompression, Lz4Compression, ZstdCompression}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.util.{KafkaScheduler, Scheduler}
import org.apache.kafka.server.util.CommandLineUtils

View File

@@ -20,12 +20,13 @@ import java.nio.file.Files
import java.util
import java.util.Properties
import kafka.server.KafkaConfig
import kafka.utils.{Exit, TestUtils}
import kafka.utils.TestUtils
import kafka.utils.TestUtils.assertBadConfigContainingMessage
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.config.{KRaftConfigs, ZkConfigs}

View File

@@ -22,12 +22,12 @@ import java.util.concurrent.{ExecutionException, TimeUnit}
import kafka.api.IntegrationTestHarness
import kafka.controller.{OfflineReplica, PartitionAndReplica}
import kafka.utils.TestUtils.{Checkpoint, LogDirFailureType, Roll, waitUntilTrue}
import kafka.utils.{CoreUtils, Exit, TestUtils}
import kafka.utils.{CoreUtils, TestUtils}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderOrFollowerException}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions._

View File

@@ -29,7 +29,7 @@ import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile}
import kafka.server.epoch.util.MockBlockingSender
import kafka.utils.TestUtils.waitUntilTrue
import kafka.utils.{Exit, Pool, TestUtils}
import kafka.utils.{Pool, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
@@ -52,7 +52,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.utils.{Exit, LogContext, Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.image._
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
@@ -6698,10 +6698,7 @@ class ReplicaManagerTest {
logManager.startup(Set.empty[String])
replicaManager.startup()
def haltProcedure(exitStatus: Int, message: Option[String]): Nothing = {
fail("Test failure, broker should not have halted")
}
Exit.setHaltProcedure(haltProcedure)
Exit.setHaltProcedure((_, _) => fail("Test failure, broker should not have halted"))
// When
logDirFailureChannel.maybeAddOfflineLogDir(logDirFiles.head.getAbsolutePath, "test failure", null)

View File

@@ -16,7 +16,7 @@
*/
package kafka.server
import kafka.utils.{CoreUtils, Exit, TestUtils}
import kafka.utils.{CoreUtils, TestUtils}
import java.io.{DataInputStream, File}
import java.net.ServerSocket
@@ -36,7 +36,7 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{Exit, Time}
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs, ZkConfigs}
import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}

View File

@@ -18,10 +18,10 @@
package kafka.tools
import kafka.common.MessageReader
import kafka.tools.ConsoleProducer.LineMessageReader
import kafka.utils.{Exit, TestUtils}
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.tools.api.RecordReader
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
@@ -133,7 +133,7 @@ class ConsoleProducerTest {
@Test
def testInvalidConfigs(): Unit = {
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message))
try assertThrows(classOf[IllegalArgumentException], () => new ConsoleProducer.ProducerConfig(invalidArgs))
finally Exit.resetExitProcedure()
}

View File

@@ -28,7 +28,7 @@ import kafka.log.{LogTestUtils, UnifiedLog}
import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
import kafka.server.{BrokerTopicStats, KafkaRaftServer}
import kafka.tools.DumpLogSegments.{OffsetsMessageParser, TimeIndexDumpErrors}
import kafka.utils.{Exit, TestUtils, VerifiableProperties}
import kafka.utils.{TestUtils, VerifiableProperties}
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.{Assignment, Subscription}
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
@@ -37,7 +37,7 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordBatch, RecordVersion, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.coordinator.group.{CoordinatorRecord, CoordinatorRecordSerde}
import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
@@ -451,7 +451,7 @@ class DumpLogSegmentsTest {
@Test
def testDumpRemoteLogMetadataNoFilesFlag(): Unit = {
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message))
try {
val thrown = assertThrows(classOf[IllegalArgumentException], () => runDumpLogSegments(Array("--remote-log-metadata-decoder")))
assertTrue(thrown.getMessage.equals("Missing required argument \"[files]\""))

View File

@@ -16,7 +16,6 @@
*/
package org.apache.kafka.tools;
import kafka.utils.Exit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
@@ -27,6 +26,7 @@ import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.ListClientMetricsResourcesResult;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.Test;

View File

@@ -16,7 +16,6 @@
*/
package org.apache.kafka.tools;
import kafka.utils.Exit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
@@ -33,6 +32,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.server.common.AdminCommandFailedException;
import org.apache.kafka.server.common.AdminOperationException;

View File

@@ -22,7 +22,6 @@ import kafka.server.KafkaServer;
import kafka.server.QuorumTestHarness;
import kafka.server.QuotaType;
import kafka.utils.EmptyTestInfo;
import kafka.utils.Exit;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
@@ -35,6 +34,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.tools.reassign.ReassignPartitionsCommand;
@@ -123,7 +123,7 @@ public class ReplicationQuotasTestRig {
experiments.forEach(def -> run(def, journal, displayChartsOnScreen));
if (!displayChartsOnScreen)
Exit.exit(0, Option.empty());
Exit.exit(0);
}
static void run(ExperimentDef config, Journal journal, boolean displayChartsOnScreen) {