KAFKA-4039; Fix deadlock during shutdown due to log truncation not allowed

Author: Maysam Yabandeh <myabandeh@dropbox.com>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #2474 from ijuma/kafka-4039-deadlock-during-shutdown
Maysam Yabandeh 2017-02-02 22:22:31 +00:00 committed by Ismael Juma
parent 76550dd895
commit cb674e5487
64 changed files with 602 additions and 231 deletions

View File

@ -42,6 +42,7 @@
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<subpackage name="common">
<disallow pkg="org.apache.kafka.clients" />

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.internals;
import org.apache.kafka.common.utils.Exit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An error that indicates the need to exit the JVM process. This should only be used by the server or command-line
* tools. Clients should never shut down the JVM process.
*
* This exception is expected to be caught at the highest level of the thread so that no shared lock is held by
* the thread when it calls {@link Exit#exit(int)}.
*/
public class FatalExitError extends Error {
private static final Logger log = LoggerFactory.getLogger(FatalExitError.class);
private final static long serialVersionUID = 1L;
private final int statusCode;
public FatalExitError(int statusCode) {
if (statusCode == 0)
throw new IllegalArgumentException("statusCode must not be 0");
this.statusCode = statusCode;
}
public FatalExitError() {
this(1);
}
public int statusCode() {
return statusCode;
}
}
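
A note on intended usage: the error above is only safe if it propagates to the top of the thread before the process exits. A minimal sketch of that pattern, assuming a hypothetical WorkerThread with a made-up doWork() body (not part of this commit):

import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Exit;

public class WorkerThread extends Thread {
    @Override
    public void run() {
        try {
            while (true)
                doWork(); // may throw FatalExitError, e.g. when log truncation is not allowed
        } catch (FatalExitError e) {
            // caught at the top of the run loop, so no shared lock is held when exiting
            Exit.exit(e.statusCode());
        } catch (Throwable t) {
            // other errors are merely logged; library code must not kill the JVM here
        }
    }

    private void doWork() {
        // hypothetical work that signals an unrecoverable condition
        throw new FatalExitError(1);
    }
}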

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.utils;
/**
* Internal class that should be used instead of `System.exit()` and `Runtime.getRuntime().halt()` so that tests can
* easily change the behaviour.
*/
public class Exit {
public interface Procedure {
void execute(int statusCode, String message);
}
private static final Procedure DEFAULT_HALT_PROCEDURE = new Procedure() {
@Override
public void execute(int statusCode, String message) {
Runtime.getRuntime().halt(statusCode);
}
};
private static final Procedure DEFAULT_EXIT_PROCEDURE = new Procedure() {
@Override
public void execute(int statusCode, String message) {
System.exit(statusCode);
}
};
private volatile static Procedure exitProcedure = DEFAULT_EXIT_PROCEDURE;
private volatile static Procedure haltProcedure = DEFAULT_HALT_PROCEDURE;
public static void exit(int statusCode) {
exit(statusCode, null);
}
public static void exit(int statusCode, String message) {
exitProcedure.execute(statusCode, message);
}
public static void halt(int statusCode) {
halt(statusCode, null);
}
public static void halt(int statusCode, String message) {
haltProcedure.execute(statusCode, message);
}
public static void setExitProcedure(Procedure procedure) {
exitProcedure = procedure;
}
public static void setHaltProcedure(Procedure procedure) {
haltProcedure = procedure;
}
public static void resetExitProcedure() {
exitProcedure = DEFAULT_EXIT_PROCEDURE;
}
public static void resetHaltProcedure() {
haltProcedure = DEFAULT_HALT_PROCEDURE;
}
}
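
Because the exit and halt procedures are plain static fields, tests can intercept exits without installing a SecurityManager, which is what the KafkaTest change later in this commit does in place of the old NoExitSecurityManager. A hedged sketch of that usage (the ExitInterceptExample class and message strings are illustrative, not from this commit):

import org.apache.kafka.common.utils.Exit;

public class ExitInterceptExample {
    public static void main(String[] args) {
        Exit.setExitProcedure(new Exit.Procedure() {
            @Override
            public void execute(int statusCode, String message) {
                // record the request instead of terminating the test JVM
                throw new IllegalStateException("exit(" + statusCode + ") requested: " + message);
            }
        });
        try {
            Exit.exit(1, "simulated fatal error");
        } catch (IllegalStateException e) {
            System.out.println("intercepted: " + e.getMessage());
        } finally {
            Exit.resetExitProcedure(); // restore the default System.exit behaviour
        }
    }
}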

View File

@ -546,7 +546,7 @@ public class Utils {
*/
public static void croak(String message) {
System.err.println(message);
System.exit(1);
Exit.exit(1);
}
/**

View File

@ -17,6 +17,7 @@
package org.apache.kafka.connect.cli;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
@ -52,7 +53,7 @@ public class ConnectDistributed {
public static void main(String[] args) throws Exception {
if (args.length < 1) {
log.info("Usage: ConnectDistributed worker.properties");
System.exit(1);
Exit.exit(1);
}
String workerPropsFile = args[0];

View File

@ -17,6 +17,7 @@
package org.apache.kafka.connect.cli;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
@ -57,7 +58,7 @@ public class ConnectStandalone {
if (args.length < 2) {
log.info("Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...]");
System.exit(1);
Exit.exit(1);
}
String workerPropsFile = args[0];

View File

@ -20,6 +20,7 @@ package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
@ -205,7 +206,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.info("Herder stopped");
} catch (Throwable t) {
log.error("Uncaught exception in herder work thread, exiting: ", t);
System.exit(1);
Exit.exit(1);
}
}

View File

@ -21,7 +21,7 @@ import java.util.Properties
import joptsimple.OptionParser
import kafka.server.{KafkaServer, KafkaServerStartable}
import kafka.utils.{CommandLineUtils, Logging}
import kafka.utils.{CommandLineUtils, Exit, Logging}
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
@ -58,20 +58,18 @@ object Kafka extends Logging {
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {
kafkaServerStartable.shutdown
}
Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
override def run(): Unit = kafkaServerStartable.shutdown()
})
kafkaServerStartable.startup
kafkaServerStartable.awaitShutdown
kafkaServerStartable.startup()
kafkaServerStartable.awaitShutdown()
}
catch {
case e: Throwable =>
fatal(e)
System.exit(1)
Exit.exit(1)
}
System.exit(0)
Exit.exit(0)
}
}

View File

@ -54,7 +54,7 @@ object AclCommand {
case e: Throwable =>
println(s"Error while executing ACL command: ${e.getMessage}")
println(Utils.stackTrace(e))
System.exit(-1)
Exit.exit(1)
}
}

View File

@ -73,7 +73,7 @@ object TopicCommand extends Logging {
exitCode = 1
} finally {
zkUtils.close()
System.exit(exitCode)
Exit.exit(exitCode)
}
}
@ -360,7 +360,7 @@ object TopicCommand extends Logging {
println("Are you sure you want to continue? [y/n]")
if (!Console.readLine().equalsIgnoreCase("y")) {
println("Ending your session")
System.exit(0)
Exit.exit(0)
}
}

View File

@ -144,7 +144,7 @@ class Partition(val topic: String,
} catch {
case e: IOException =>
fatal(s"Error deleting the log for partition $topicPartition", e)
Runtime.getRuntime.halt(1)
Exit.halt(1)
}
}
}

View File

@ -19,7 +19,6 @@ package kafka.consumer
import kafka.server.{AbstractFetcherManager, AbstractFetcherThread, BrokerAndInitialOffset}
import kafka.cluster.{BrokerEndPoint, Cluster}
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
@ -98,7 +97,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
topicPartition -> BrokerAndInitialOffset(broker, partitionMap(topicPartition).getFetchOffset())}
)
} catch {
case t: Throwable => {
case t: Throwable =>
if (!isRunning.get())
throw t /* If this thread is stopped, propagate this exception to kill the thread. */
else {
@ -108,7 +107,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
lock.unlock()
}
}
}
shutdownIdleFetcherThreads()
Thread.sleep(config.refreshLeaderBackoffMs)

View File

@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong
import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.PartitionStates
import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
import org.apache.kafka.common.record.MemoryRecords
/**
@ -177,6 +177,7 @@ abstract class AbstractFetcherThread(name: String,
error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
.format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
} catch {
case e: FatalExitError => throw e
case e: Throwable =>
error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
updatePartitionsWithError(topicPartition)

View File

@ -34,8 +34,9 @@ import kafka.network._
import kafka.network.RequestChannel.{Response, Session}
import kafka.security.auth
import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, Group, Operation, Read, Resource, Write}
import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils}
import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
@ -101,6 +102,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
case e: FatalExitError => throw e
case e: Throwable =>
if (request.requestObj != null) {
request.requestObj.handleError(e, requestChannel, request)
@ -154,9 +156,10 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
} catch {
case e: FatalExitError => throw e
case e: KafkaStorageException =>
fatal("Disk error during leadership change.", e)
Runtime.getRuntime.halt(1)
Exit.halt(1)
}
}

View File

@ -26,7 +26,6 @@ import kafka.cluster.EndPoint
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import org.I0Itec.zkclient.IZkStateListener
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.zookeeper.Watcher.Event.KeeperState

View File

@ -20,9 +20,10 @@ package kafka.server
import kafka.network._
import kafka.utils._
import kafka.metrics.KafkaMetricsGroup
import java.util.concurrent.TimeUnit
import java.util.concurrent.{CountDownLatch, TimeUnit}
import com.yammer.metrics.core.Meter
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.{Time, Utils}
/**
@ -36,9 +37,10 @@ class KafkaRequestHandler(id: Int,
apis: KafkaApis,
time: Time) extends Runnable with Logging {
this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
private val latch = new CountDownLatch(1)
def run() {
while(true) {
while (true) {
try {
var req : RequestChannel.Request = null
while (req == null) {
@ -52,21 +54,27 @@ class KafkaRequestHandler(id: Int,
aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
}
if(req eq RequestChannel.AllDone) {
debug("Kafka request handler %d on broker %d received shut down command".format(
id, brokerId))
if (req eq RequestChannel.AllDone) {
debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
latch.countDown()
return
}
req.requestDequeueTimeMs = time.milliseconds
trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
apis.handle(req)
} catch {
case e: FatalExitError =>
latch.countDown()
Exit.exit(e.statusCode)
case e: Throwable => error("Exception when handling request", e)
}
}
}
def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
def initiateShutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
def awaitShutdown(): Unit = latch.await()
}
class KafkaRequestHandlerPool(val brokerId: Int,
@ -79,20 +87,18 @@ class KafkaRequestHandlerPool(val brokerId: Int,
private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
val threads = new Array[Thread](numThreads)
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
threads(i).start()
Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()
}
def shutdown() {
info("shutting down")
for(handler <- runnables)
handler.shutdown
for(thread <- threads)
thread.join
for (handler <- runnables)
handler.initiateShutdown()
for (handler <- runnables)
handler.awaitShutdown()
info("shut down completely")
}
}

View File

@ -214,8 +214,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
socketServer.startup()
/* start replica manager */
replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager,
isShuttingDown, quotaManagers.follower)
replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()
/* start kafka controller */
@ -290,12 +289,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}
}
def notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit = {
private def notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit = {
val clusterResourceListeners = new ClusterResourceListeners
clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
}
protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower)
private def initZk(): ZkUtils = {
info(s"Connecting to zookeeper on ${config.zkConnect}")
@ -571,7 +573,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
try {
info("shutting down")
if(isStartingUp.get)
if (isStartingUp.get)
throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
val canShutdown = isShuttingDown.compareAndSet(false, true)

View File

@ -20,7 +20,7 @@ package kafka.server
import java.util.Properties
import kafka.metrics.KafkaMetricsReporter
import kafka.utils.{VerifiableProperties, Logging}
import kafka.utils.{Exit, Logging, VerifiableProperties}
object KafkaServerStartable {
def fromProps(serverProps: Properties) = {
@ -35,26 +35,22 @@ class KafkaServerStartable(val serverConfig: KafkaConfig, reporters: Seq[KafkaMe
def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
def startup() {
try {
server.startup()
}
try server.startup()
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
// KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
System.exit(1)
case _: Throwable =>
// KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
fatal("Exiting Kafka.")
Exit.exit(1)
}
}
def shutdown() {
try {
server.shutdown()
}
try server.shutdown()
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
case _: Throwable =>
fatal("Halting Kafka.")
// Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
Runtime.getRuntime.halt(1)
Exit.halt(1)
}
}
@ -66,8 +62,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig, reporters: Seq[KafkaMe
server.brokerState.newState(newState)
}
def awaitShutdown() =
server.awaitShutdown
def awaitShutdown(): Unit = server.awaitShutdown()
}

View File

@ -22,7 +22,7 @@ import java.util.regex.Pattern
import org.apache.kafka.common.utils.Utils
import scala.collection._
import kafka.utils.Logging
import kafka.utils.{Exit, Logging}
import kafka.common._
import java.io._
@ -66,7 +66,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
case e: FileNotFoundException =>
if (FileSystems.getDefault.isReadOnly) {
fatal("Halting writes to offset checkpoint file because the underlying file system is inaccessible : ", e)
Runtime.getRuntime.halt(1)
Exit.halt(1)
}
throw e
} finally {

View File

@ -21,19 +21,15 @@ import kafka.cluster.BrokerEndPoint
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
class ReplicaFetcherManager(brokerConfig: KafkaConfig, replicaMgr: ReplicaManager, metrics: Metrics, time: Time, threadNamePrefix: Option[String] = None, quotaManager: ReplicationQuotaManager)
class ReplicaFetcherManager(brokerConfig: KafkaConfig, protected val replicaManager: ReplicaManager, metrics: Metrics,
time: Time, threadNamePrefix: Option[String] = None, quotaManager: ReplicationQuotaManager)
extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
"Replica", brokerConfig.numReplicaFetchers) {
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
val threadName = threadNamePrefix match {
case None =>
"ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id)
case Some(p) =>
"%s:ReplicaFetcherThread-%d-%d".format(p, fetcherId, sourceBroker.id)
}
new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig,
replicaMgr, metrics, time, quotaManager)
val prefix = threadNamePrefix.map(tp => s"${tp}:").getOrElse("")
val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig, replicaManager, metrics, time, quotaManager)
}
def shutdown() {

View File

@ -26,13 +26,15 @@ import kafka.log.LogConfig
import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_9_0}
import kafka.common.KafkaStorageException
import ReplicaFetcherThread._
import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.network.{ChannelBuilders, Mode, NetworkReceive, Selectable, Selector}
import kafka.utils.Exit
import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.network.{ChannelBuilders, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse}
import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.Time
@ -143,7 +145,7 @@ class ReplicaFetcherThread(name: String,
} catch {
case e: KafkaStorageException =>
fatal(s"Disk error while replicating data for $topicPartition", e)
Runtime.getRuntime.halt(1)
Exit.halt(1)
}
}
@ -181,11 +183,11 @@ class ReplicaFetcherThread(name: String,
// we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
// Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
fatal("Exiting because log truncation is not allowed for partition %s,".format(topicPartition) +
" Current leader %d's latest offset %d is less than replica %d's latest offset %d"
.format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
System.exit(1)
// Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader " +
s"${sourceBroker.id}'s latest offset $leaderEndOffset is less than replica ${brokerConfig.brokerId}'s latest " +
s"offset ${replica.logEndOffset.messageOffset}")
throw new FatalExitError
}
warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"

View File

@ -116,7 +116,7 @@ class ReplicaManager(val config: KafkaConfig,
private val allPartitions = new Pool[TopicPartition, Partition](valueFactory = Some(tp =>
new Partition(tp.topic, tp.partition, time, this)))
private val replicaStateChangeLock = new Object
val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManager)
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
private var hwThreadInitialized = false
@ -134,9 +134,7 @@ class ReplicaManager(val config: KafkaConfig,
val leaderCount = newGauge(
"LeaderCount",
new Gauge[Int] {
def value = {
getLeaderPartitions().size
}
def value = getLeaderPartitions.size
}
)
val partitionCount = newGauge(
@ -148,15 +146,14 @@ class ReplicaManager(val config: KafkaConfig,
val underReplicatedPartitions = newGauge(
"UnderReplicatedPartitions",
new Gauge[Int] {
def value = underReplicatedPartitionCount()
def value = underReplicatedPartitionCount
}
)
val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
def underReplicatedPartitionCount(): Int = {
getLeaderPartitions().count(_.isUnderReplicated)
}
def underReplicatedPartitionCount: Int =
getLeaderPartitions.count(_.isUnderReplicated)
def startHighWaterMarksCheckPointThread() = {
if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
@ -411,7 +408,7 @@ class ReplicaManager(val config: KafkaConfig,
// it is supposed to indicate un-expected failures of a broker in handling a produce request
case e: KafkaStorageException =>
fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
Runtime.getRuntime.halt(1)
Exit.halt(1)
(topicPartition, null)
case e@ (_: UnknownTopicOrPartitionException |
_: NotLeaderForPartitionException |
@ -928,9 +925,8 @@ class ReplicaManager(val config: KafkaConfig,
}
}
private def getLeaderPartitions(): List[Partition] = {
private def getLeaderPartitions: List[Partition] =
allPartitions.values.filter(_.leaderReplicaIfLocal.isDefined).toList
}
def getHighWatermark(topicPartition: TopicPartition): Option[Long] = {
getPartition(topicPartition).flatMap { partition =>
@ -949,7 +945,7 @@ class ReplicaManager(val config: KafkaConfig,
} catch {
case e: IOException =>
fatal("Error writing to highwatermark file: ", e)
Runtime.getRuntime.halt(1)
Exit.halt(1)
}
}
}
@ -964,4 +960,9 @@ class ReplicaManager(val config: KafkaConfig,
checkpointHighWatermarks()
info("Shut down completely")
}
protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String], quotaManager: ReplicationQuotaManager) = {
new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
}
}

View File

@ -51,7 +51,7 @@ object ConsoleConsumer extends Logging {
} catch {
case e: Throwable =>
error("Unknown error when running consumer: ", e)
System.exit(1)
Exit.exit(1)
}
}
@ -89,14 +89,14 @@ object ConsoleConsumer extends Logging {
def checkZk(config: ConsumerConfig) {
if (!checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/brokers/ids")) {
System.err.println("No brokers found in ZK.")
System.exit(1)
Exit.exit(1)
}
if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) &&
checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/consumers/" + config.consumerProps.getProperty("group.id") + "/offsets")) {
System.err.println("Found previous offset information for this group " + config.consumerProps.getProperty("group.id")
+ ". Please use --delete-consumer-offsets to delete previous offsets metadata")
System.exit(1)
Exit.exit(1)
}
}
@ -177,7 +177,7 @@ object ConsoleConsumer extends Logging {
checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/consumers/" + props.getProperty("group.id") + "/offsets")) {
System.err.println("Found previous offset information for this group " + props.getProperty("group.id")
+ ". Please use --delete-consumer-offsets to delete previous offsets metadata")
System.exit(1)
Exit.exit(1)
}
if (config.options.has(config.deleteConsumerOffsetsOpt))
@ -389,13 +389,12 @@ object ConsoleConsumer extends Logging {
groupIdPassed = false
}
def tryParse(parser: OptionParser, args: Array[String]) = {
def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
try
parser.parse(args: _*)
catch {
case e: OptionException =>
Utils.croak(e.getMessage)
null
CommandLineUtils.printUsageAndDie(parser, e.getMessage)
}
}
}

View File

@ -20,7 +20,7 @@ package kafka.tools
import kafka.common._
import kafka.message._
import kafka.serializer._
import kafka.utils.{CommandLineUtils, ToolsUtils}
import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
import kafka.producer.{NewShinyProducer, OldProducer}
import java.util.Properties
import java.io._
@ -62,12 +62,12 @@ object ConsoleProducer {
} catch {
case e: joptsimple.OptionException =>
System.err.println(e.getMessage)
System.exit(1)
Exit.exit(1)
case e: Exception =>
e.printStackTrace
System.exit(1)
Exit.exit(1)
}
System.exit(0)
Exit.exit(0)
}
def getReaderProps(config: ProducerConfig): Properties = {

View File

@ -126,7 +126,7 @@ object ConsumerOffsetChecker extends Logging {
if (options.has("help")) {
parser.printHelpOn(System.out)
System.exit(0)
Exit.exit(0)
}
CommandLineUtils.checkRequiredArgs(parser, options, groupOpt, zkConnectOpt)

View File

@ -19,6 +19,7 @@ package kafka.tools
import java.util.{Arrays, Collections, Properties}
import kafka.utils.Exit
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.utils.Utils
@ -44,7 +45,7 @@ object EndToEndLatency {
def main(args: Array[String]) {
if (args.length != 5 && args.length != 6) {
System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file")
System.exit(1)
Exit.exit(1)
}
val brokerList = args(0)

View File

@ -18,9 +18,11 @@
package kafka.tools
import java.io.FileWriter
import joptsimple._
import kafka.utils.{Logging, ZkUtils, ZKGroupTopicDirs, CommandLineUtils}
import kafka.utils.{CommandLineUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.security.JaasUtils
import scala.collection.JavaConverters._
@ -64,7 +66,7 @@ object ExportZkOffsets extends Logging {
if (options.has("help")) {
parser.printHelpOn(System.out)
System.exit(0)
Exit.exit(0)
}
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, outFileOpt)

View File

@ -20,10 +20,10 @@ package kafka.tools
import kafka.consumer._
import joptsimple._
import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.client.ClientUtils
import kafka.utils.{ToolsUtils, CommandLineUtils}
import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
object GetOffsetShell {
@ -80,7 +80,7 @@ object GetOffsetShell {
if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
"kafka-list-topic.sh to verify")
System.exit(1)
Exit.exit(1)
}
val partitions =
if(partitionList == "") {

View File

@ -19,8 +19,9 @@ package kafka.tools
import java.io.BufferedReader
import java.io.FileReader
import joptsimple._
import kafka.utils.{Logging, ZkUtils, CommandLineUtils}
import kafka.utils.{CommandLineUtils, Exit, Logging, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.security.JaasUtils
@ -61,7 +62,7 @@ object ImportZkOffsets extends Logging {
if (options.has("help")) {
parser.printHelpOn(System.out)
System.exit(0)
Exit.exit(0)
}
CommandLineUtils.checkRequiredArgs(parser, options, inFileOpt)

View File

@ -22,11 +22,13 @@ import java.util.Date
import java.text.SimpleDateFormat
import javax.management._
import javax.management.remote._
import joptsimple.OptionParser
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.math._
import kafka.utils.{CommandLineUtils, Logging}
import kafka.utils.{CommandLineUtils, Exit, Logging}
object JmxTool extends Logging {
@ -71,7 +73,7 @@ object JmxTool extends Logging {
if(options.has(helpOpt)) {
parser.printHelpOn(System.out)
System.exit(0)
Exit.exit(0)
}
val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt))

View File

@ -18,11 +18,10 @@
package kafka.tools
import kafka.metrics.KafkaMetricsReporter
import kafka.producer.{OldProducer, NewShinyProducer}
import kafka.utils.{ToolsUtils, VerifiableProperties, Logging, CommandLineUtils}
import kafka.producer.{NewShinyProducer, OldProducer}
import kafka.utils.{CommandLineUtils, Exit, Logging, ToolsUtils, VerifiableProperties}
import kafka.message.CompressionCodec
import kafka.serializer._
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong
import java.util._
@ -67,7 +66,7 @@ object ProducerPerformance extends Logging {
config.dateFormat.format(startMs), config.dateFormat.format(endMs),
config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent,
totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs))
System.exit(0)
Exit.exit(0)
}
class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {

View File

@ -136,7 +136,7 @@ object ReplicaVerificationTool extends Logging {
if (filteredTopicMetadata.isEmpty) {
error("No topics found. " + topicWhiteListOpt + ", if specified, is either filtering out all topics or there is no topic.")
System.exit(1)
Exit.exit(1)
}
val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap(
@ -302,7 +302,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
+ ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
+ messageInfoFromFirstReplica.offset + " doesn't match replica "
+ replicaId + "'s offset " + logEntry.offset)
System.exit(1)
Exit.exit(1)
}
if (messageInfoFromFirstReplica.checksum != logEntry.record.checksum)
println(ReplicaVerificationTool.getCurrentTimeString + ": partition "

View File

@ -113,7 +113,7 @@ object SimpleConsumerPerformance {
config.dateFormat.format(reportTime), config.fetchSize, totalMBRead, totalMBRead/elapsed,
totalMessagesRead, totalMessagesRead/elapsed))
}
System.exit(0)
Exit.exit(0)
}
class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {

View File

@ -133,7 +133,7 @@ object SimpleConsumerShell extends Logging {
val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
System.exit(1)
Exit.exit(1)
}
// validating partition id
@ -141,7 +141,7 @@ object SimpleConsumerShell extends Logging {
val partitionMetadataOpt = partitionsMetadata.find(p => p.partitionId == partitionId)
if (partitionMetadataOpt.isEmpty) {
System.err.println("Error: partition %d does not exist for topic %s".format(partitionId, topic))
System.exit(1)
Exit.exit(1)
}
// validating replica id and initializing target broker
@ -151,7 +151,7 @@ object SimpleConsumerShell extends Logging {
replicaOpt = partitionMetadataOpt.get.leader
if (replicaOpt.isEmpty) {
System.err.println("Error: user specifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(topic, partitionId))
System.exit(1)
Exit.exit(1)
}
}
else {
@ -159,7 +159,7 @@ object SimpleConsumerShell extends Logging {
replicaOpt = replicasForPartition.find(r => r.id == replicaId)
if(replicaOpt.isEmpty) {
System.err.println("Error: replica %d does not exist for partition (%s, %d)".format(replicaId, topic, partitionId))
System.exit(1)
Exit.exit(1)
}
}
fetchTargetBroker = replicaOpt.get
@ -167,7 +167,7 @@ object SimpleConsumerShell extends Logging {
// initializing starting offset
if(startingOffset < OffsetRequest.EarliestTime) {
System.err.println("Invalid starting offset: %d".format(startingOffset))
System.exit(1)
Exit.exit(1)
}
if (startingOffset < 0) {
val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host,
@ -180,7 +180,7 @@ object SimpleConsumerShell extends Logging {
} catch {
case t: Throwable =>
System.err.println("Error in getting earliest or latest offset due to: " + Utils.stackTrace(t))
System.exit(1)
Exit.exit(1)
} finally {
if (simpleConsumer != null)
simpleConsumer.close()
@ -240,7 +240,7 @@ object SimpleConsumerShell extends Logging {
System.err.println("Unable to write to standard out, closing consumer.")
formatter.close()
simpleConsumer.close()
System.exit(1)
Exit.exit(1)
}
}
}

View File

@ -18,11 +18,13 @@
package kafka.tools
import joptsimple._
import scala.util.matching.Regex
import collection.mutable
import java.util.Date
import java.text.SimpleDateFormat
import kafka.utils.{CoreUtils, Logging, CommandLineUtils}
import kafka.utils.{CommandLineUtils, CoreUtils, Exit, Logging}
import kafka.common.Topic
import java.io.{BufferedOutputStream, OutputStream}
@ -92,12 +94,12 @@ object StateChangeLogMerger extends Logging {
if ((!options.has(filesOpt) && !options.has(regexOpt)) || (options.has(filesOpt) && options.has(regexOpt))) {
System.err.println("Provide arguments to exactly one of the two options \"" + filesOpt + "\" or \"" + regexOpt + "\"")
parser.printHelpOn(System.err)
System.exit(1)
Exit.exit(1)
}
if (options.has(partitionsOpt) && !options.has(topicOpt)) {
System.err.println("The option \"" + topicOpt + "\" needs to be provided an argument when specifying partition ids")
parser.printHelpOn(System.err)
System.exit(1)
Exit.exit(1)
}
// Populate data structures.
@ -118,7 +120,7 @@ object StateChangeLogMerger extends Logging {
val duplicatePartitions = CoreUtils.duplicates(partitions)
if (duplicatePartitions.nonEmpty) {
System.err.println("The list of partitions contains repeated entries: %s".format(duplicatePartitions.mkString(",")))
System.exit(1)
Exit.exit(1)
}
}
startDate = dateFormat.parse(options.valueOf(startTimeOpt).replace('\"', ' ').trim)
@ -193,4 +195,4 @@ object StateChangeLogMerger extends Logging {
}
}
}
}

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
import java.io.IOException;
import java.util.HashSet;
@ -271,7 +272,7 @@ public class StreamsResetter {
}
public static void main(final String[] args) {
System.exit(new StreamsResetter().run(args));
Exit.exit(new StreamsResetter().run(args));
}
}

View File

@ -21,7 +21,7 @@ import org.I0Itec.zkclient.ZkClient
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.{KafkaException, TopicAndPartition}
import kafka.utils.{CoreUtils, ZKGroupTopicDirs, ZkUtils}
import kafka.utils.{CoreUtils, Exit, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.security.JaasUtils
@ -86,6 +86,6 @@ object UpdateOffsetsInZK {
private def usage() = {
println("USAGE: " + UpdateOffsetsInZK.getClass.getName + " [earliest | latest] consumer.properties topic")
System.exit(1)
Exit.exit(1)
}
}

View File

@ -19,8 +19,7 @@ package kafka.tools
import joptsimple.OptionParser
import org.apache.kafka.common.security._
import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, CommandLineUtils}
import kafka.utils.{CommandLineUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
object VerifyConsumerRebalance extends Logging {
def main(args: Array[String]) {
@ -39,7 +38,7 @@ object VerifyConsumerRebalance extends Logging {
if (options.has("help")) {
parser.printHelpOn(System.out)
System.exit(0)
Exit.exit(0)
}
CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)

View File

@ -17,12 +17,13 @@
package kafka.tools
import kafka.utils.Exit
import org.apache.zookeeper.ZooKeeperMain
class ZooKeeperMainWrapper(args: Array[String]) extends ZooKeeperMain(args) {
def runCmd(): Unit = {
processCmd(this.cl)
System.exit(0)
Exit.exit(0)
}
}

View File

@ -25,21 +25,11 @@ import java.util.Properties
*/
object CommandLineUtils extends Logging {
trait ExitPolicy {
def exit(msg: String): Nothing
}
val DEFAULT_EXIT_POLICY = new ExitPolicy {
override def exit(msg: String): Nothing = sys.exit(1)
}
private var exitPolicy = DEFAULT_EXIT_POLICY
/**
* Check that all the listed options are present
*/
def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
for(arg <- required) {
for (arg <- required) {
if(!options.has(arg))
printUsageAndDie(parser, "Missing required argument \"" + arg + "\"")
}
@ -63,11 +53,9 @@ object CommandLineUtils extends Logging {
def printUsageAndDie(parser: OptionParser, message: String): Nothing = {
System.err.println(message)
parser.printHelpOn(System.err)
exitPolicy.exit(message)
Exit.exit(1, Some(message))
}
def exitPolicy(policy: ExitPolicy): Unit = this.exitPolicy = policy
/**
* Parse key-value pairs in the form key=value
*/
@ -75,7 +63,7 @@ object CommandLineUtils extends Logging {
val splits = args.map(_ split "=").filterNot(_.length == 0)
val props = new Properties
for(a <- splits) {
for (a <- splits) {
if (a.length == 1) {
if (acceptMissingValue) props.put(a(0), "")
else throw new IllegalArgumentException(s"Missing value for key ${a(0)}")
@ -83,7 +71,7 @@ object CommandLineUtils extends Logging {
else if (a.length == 2) props.put(a(0), a(1))
else {
System.err.println("Invalid command line properties: " + args.mkString(" "))
System.exit(1)
Exit.exit(1)
}
}
props

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import org.apache.kafka.common.utils.{Exit => JExit}
/**
* Internal class that should be used instead of `System.exit()` and `Runtime.getRuntime().halt()` so that tests can
* easily change the behaviour.
*/
object Exit {
def exit(statusCode: Int, message: Option[String] = None): Nothing = {
JExit.exit(statusCode, message.orNull)
throw new AssertionError("exit should not return, but it did.")
}
def halt(statusCode: Int, message: Option[String] = None): Nothing = {
JExit.halt(statusCode, message.orNull)
throw new AssertionError("halt should not return, but it did.")
}
def setExitProcedure(exitProcedure: (Int, Option[String]) => Nothing): Unit =
JExit.setExitProcedure(functionToProcedure(exitProcedure))
def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit =
JExit.setHaltProcedure(functionToProcedure(haltProcedure))
def resetExitProcedure(): Unit =
JExit.resetExitProcedure()
def resetHaltProcedure(): Unit =
JExit.resetHaltProcedure()
private def functionToProcedure(procedure: (Int, Option[String]) => Nothing) = new JExit.Procedure {
def execute(statusCode: Int, message: String): Unit = procedure(statusCode, Option(message))
}
}

View File

@ -20,6 +20,8 @@ package kafka.utils
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.CountDownLatch
import org.apache.kafka.common.internals.FatalExitError
abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true)
extends Thread(name) with Logging {
this.setDaemon(false)
@ -33,9 +35,8 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
}
def initiateShutdown(): Boolean = {
if(isRunning.compareAndSet(true, false)) {
if (isRunning.compareAndSet(true, false)) {
info("Shutting down")
isRunning.set(false)
if (isInterruptible)
interrupt()
true
@ -57,17 +58,21 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
def doWork(): Unit
override def run(): Unit = {
info("Starting ")
try{
while(isRunning.get()){
info("Starting")
try {
while (isRunning.get)
doWork()
}
} catch{
} catch {
case e: FatalExitError =>
isRunning.set(false)
shutdownLatch.countDown()
info("Stopped")
Exit.exit(e.statusCode())
case e: Throwable =>
if(isRunning.get())
error("Error due to ", e)
if (isRunning.get())
error("Error due to", e)
}
shutdownLatch.countDown()
info("Stopped ")
info("Stopped")
}
}
}

View File

@ -10,7 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package integration.kafka.api
package kafka.api
import kafka.common.Topic
import kafka.integration.KafkaServerTestHarness

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.concurrent.atomic.AtomicBoolean
import kafka.admin.AdminUtils
import kafka.cluster.BrokerEndPoint
import kafka.server.ReplicaFetcherThread.{FetchRequest, PartitionData}
import kafka.utils.{Exit, TestUtils, ZkUtils}
import kafka.utils.TestUtils.createBrokerConfigs
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.FetchResponse
import org.apache.kafka.common.utils.Time
import org.junit.{After, Test}
import scala.collection.Map
import scala.collection.JavaConverters._
import scala.concurrent.Future
class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
private var brokers: Seq[KafkaServer] = null
@volatile private var shutdownCompleted = false
@After
override def tearDown() {
Exit.resetExitProcedure()
brokers.foreach(_.shutdown())
super.tearDown()
}
/**
* Verifies that a follower shuts down if the offset for an `added partition` is out of range and if a fatal
* exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
* when the shutdown hook is invoked and hence this test.
*/
@Test
def testFatalErrorInAddPartitions(): Unit = {
// Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the broker shuts down before
// the metadata is propagated.
def createTopic(zkUtils: ZkUtils, topic: String): Unit = {
AdminUtils.createTopic(zkUtils, topic, partitions = 1, replicationFactor = 2)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
}
val props = createBrokerConfigs(2, zkConnect)
brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
import params._
new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
override def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]): Unit =
super.addPartitions(partitionAndOffsets.mapValues(_ => -1))
}
}))
createTopic(zkUtils, "topic")
TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
}
/**
* Verifies that a follower shuts down if the offset of a partition in the fetch response is out of range and if a
* fatal exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
* when the shutdown hook is invoked and hence this test.
*/
@Test
def testFatalErrorInProcessFetchRequest(): Unit = {
val props = createBrokerConfigs(2, zkConnect)
brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
import params._
new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
override protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
fetchRequest.underlying.fetchData.asScala.keys.toSeq.map { tp =>
(tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE.code, -1, null)))
}
}
}
}))
TestUtils.createTopic(zkUtils, "topic", numPartitions = 1, replicationFactor = 2, servers = brokers)
TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
}
private case class FetcherThreadParams(threadName: String, fetcherId: Int, sourceBroker: BrokerEndPoint,
replicaManager: ReplicaManager, metrics: Metrics, time: Time,
quotaManager: ReplicationQuotaManager)
private def createServer(config: KafkaConfig, fetcherThread: FetcherThreadParams => ReplicaFetcherThread): KafkaServer = {
val time = Time.SYSTEM
val server = new KafkaServer(config, time) {
override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown,
quotaManagers.follower) {
override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
quotaManager: ReplicationQuotaManager) =
new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) {
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
val prefix = threadNamePrefix.map(tp => s"${tp}:").getOrElse("")
val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
fetcherThread(new FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics,
time, quotaManager))
}
}
}
}
}
Exit.setExitProcedure { (_, _) =>
import scala.concurrent.ExecutionContext.Implicits._
// Run in a separate thread like shutdown hooks
Future {
server.shutdown()
shutdownCompleted = true
}
// Sleep until interrupted to emulate the fact that `System.exit()` never returns
Thread.sleep(Long.MaxValue)
throw new AssertionError
}
server.startup()
server
}
}

View File

@ -25,7 +25,7 @@ import java.nio.file.Files
import java.text.MessageFormat
import java.util.{Locale, Properties, UUID}
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.{CoreUtils, Exit, Logging}
import scala.collection.JavaConverters._
import org.apache.commons.io.IOUtils
@ -355,7 +355,7 @@ object MiniKdc {
start(workDir, config, keytabFile, principals)
case _ =>
println("Arguments: <WORKDIR> <MINIKDCPROPERTIES> <KEYTABFILE> [<PRINCIPALS>]+")
sys.exit(1)
Exit.exit(1)
}
}

View File

@ -100,7 +100,7 @@ object TestLogCleaning {
if(options.has(dumpOpt)) {
dumpLog(new File(options.valueOf(dumpOpt)))
System.exit(0)
Exit.exit(0)
}
CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt, numMessagesOpt)

View File

@ -26,7 +26,7 @@ import org.apache.kafka.common.TopicPartition
import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
import kafka.utils.TestUtils._
import kafka.utils.ZkUtils._
import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
import kafka.utils.{CoreUtils, Exit, Logging, TestUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.clients.producer.ProducerRecord
import org.jfree.chart.plot.PlotOrientation
@ -70,7 +70,7 @@ object ReplicationQuotasTestRig {
experiments.foreach(run(_, journal, displayChartsOnScreen))
if (!displayChartsOnScreen)
System.exit(0)
Exit.exit(0)
}
def run(config: ExperimentDef, journal: Journal, displayChartsOnScreen: Boolean) {

View File

@ -18,7 +18,7 @@
package kafka
import org.apache.log4j.PropertyConfigurator
import kafka.utils.Logging
import kafka.utils.{Exit, Logging}
import serializer.Encoder
object TestKafkaAppender extends Logging {
@ -27,7 +27,7 @@ object TestKafkaAppender extends Logging {
if(args.length < 1) {
println("USAGE: " + TestKafkaAppender.getClass.getName + " log4j_config")
System.exit(1)
Exit.exit(1)
}
try {
@ -36,7 +36,7 @@ object TestKafkaAppender extends Logging {
case e: Exception =>
System.err.println("KafkaAppender could not be initialized ! Exiting..")
e.printStackTrace()
System.exit(1)
Exit.exit(1)
}
for (_ <- 1 to 10)

View File

@ -121,7 +121,7 @@ object TestLinearWriteSpeed {
writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(logProperties), scheduler, messageSet)
} else {
System.err.println("Must specify what to write to with one of --log, --channel, or --mmap")
System.exit(1)
Exit.exit(1)
}
}
bytesToWrite = (bytesToWrite / numFiles) * numFiles

View File

@ -18,16 +18,19 @@
package other.kafka
import kafka.api._
import kafka.utils.{ZkUtils, ShutdownableThread}
import kafka.utils.{Exit, ShutdownableThread, ZkUtils}
import org.apache.kafka.common.protocol.Errors
import scala.collection._
import kafka.client.ClientUtils
import joptsimple.OptionParser
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.network.BlockingChannel
import scala.util.Random
import java.io.IOException
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.nio.channels.ClosedByInterruptException
@ -237,7 +240,7 @@ object TestOffsetManager {
if (options.has(helpOpt)) {
parser.printHelpOn(System.out)
System.exit(0)
Exit.exit(0)
}
val commitIntervalMs = options.valueOf(commitIntervalOpt).intValue()

View File

@ -17,42 +17,22 @@
package kafka
import java.io.{File, FileOutputStream}
import java.security.Permission
import java.util
import kafka.server.KafkaConfig
import kafka.utils.Exit
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.internals.FatalExitError
import org.junit.{After, Before, Test}
import org.junit.Assert._
class KafkaTest {
val originalSecurityManager: SecurityManager = System.getSecurityManager
class ExitCalled extends SecurityException {
}
private class NoExitSecurityManager extends SecurityManager {
override def checkExit(status: Int): Unit = {
throw new ExitCalled
}
override def checkPermission(perm : Permission): Unit = {
}
override def checkPermission(perm : Permission, context: Object): Unit = {
}
}
@Before
def setSecurityManager() : Unit = {
System.setSecurityManager(new NoExitSecurityManager)
}
def setUp(): Unit = Exit.setExitProcedure((status, _) => throw new FatalExitError(status))
@After
def setOriginalSecurityManager() : Unit = {
System.setSecurityManager(originalSecurityManager)
}
def tearDown(): Unit = Exit.resetExitProcedure()
@Test
def testGetKafkaConfigFromArgs(): Unit = {
@ -77,25 +57,25 @@ class KafkaTest {
assertEquals(util.Arrays.asList("compact","delete"), config4.logCleanupPolicy)
}
@Test(expected = classOf[ExitCalled])
@Test(expected = classOf[FatalExitError])
def testGetKafkaConfigFromArgsWrongSetValue(): Unit = {
val propertiesFile = prepareDefaultConfig()
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "a=b=c")))
}
@Test(expected = classOf[ExitCalled])
@Test(expected = classOf[FatalExitError])
def testGetKafkaConfigFromArgsNonArgsAtTheEnd(): Unit = {
val propertiesFile = prepareDefaultConfig()
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "broker.id=1", "broker.id=2")))
}
@Test(expected = classOf[ExitCalled])
@Test(expected = classOf[FatalExitError])
def testGetKafkaConfigFromArgsNonArgsOnly(): Unit = {
val propertiesFile = prepareDefaultConfig()
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "broker.id=2")))
}
@Test(expected = classOf[ExitCalled])
@Test(expected = classOf[FatalExitError])
def testGetKafkaConfigFromArgsNonArgsAtTheBegging(): Unit = {
val propertiesFile = prepareDefaultConfig()
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2")))

View File

@ -16,8 +16,7 @@
*/
package kafka.admin
import kafka.utils.CommandLineUtils
import kafka.utils.CommandLineUtils.ExitPolicy
import kafka.utils.Exit
import org.junit.Assert.assertTrue
import org.junit.{After, Before, Test}
import org.scalatest.junit.JUnitSuite
@ -26,14 +25,12 @@ class ReassignPartitionsCommandArgsTest extends JUnitSuite {
@Before
def setUp() {
CommandLineUtils.exitPolicy(new ExitPolicy {
override def exit(msg: String): Nothing = throw new IllegalArgumentException(msg)
})
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
}
@After
def tearDown() {
CommandLineUtils.exitPolicy(CommandLineUtils.DEFAULT_EXIT_POLICY)
Exit.resetExitProcedure()
}
/**

View File

@ -18,6 +18,8 @@
package kafka.log
import java.nio._
import kafka.utils.Exit
import org.junit._
import org.scalatest.junit.JUnitSuite
import org.junit.Assert._
@ -73,7 +75,7 @@ object OffsetMapTest {
def main(args: Array[String]) {
if(args.length != 2) {
System.err.println("USAGE: java OffsetMapTest size load")
System.exit(1)
Exit.exit(1)
}
val test = new OffsetMapTest()
val size = args(0).toInt

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package unit.kafka.server
package kafka.server
import org.apache.kafka.common.requests.ApiVersionsResponse
import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils, Protocol}
View File
@ -24,7 +24,6 @@ import org.junit.{After, Before, Test}
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils
import TestUtils._
import kafka.common._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringSerializer
View File
@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.kafka.common.internals.FatalExitError
import org.junit.{After, Test}
import org.junit.Assert.{assertEquals, assertTrue}
class ShutdownableThreadTest {
@After
def tearDown(): Unit = Exit.resetExitProcedure()
@Test
def testShutdownWhenCalledAfterThreadStart(): Unit = {
@volatile var statusCodeOption: Option[Int] = None
Exit.setExitProcedure { (statusCode, _) =>
statusCodeOption = Some(statusCode)
// Sleep until interrupted to emulate the fact that `System.exit()` never returns
Thread.sleep(Long.MaxValue)
throw new AssertionError
}
val latch = new CountDownLatch(1)
val thread = new ShutdownableThread("shutdownable-thread-test") {
override def doWork(): Unit = {
latch.countDown()
throw new FatalExitError
}
}
thread.start()
assertTrue("doWork was not invoked", latch.await(10, TimeUnit.SECONDS))
thread.shutdown()
TestUtils.waitUntilTrue(() => statusCodeOption.isDefined, "Status code was not set by exit procedure")
assertEquals(1, statusCodeOption.get)
}
}
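The new test above relies on the contract stated in FatalExitError's javadoc: the error is thrown from doWork, propagates to the very top of the thread where no shared lock is held, and only then is the exit procedure invoked, so a concurrent shutdown() cannot deadlock with it. A rough Java illustration of that catch-at-the-top structure follows; it is an invented worker class, not the Scala ShutdownableThread itself.
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Exit;
public abstract class WorkerThreadSketch extends Thread {
    /** One unit of work; may throw FatalExitError to request a JVM exit. */
    protected abstract void doWork();
    private volatile boolean running = true;
    @Override
    public void run() {
        try {
            while (running)
                doWork();
        } catch (FatalExitError e) {
            // Reached only at the top of the thread, with no locks held,
            // so invoking the exit procedure here cannot deadlock with shutdown().
            Exit.exit(e.statusCode());
        } catch (Throwable t) {
            // Any other failure simply ends the thread.
            t.printStackTrace();
        }
    }
    public void shutdown() {
        running = false;
        interrupt();
    }
}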
View File
@ -247,7 +247,7 @@ object TestUtils extends Logging {
// create topic
AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig)
// wait until the update metadata request for new topic reaches all servers
(0 until numPartitions).map { case i =>
(0 until numPartitions).map { i =>
TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, i)
}.toMap
@ -876,7 +876,7 @@ object TestUtils extends Logging {
"Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned))
TestUtils.waitUntilTrue(() => {
val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head
leaderBroker.replicaManager.underReplicatedPartitionCount() == 0
leaderBroker.replicaManager.underReplicatedPartitionCount == 0
},
"Reassigned partition [%s,%d] is under-replicated as reported by the leader %d".format(topic, partitionToBeReassigned, leader.get))
}
View File
@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
@ -56,7 +57,7 @@ public class ShutdownDeadlockTest {
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
System.exit(-1);
Exit.exit(1);
}
});
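Because the handler above now goes through Exit.exit(1), a harness can observe the failure instead of losing the whole JVM. A small hypothetical demonstration (the capture logic and class name are invented; the Exit.Procedure shape is assumed as before):
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.utils.Exit;
public class HandlerExitCaptureSketch {
    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger lastStatus = new AtomicInteger(Integer.MIN_VALUE);
        // Record the requested status code instead of exiting.
        Exit.setExitProcedure(new Exit.Procedure() {
            @Override
            public void execute(int statusCode, String message) {
                lastStatus.set(statusCode);
            }
        });
        Thread t = new Thread(() -> {
            throw new RuntimeException("simulated stream thread failure");
        });
        t.setUncaughtExceptionHandler((thread, e) -> Exit.exit(1));
        t.start();
        t.join(); // the uncaught-exception handler runs before the thread terminates
        System.out.println("captured exit status: " + lastStatus.get()); // prints 1
        Exit.resetExitProcedure();
    }
}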
View File
@ -30,6 +30,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
@ -169,7 +170,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception != null) {
exception.printStackTrace();
System.exit(-1);
Exit.exit(1);
}
}
});
View File
@ -39,6 +39,7 @@ import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@ -127,10 +128,10 @@ public class ClientCompatibilityTest {
} catch (ArgumentParserException e) {
if (args.length == 0) {
parser.printHelp();
System.exit(0);
Exit.exit(0);
} else {
parser.handleError(e);
System.exit(1);
Exit.exit(1);
}
}
TestConfig testConfig = new TestConfig(res);
@ -140,10 +141,10 @@ public class ClientCompatibilityTest {
} catch (Throwable t) {
System.out.printf("FAILED: Caught exception %s\n\n", t.getMessage());
t.printStackTrace();
System.exit(1);
Exit.exit(1);
}
System.out.println("SUCCESS.");
System.exit(0);
Exit.exit(0);
}
private static String toHexString(byte[] buf) {
@ -345,7 +346,7 @@ public class ClientCompatibilityTest {
} catch (RuntimeException e) {
System.out.println("The second message in this topic was not ours. Please use a new " +
"topic when running this program.");
System.exit(1);
Exit.exit(1);
}
} catch (RecordTooLargeException e) {
log.debug("Got RecordTooLargeException", e);
View File
@ -36,6 +36,7 @@ import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
public class ProducerPerformance {
@ -129,10 +130,10 @@ public class ProducerPerformance {
} catch (ArgumentParserException e) {
if (args.length == 0) {
parser.printHelp();
System.exit(0);
Exit.exit(0);
} else {
parser.handleError(e);
System.exit(1);
Exit.exit(1);
}
}
View File
@ -40,6 +40,7 @@ import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import java.io.Closeable;
@ -602,7 +603,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
ArgumentParser parser = argParser();
if (args.length == 0) {
parser.printHelp();
System.exit(0);
Exit.exit(0);
}
try {
@ -617,7 +618,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
consumer.run();
} catch (ArgumentParserException e) {
parser.handleError(e);
System.exit(1);
Exit.exit(1);
}
}
View File
@ -22,6 +22,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.Exit;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
@ -217,10 +218,10 @@ public class VerifiableLog4jAppender {
} catch (ArgumentParserException e) {
if (args.length == 0) {
parser.printHelp();
System.exit(0);
Exit.exit(0);
} else {
parser.handleError(e);
System.exit(1);
Exit.exit(1);
}
}
View File
@ -41,6 +41,7 @@ import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
/**
* Primarily intended for use with system testing, this producer prints metadata
@ -207,10 +208,10 @@ public class VerifiableProducer {
} catch (ArgumentParserException e) {
if (args.length == 0) {
parser.printHelp();
System.exit(0);
Exit.exit(0);
} else {
parser.handleError(e);
System.exit(1);
Exit.exit(1);
}
}