KAFKA-1046 Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x; reviewed by Neha and Jun

Christopher Freeman 2013-09-09 15:20:47 -07:00 committed by Neha Narkhede
parent da4512174b
commit c12d2ea9e5
63 changed files with 266 additions and 185 deletions
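Almost every hunk below applies the same mechanical fix: Scala 2.10 flags an untyped wildcard pattern in a catch block, such as `case e =>`, with the warning "This catches all Throwables", so each of these handlers gains an explicit `: Throwable` ascription. Behaviour is unchanged; the code now states what it was already doing. A minimal, self-contained sketch of that rewrite (the object and method names here are illustrative, not taken from the patch):

    // Illustration only: the before/after shape of the catch clauses touched in this commit.
    object CatchPatternSketch {
      def runQuietly(work: () => Unit): Unit =
        try {
          work()
        } catch {
          // Scala 2.8.x style, which 2.10 flags with "This catches all Throwables":
          //   case e => System.err.println("operation failed: " + e.getMessage)
          // 2.10-friendly style used throughout this commit; semantics are identical:
          case e: Throwable => System.err.println("operation failed: " + e.getMessage)
        }
    }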

View File

@@ -23,6 +23,7 @@ libraryDependencies ++= Seq(
 libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) =>
 deps :+ (sv match {
 case "2.8.0" => "org.scalatest" % "scalatest" % "1.2" % "test"
+case v if v.startsWith("2.10") => "org.scalatest" %% "scalatest" % "1.9.1" % "test"
 case _ => "org.scalatest" %% "scalatest" % "1.8" % "test"
 })
 }
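A note on the `%` versus `%%` split above: `%%` tells sbt to append the Scala binary version to the artifact name, so the 2.10 branch resolves `scalatest_2.10` while the 2.8.0 case pins a plain `scalatest` artifact. Roughly, assuming a 2.10.x `scalaVersion`, the two declarations below fetch the same jar:

    // sbt: with scalaVersion set to a 2.10.x release, %% expands the artifact name for you.
    libraryDependencies += "org.scalatest" %% "scalatest"      % "1.9.1" % "test"
    libraryDependencies += "org.scalatest" %  "scalatest_2.10" % "1.9.1" % "test"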

View File

@@ -47,7 +47,7 @@ object Kafka extends Logging {
 kafkaServerStartble.awaitShutdown
 }
 catch {
-case e => fatal(e)
+case e: Throwable => fatal(e)
 }
 System.exit(0)
 }

View File

@@ -68,7 +68,7 @@ object AddPartitionsCommand extends Logging {
 addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
 println("adding partitions succeeded!")
 } catch {
-case e =>
+case e: Throwable =>
 println("adding partitions failed because of " + e.getMessage)
 println(Utils.stackTrace(e))
 } finally {

View File

@@ -90,7 +90,7 @@ object AdminUtils extends Logging {
 debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
 } catch {
 case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
-case e2 => throw new AdministrationException(e2.toString)
+case e2: Throwable => throw new AdministrationException(e2.toString)
 }
 }

View File

@@ -74,7 +74,7 @@ object CreateTopicCommand extends Logging {
 createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
 println("creation succeeded!")
 } catch {
-case e =>
+case e: Throwable =>
 println("creation failed because of " + e.getMessage)
 println(Utils.stackTrace(e))
 } finally {

View File

@@ -54,7 +54,7 @@ object DeleteTopicCommand {
 println("deletion succeeded!")
 }
 catch {
-case e =>
+case e: Throwable =>
 println("delection failed because of " + e.getMessage)
 println(Utils.stackTrace(e))
 }

View File

@@ -72,7 +72,7 @@ object ListTopicCommand {
 showTopic(t, zkClient, reportUnderReplicatedPartitions, reportUnavailablePartitions, liveBrokers)
 }
 catch {
-case e =>
+case e: Throwable =>
 println("list topic failed because of " + e.getMessage)
 println(Utils.stackTrace(e))
 }

View File

@@ -60,7 +60,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
 preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
 println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection))
 } catch {
-case e =>
+case e: Throwable =>
 println("Failed to start preferred replica election")
 println(Utils.stackTrace(e))
 } finally {
@@ -104,7 +104,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
 val partitionsUndergoingPreferredReplicaElection = parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
 throw new AdministrationException("Preferred replica leader election currently in progress for " +
 "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
-case e2 => throw new AdministrationException(e2.toString)
+case e2: Throwable => throw new AdministrationException(e2.toString)
 }
 }
 }
@@ -116,7 +116,7 @@ class PreferredReplicaLeaderElectionCommand(zkClient: ZkClient, partitions: scal
 val validPartitions = partitions.filter(p => validatePartition(zkClient, p.topic, p.partition))
 PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)
 } catch {
-case e => throw new AdminCommandFailedException("Admin command failed", e)
+case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e)
 }
 }

View File

@@ -119,7 +119,7 @@ object ReassignPartitionsCommand extends Logging {
 "The replica assignment is \n" + partitionsToBeReassigned.toString())
 }
 } catch {
-case e =>
+case e: Throwable =>
 println("Partitions reassignment failed due to " + e.getMessage)
 println(Utils.stackTrace(e))
 } finally {
@@ -142,7 +142,7 @@ class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[T
 val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
 throw new AdminCommandFailedException("Partition reassignment currently in " +
 "progress for %s. Aborting operation".format(partitionsBeingReassigned))
-case e => error("Admin command failed", e); false
+case e: Throwable => error("Admin command failed", e); false
 }
 }

View File

@@ -54,7 +54,7 @@ object ClientUtils extends Logging{
 fetchMetaDataSucceeded = true
 }
 catch {
-case e =>
+case e: Throwable =>
 warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
 .format(correlationId, topics, shuffledBrokers(i).toString), e)
 t = e

View File

@@ -42,7 +42,7 @@ private[kafka] object Broker {
 throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
 }
 } catch {
-case t => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
+case t: Throwable => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
 }
 }

View File

@@ -204,7 +204,7 @@ object ConsoleConsumer extends Logging {
 formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
 numMessages += 1
 } catch {
-case e =>
+case e: Throwable =>
 if (skipMessageOnError)
 error("Error processing message, skipping this message: ", e)
 else
@@ -220,7 +220,7 @@ object ConsoleConsumer extends Logging {
 }
 }
 } catch {
-case e => error("Error processing message, stopping consumer: ", e)
+case e: Throwable => error("Error processing message, stopping consumer: ", e)
 }
 System.err.println("Consumed %d messages".format(numMessages))
 System.out.flush()
@@ -247,7 +247,7 @@ object ConsoleConsumer extends Logging {
 zk.deleteRecursive(dir)
 zk.close()
 } catch {
-case _ => // swallow
+case _: Throwable => // swallow
 }
 }
 }

View File

@@ -79,7 +79,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
 }
 }
 } catch {
-case t => {
+case t: Throwable => {
 if (!isRunning.get())
 throw t /* If this thread is stopped, propagate this exception to kill the thread. */
 else
@@ -95,7 +95,7 @@
 try {
 addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
 } catch {
-case t => {
+case t: Throwable => {
 if (!isRunning.get())
 throw t /* If this thread is stopped, propagate this exception to kill the thread. */
 else {

View File

@@ -84,7 +84,7 @@ class SimpleConsumer(val host: String,
 disconnect()
 throw ioe
 }
-case e => throw e
+case e: Throwable => throw e
 }
 response
 }

View File

@@ -67,7 +67,7 @@ private[kafka] object TopicCount extends Logging {
 case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
 }
 } catch {
-case e =>
+case e: Throwable =>
 error("error parsing consumer json string " + topicCountString, e)
 throw e
 }

View File

@@ -175,7 +175,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 zkClient = null
 }
 } catch {
-case e =>
+case e: Throwable =>
 fatal("error during consumer connector shutdown", e)
 }
 info("ZKConsumerConnector shut down completed")
@@ -332,7 +332,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 if (doRebalance)
 syncedRebalance
 } catch {
-case t => error("error during syncedRebalance", t)
+case t: Throwable => error("error during syncedRebalance", t)
 }
 }
 info("stopping watcher executor thread for consumer " + consumerIdString)
@@ -384,7 +384,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 cluster = getCluster(zkClient)
 done = rebalance(cluster)
 } catch {
-case e =>
+case e: Throwable =>
 /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
 * For example, a ZK node can disappear between the time we get all children and the time we try to get
 * the value of a child. Just let this go since another rebalance will be triggered.
@@ -461,7 +461,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 " for topic " + topic + " with consumers: " + curConsumers)
 for (consumerThreadId <- consumerThreadIdSet) {
-val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
+val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
 assert(myConsumerPosition >= 0)
 val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
 val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
@@ -581,7 +581,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 // The node hasn't been deleted by the original owner. So wait a bit and retry.
 info("waiting for the partition ownership to be deleted: " + partition)
 false
-case e2 => throw e2
+case e2: Throwable => throw e2
 }
 }
 val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))

View File

@@ -75,7 +75,7 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
 }
 }
 catch {
-case e =>
+case e: Throwable =>
 error("error in handling child changes", e)
 }
 }

View File

@@ -93,7 +93,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
 brokerStateInfo(brokerId).requestSendThread.shutdown()
 brokerStateInfo.remove(brokerId)
 }catch {
-case e => error("Error while removing broker by the controller", e)
+case e: Throwable => error("Error while removing broker by the controller", e)
 }
 }
@@ -142,7 +142,7 @@ class RequestSendThread(val controllerId: Int,
 }
 }
 } catch {
-case e =>
+case e: Throwable =>
 warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e)
 // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
 channel.disconnect()

View File

@@ -89,14 +89,14 @@ object KafkaController extends Logging {
 case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
 }
 } catch {
-case t =>
+case t: Throwable =>
 // It may be due to an incompatible controller register version
 warn("Failed to parse the controller info as json. "
 + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
 try {
 return controllerInfoString.toInt
 } catch {
-case t => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
+case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
 }
 }
 }
@@ -436,7 +436,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 .format(topicAndPartition))
 }
 } catch {
-case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
+case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
 // remove the partition from the admin path to unblock the admin client
 removePartitionFromReassignedPartitions(topicAndPartition)
 }
@@ -448,7 +448,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
 partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
 } catch {
-case e => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
+case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
 } finally {
 removePartitionsFromPreferredReplicaElection(partitions)
 }
@@ -514,9 +514,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 } catch {
 case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
 "Aborting controller startup procedure")
-case oe => error("Error while incrementing controller epoch", oe)
+case oe: Throwable => error("Error while incrementing controller epoch", oe)
 }
-case oe => error("Error while incrementing controller epoch", oe)
+case oe: Throwable => error("Error while incrementing controller epoch", oe)
 }
 info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch))
@@ -693,7 +693,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
 } catch {
 case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
-case e2 => throw new KafkaException(e2.toString)
+case e2: Throwable => throw new KafkaException(e2.toString)
 }
 }
@@ -905,7 +905,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
 }
 }
 }catch {
-case e => error("Error while handling partition reassignment", e)
+case e: Throwable => error("Error while handling partition reassignment", e)
 }
 }

View File

@@ -17,7 +17,8 @@
 package kafka.controller
 import collection._
-import collection.JavaConversions._
+import collection.JavaConversions
+import collection.mutable.Buffer
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
@@ -91,7 +92,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 }
 brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
 } catch {
-case e => error("Error while moving some partitions to the online state", e)
+case e: Throwable => error("Error while moving some partitions to the online state", e)
 // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
 }
 }
@@ -111,7 +112,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 }
 brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
 }catch {
-case e => error("Error while moving some partitions to %s state".format(targetState), e)
+case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
 // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
 }
 }
@@ -321,7 +322,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 } catch {
 case lenne: LeaderElectionNotNeededException => // swallow
 case nroe: NoReplicaOnlineException => throw nroe
-case sce =>
+case sce: Throwable =>
 val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
 stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
 throw new StateChangeFailedException(failMsg, sce)
@@ -359,8 +360,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 controllerContext.controllerLock synchronized {
 if (hasStarted.get) {
 try {
-debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
-val currentChildren = JavaConversions.asBuffer(children).toSet
+val currentChildren = {
+import JavaConversions._
+debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
+(children: Buffer[String]).toSet
+}
 val newTopics = currentChildren -- controllerContext.allTopics
 val deletedTopics = controllerContext.allTopics -- currentChildren
 // val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1))
@@ -375,7 +379,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 if(newTopics.size > 0)
 controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
 } catch {
-case e => error("Error while handling new topic", e )
+case e: Throwable => error("Error while handling new topic", e )
 }
 // TODO: kafka-330 Handle deleted topics
 }
@@ -399,7 +403,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded))
 controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
 } catch {
-case e => error("Error while handling add partitions for data path " + dataPath, e )
+case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
 }
 }
 }

View File

@@ -89,7 +89,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
 replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
 brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
 }catch {
-case e => error("Error while moving some replicas to %s state".format(targetState), e)
+case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
 }
 }
@@ -273,7 +273,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
 if(deadBrokerIds.size > 0)
 controller.onBrokerFailure(deadBrokerIds.toSeq)
 } catch {
-case e => error("Error while handling broker changes", e)
+case e: Throwable => error("Error while handling broker changes", e)
 }
 }
 }

View File

@@ -17,10 +17,10 @@
 package kafka.javaapi
-import scala.collection.JavaConversions
 import java.nio.ByteBuffer
 import kafka.common.TopicAndPartition
 import kafka.api.{Request, PartitionFetchInfo}
+import scala.collection.mutable
 class FetchRequest(correlationId: Int,
 clientId: String,
@@ -28,8 +28,10 @@ class FetchRequest(correlationId: Int,
 minBytes: Int,
 requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
+import scala.collection.JavaConversions._
 val underlying = {
-val scalaMap = JavaConversions.asMap(requestInfo).toMap
+val scalaMap = (requestInfo: mutable.Map[TopicAndPartition, PartitionFetchInfo]).toMap
 kafka.api.FetchRequest(
 correlationId = correlationId,
 clientId = clientId,

View File

@@ -40,4 +40,10 @@ private[javaapi] object Implicits extends Logging {
 case None => null.asInstanceOf[T]
 }
 }
+// used explicitly by ByteBufferMessageSet constructor as due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors
+implicit def javaListToScalaBuffer[A](l: java.util.List[A]) = {
+import scala.collection.JavaConversions._
+l: collection.mutable.Buffer[A]
+}
 }
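The `javaapi` changes in this commit all follow the same substitution: helper methods like `asBuffer`, `asList`, `asMap` and `asSet` are gone from `JavaConversions` in Scala 2.10, so the code instead triggers the implicit view with a type ascription while `JavaConversions._` is in scope, exactly as `javaListToScalaBuffer` does above. A standalone sketch of that pattern (the `ConversionSketch` object is illustrative, not part of the patch):

    import scala.collection.JavaConversions._
    import scala.collection.mutable

    object ConversionSketch {
      // The ascription `l: mutable.Buffer[A]` picks up the implicit java.util.List -> Buffer view.
      def asScala[A](l: java.util.List[A]): mutable.Buffer[A] = l: mutable.Buffer[A]

      def main(args: Array[String]): Unit = {
        val names = new java.util.ArrayList[String]()
        names.add("broker-0")
        names.add("broker-1")
        println(asScala(names).mkString(", "))   // prints: broker-0, broker-1
      }
    }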

View File

@@ -19,7 +19,7 @@ package kafka.javaapi
 import kafka.common.TopicAndPartition
 import kafka.api.{Request, PartitionOffsetRequestInfo}
-import collection.JavaConversions
+import scala.collection.mutable
 import java.nio.ByteBuffer
@@ -28,7 +28,8 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffse
 clientId: String) {
 val underlying = {
-val scalaMap = JavaConversions.asMap(requestInfo).toMap
+import collection.JavaConversions._
+val scalaMap = (requestInfo: mutable.Map[TopicAndPartition, PartitionOffsetRequestInfo]).toMap
 kafka.api.OffsetRequest(
 requestInfo = scalaMap,
 versionId = versionId,

View File

@@ -17,16 +17,20 @@
 package kafka.javaapi
 import kafka.cluster.Broker
-import scala.collection.JavaConversions.asList
+import scala.collection.JavaConversions
 private[javaapi] object MetadataListImplicits {
 implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]):
-java.util.List[kafka.javaapi.TopicMetadata] =
-asList(topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_)))
+java.util.List[kafka.javaapi.TopicMetadata] = {
+import JavaConversions._
+topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_))
+}
 implicit def toPartitionMetadataList(partitionMetadataSeq: Seq[kafka.api.PartitionMetadata]):
-java.util.List[kafka.javaapi.PartitionMetadata] =
-asList(partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_)))
+java.util.List[kafka.javaapi.PartitionMetadata] = {
+import JavaConversions._
+partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_))
+}
 }
 class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
@@ -51,9 +55,15 @@ class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
 underlying.leader
 }
-def replicas: java.util.List[Broker] = asList(underlying.replicas)
+def replicas: java.util.List[Broker] = {
+import JavaConversions._
+underlying.replicas
+}
-def isr: java.util.List[Broker] = asList(underlying.isr)
+def isr: java.util.List[Broker] = {
+import JavaConversions._
+underlying.isr
+}
 def errorCode: Short = underlying.errorCode

View File

@@ -18,7 +18,7 @@ package kafka.javaapi
 import kafka.api._
 import java.nio.ByteBuffer
-import scala.collection.JavaConversions
+import scala.collection.mutable
 class TopicMetadataRequest(val versionId: Short,
 override val correlationId: Int,
@@ -26,8 +26,10 @@ class TopicMetadataRequest(val versionId: Short,
 val topics: java.util.List[String])
 extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) {
-val underlying: kafka.api.TopicMetadataRequest =
-new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, JavaConversions.asBuffer(topics))
+val underlying: kafka.api.TopicMetadataRequest = {
+import scala.collection.JavaConversions._
+new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String])
+}
 def this(topics: java.util.List[String]) =
 this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics)

View File

@@ -18,7 +18,8 @@ package kafka.javaapi.consumer
 import kafka.serializer._
 import kafka.consumer._
-import scala.collection.JavaConversions.asList
+import scala.collection.mutable
+import scala.collection.JavaConversions
 /**
@@ -71,9 +72,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 keyDecoder: Decoder[K],
 valueDecoder: Decoder[V])
 : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = {
-import scala.collection.JavaConversions._
-val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
+val scalaTopicCountMap: Map[String, Int] = {
+import JavaConversions._
+Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]: mutable.Map[String, Int])
+}
 val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
 val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
 for ((topic, streams) <- scalaReturn) {
@@ -88,8 +91,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] =
 createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
-def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) =
-asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder))
+def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = {
+import JavaConversions._
+underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder)
+}
 def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
 createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder())

View File

@@ -20,12 +20,14 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.reflect.BeanProperty
 import java.nio.ByteBuffer
 import kafka.message._
+import kafka.javaapi.Implicits.javaListToScalaBuffer
 class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet {
 private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer)
 def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), scala.collection.JavaConversions.asBuffer(messages): _*).buffer)
+// due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors and must be used explicitly
+this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), javaListToScalaBuffer(messages).toSeq : _*).buffer)
 }
 def this(messages: java.util.List[Message]) {

View File

@@ -19,6 +19,7 @@ package kafka.javaapi.producer
 import kafka.producer.ProducerConfig
 import kafka.producer.KeyedMessage
+import scala.collection.mutable
 class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
 {
@@ -38,7 +39,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
 */
 def send(messages: java.util.List[KeyedMessage[K,V]]) {
 import collection.JavaConversions._
-underlying.send(asBuffer(messages):_*)
+underlying.send((messages: mutable.Buffer[KeyedMessage[K,V]]).toSeq: _*)
 }
 /**

View File

@@ -318,7 +318,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
 if(timeSinceLastFlush >= logFlushInterval)
 log.flush
 } catch {
-case e =>
+case e: Throwable =>
 error("Error flushing topic " + log.topicName, e)
 e match {
 case _: IOException =>

View File

@@ -82,7 +82,7 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive
 case e: OutOfMemoryError =>
 error("OOME with size " + size, e)
 throw e
-case e2 =>
+case e2: Throwable =>
 throw e2
 }
 buffer

View File

@@ -79,7 +79,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
 // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
 disconnect()
 throw e
-case e => throw e
+case e: Throwable => throw e
 }
 response
 }

View File

@@ -129,7 +129,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
 else
 serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
 } catch {
-case t =>
+case t: Throwable =>
 producerStats.serializationErrorRate.mark()
 if (isSync) {
 throw t
@@ -178,7 +178,7 @@
 }catch { // Swallow recoverable exceptions and return None so that they can be retried.
 case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
 case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
-case oe => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
+case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
 }
 }

View File

@@ -43,7 +43,7 @@ class ProducerSendThread[K,V](val threadName: String,
 try {
 processEvents
 }catch {
-case e => error("Error in sending events: ", e)
+case e: Throwable => error("Error in sending events: ", e)
 }finally {
 shutdownLatch.countDown
 }
@@ -103,7 +103,7 @@ class ProducerSendThread[K,V](val threadName: String,
 if(size > 0)
 handler.handle(events)
 }catch {
-case e => error("Error in handling batch of " + size + " events", e)
+case e: Throwable => error("Error in handling batch of " + size + " events", e)
 }
 }

View File

@@ -95,7 +95,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
 response = simpleConsumer.fetch(fetchRequest)
 } catch {
-case t =>
+case t: Throwable =>
 if (isRunning.get) {
 warn("Error in fetch %s".format(fetchRequest), t)
 partitionMapLock synchronized {
@@ -136,7 +136,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
 // should get fixed in the subsequent fetches
 logger.warn("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
-case e =>
+case e: Throwable =>
 throw new KafkaException("error processing data for partition [%s,%d] offset %d"
 .format(topic, partitionId, currentOffset.get), e)
 }
@@ -147,7 +147,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 warn("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
 .format(currentOffset.get, topic, partitionId, newOffset))
 } catch {
-case e =>
+case e: Throwable =>
 warn("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
 partitionsWithError += topicAndPartition
 }

View File

@@ -264,7 +264,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
 producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage))
 new ProduceResult(topicAndPartition, nle)
-case e =>
+case e: Throwable =>
 BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
 BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
 error("Error processing ProducerRequest with correlation id %d from client %s on partition %s"
@@ -353,7 +353,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
 fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
 new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
-case t =>
+case t: Throwable =>
 BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
 BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
 error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d"
@@ -430,7 +430,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
 offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage))
 (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) )
-case e =>
+case e: Throwable =>
 warn("Error while responding to offset request", e)
 (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
 }
@@ -481,7 +481,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
 new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
 } catch {
-case e =>
+case e: Throwable =>
 error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
 new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
 ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))

View File

@@ -34,7 +34,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
 server.startup()
 }
 catch {
-case e =>
+case e: Throwable =>
 fatal("Fatal error during KafkaServerStable startup. Prepare to shutdown", e)
 shutdown()
 System.exit(1)
@@ -46,7 +46,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
 server.shutdown()
 }
 catch {
-case e =>
+case e: Throwable =>
 fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
 System.exit(1)
 }

View File

@@ -223,7 +223,7 @@ class ReplicaManager(val config: KafkaConfig,
 makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
 leaderAndISRRequest.correlationId)
 } catch {
-case e =>
+case e: Throwable =>
 val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
 "epoch %d for partition %s").format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
 leaderAndISRRequest.controllerEpoch, topicAndPartition)

View File

@@ -72,7 +72,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
 }
 if (leaderId != -1)
 debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
-case e2 =>
+case e2: Throwable =>
 error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
 leaderId = -1
 }

View File

@@ -102,7 +102,7 @@ object ImportZkOffsets extends Logging {
 try {
 ZkUtils.updatePersistentPath(zkClient, partition, offset.toString)
 } catch {
-case e => e.printStackTrace()
+case e: Throwable => e.printStackTrace()
 }
 }
 }

View File

@@ -86,7 +86,7 @@ object JmxTool extends Logging {
 else
 List(null)
-val names = queries.map((name: ObjectName) => asSet(mbsc.queryNames(name, null))).flatten
+val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten
 val allAttributes: Iterable[(ObjectName, Array[String])] =
 names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName)))

View File

@@ -129,7 +129,7 @@ object MirrorMaker extends Logging {
 try {
 streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten
 } catch {
-case t =>
+case t: Throwable =>
 fatal("Unable to create stream - shutting down mirror maker.")
 connectors.foreach(_.shutdown)
 }
@@ -204,7 +204,7 @@ object MirrorMaker extends Logging {
 }
 }
 } catch {
-case e =>
+case e: Throwable =>
 fatal("Stream unexpectedly exited.", e)
 } finally {
 shutdownLatch.countDown()

View File

@@ -217,7 +217,7 @@ object SimpleConsumerShell extends Logging {
 formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
 numMessagesConsumed += 1
 } catch {
-case e =>
+case e: Throwable =>
 if (skipMessageOnError)
 error("Error processing message, skipping this message: ", e)
 else

View File

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils
+import scala.annotation.StaticAnnotation
+/* Some helpful annotations */
+/**
+ * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation
+ * must respect
+ */
+class threadsafe extends StaticAnnotation
+/**
+ * Indicates that the annotated class is not threadsafe
+ */
+class nonthreadsafe extends StaticAnnotation
+/**
+ * Indicates that the annotated class is immutable
+ */
+class immutable extends StaticAnnotation
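These markers carry no runtime behaviour; they only document a class's concurrency contract at the declaration site, in a form that compiles on both 2.8.x and 2.10. A small usage sketch (the `ConnectionRegistry` class is hypothetical, not from the patch):

    import kafka.utils.threadsafe

    // Hypothetical example: the annotation simply records that callers may share this object across threads.
    @threadsafe
    class ConnectionRegistry {
      private val lock = new Object
      private var ids = Set.empty[String]

      def add(id: String): Unit = lock synchronized { ids += id }
      def size: Int = lock synchronized { ids.size }
    }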

View File

@@ -32,7 +32,7 @@ object Json extends Logging {
 try {
 JSON.parseFull(input)
 } catch {
-case t =>
+case t: Throwable =>
 throw new KafkaException("Can't parse json string: %s".format(input), t)
 }
 }

View File

@@ -64,7 +64,7 @@ object Mx4jLoader extends Logging {
 case e: ClassNotFoundException => {
 info("Will not load MX4J, mx4j-tools.jar is not in the classpath");
 }
-case e => {
+case e: Throwable => {
 warn("Could not start register mbean in JMX", e);
 }
 }

View File

@@ -19,6 +19,7 @@ package kafka.utils
 import java.util.ArrayList
 import java.util.concurrent._
+import collection.mutable
 import collection.JavaConversions
 import kafka.common.KafkaException
 import java.lang.Object
@@ -71,10 +72,15 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)]
 def remove(key: K): V = pool.remove(key)
-def keys = JavaConversions.asSet(pool.keySet())
+def keys: mutable.Set[K] = {
+import JavaConversions._
+pool.keySet()
+}
-def values: Iterable[V] =
-JavaConversions.asIterable(new ArrayList[V](pool.values()))
+def values: Iterable[V] = {
+import JavaConversions._
+new ArrayList[V](pool.values())
+}
 def clear() { pool.clear() }

View File

@@ -67,7 +67,7 @@ object Utils extends Logging {
 fun()
 }
 catch {
-case t =>
+case t: Throwable =>
 // log any error and the stack trace
 error("error in loggedRunnable", t)
 }

View File

@@ -271,7 +271,7 @@ object ZkUtils extends Logging {
 storedData = readData(client, path)._1
 } catch {
 case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
-case e2 => throw e2
+case e2: Throwable => throw e2
 }
 if (storedData == null || storedData != data) {
 info("conflict in " + path + " data: " + data + " stored data: " + storedData)
@@ -281,7 +281,7 @@
 info(path + " exists with value " + data + " during connection loss; this is ok")
 }
 }
-case e2 => throw e2
+case e2: Throwable => throw e2
 }
 }
@@ -321,7 +321,7 @@
 case None => // the node disappeared; retry creating the ephemeral node immediately
 }
 }
-case e2 => throw e2
+case e2: Throwable => throw e2
 }
 }
 }
@@ -360,10 +360,10 @@
 } catch {
 case e: ZkNodeExistsException =>
 client.writeData(path, data)
-case e2 => throw e2
+case e2: Throwable => throw e2
 }
 }
-case e2 => throw e2
+case e2: Throwable => throw e2
 }
 }
@@ -416,7 +416,7 @@
 createParentPath(client, path)
 client.createEphemeral(path, data)
 }
-case e2 => throw e2
+case e2: Throwable => throw e2
 }
 }
@@ -428,7 +428,7 @@
 // this can happen during a connection loss event, return normally
 info(path + " deleted during connection loss; this is ok")
 false
-case e2 => throw e2
+case e2: Throwable => throw e2
 }
 }
@@ -439,7 +439,7 @@
 case e: ZkNoNodeException =>
 // this can happen during a connection loss event, return normally
 info(path + " deleted during connection loss; this is ok")
-case e2 => throw e2
+case e2: Throwable => throw e2
 }
 }
@@ -449,7 +449,7 @@
 zk.deleteRecursive(dir)
 zk.close()
 } catch {
-case _ => // swallow
+case _: Throwable => // swallow
 }
 }
@@ -466,7 +466,7 @@
 } catch {
 case e: ZkNoNodeException =>
 (None, stat)
-case e2 => throw e2
+case e2: Throwable => throw e2
 }
 dataAndStat
 }
@@ -484,7 +484,7 @@
 client.getChildren(path)
 } catch {
 case e: ZkNoNodeException => return Nil
-case e2 => throw e2
+case e2: Throwable => throw e2
 }
 }
@@ -675,7 +675,7 @@
 case nne: ZkNoNodeException =>
 ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
 debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
-case e2 => throw new AdministrationException(e2.toString)
+case e2: Throwable => throw new AdministrationException(e2.toString)
 }
 }
 }
} }

View File

@ -104,7 +104,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
fail("Topic should not exist") fail("Topic should not exist")
} catch { } catch {
case e: AdministrationException => //this is good case e: AdministrationException => //this is good
case e2 => throw e2 case e2: Throwable => throw e2
} }
} }
@ -114,7 +114,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
fail("Add partitions should fail") fail("Add partitions should fail")
} catch { } catch {
case e: AdministrationException => //this is good case e: AdministrationException => //this is good
case e2 => throw e2 case e2: Throwable => throw e2
} }
} }

View File

@ -38,7 +38,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
} }
catch { catch {
case e: AdministrationException => // this is good case e: AdministrationException => // this is good
case e2 => throw e2 case e2: Throwable => throw e2
} }
// test wrong replication factor // test wrong replication factor
@ -48,7 +48,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
} }
catch { catch {
case e: AdministrationException => // this is good case e: AdministrationException => // this is good
case e2 => throw e2 case e2: Throwable => throw e2
} }
// correct assignment // correct assignment
@ -84,7 +84,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
} }
catch { catch {
case e: AdministrationException => // this is good case e: AdministrationException => // this is good
case e2 => throw e2 case e2: Throwable => throw e2
} }
// non-exist brokers // non-exist brokers
@ -95,7 +95,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
} }
catch { catch {
case e: AdministrationException => // this is good case e: AdministrationException => // this is good
case e2 => throw e2 case e2: Throwable => throw e2
} }
// inconsistent replication factor // inconsistent replication factor
@ -106,7 +106,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
} }
catch { catch {
case e: AdministrationException => // this is good case e: AdministrationException => // this is good
case e2 => throw e2 case e2: Throwable => throw e2
} }
// good assignment // good assignment
@ -170,7 +170,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
fail("shouldn't be able to create a topic already exists") fail("shouldn't be able to create a topic already exists")
} catch { } catch {
case e: TopicExistsException => // this is good case e: TopicExistsException => // this is good
case e2 => throw e2 case e2: Throwable => throw e2
} }
} }

View File

@ -83,7 +83,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
fail("should get an exception") fail("should get an exception")
} catch { } catch {
case e: ConsumerTimeoutException => // this is ok case e: ConsumerTimeoutException => // this is ok
case e => throw e case e: Throwable => throw e
} }
} }
@ -406,10 +406,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
} }
def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = { def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
import scala.collection.JavaConversions import scala.collection.JavaConversions._
val children = zkClient.getChildren(path) val children = zkClient.getChildren(path)
Collections.sort(children) Collections.sort(children)
val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children) val childrenAsSeq : Seq[java.lang.String] = (children: mutable.Buffer[String]).toSeq
childrenAsSeq.map(partition => childrenAsSeq.map(partition =>
(partition, zkClient.readData(path + "/" + partition).asInstanceOf[String])) (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
} }
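
The (children: mutable.Buffer[String]).toSeq expression above is the same idea spelled as a type ascription: rather than calling the old asBuffer helper, the test states the Scala type it wants and the imported implicit view converts the java.util.List on both 2.8 and 2.10. A small standalone sketch (the values are illustrative):

    import scala.collection.mutable
    import scala.collection.JavaConversions._

    object AscriptionSketch extends App {
      val children: java.util.List[String] = java.util.Arrays.asList("2", "0", "1")
      java.util.Collections.sort(children)             // sorts the Java list in place
      // The ascription forces the java.util.List => mutable.Buffer view to fire.
      val childrenAsSeq: Seq[String] = (children: mutable.Buffer[String]).toSeq
      println(childrenAsSeq.mkString(","))             // 0,1,2
    }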

View File

@ -85,7 +85,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x) val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)
messages ++= ms messages ++= ms
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
javaProducer.send(asList(ms.map(new KeyedMessage[Int, String](topic, partition, _)))) javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]])
} }
javaProducer.close javaProducer.close
messages messages
@ -103,7 +103,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
def getMessages(nMessagesPerThread: Int, def getMessages(nMessagesPerThread: Int,
jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = { jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = {
var messages: List[String] = Nil var messages: List[String] = Nil
val topicMessageStreams = asMap(jTopicMessageStreams) import scala.collection.JavaConversions._
val topicMessageStreams: collection.mutable.Map[String, java.util.List[KafkaStream[String, String]]] = jTopicMessageStreams
for ((topic, messageStreams) <- topicMessageStreams) { for ((topic, messageStreams) <- topicMessageStreams) {
for (messageStream <- messageStreams) { for (messageStream <- messageStreams) {
val iterator = messageStream.iterator val iterator = messageStream.iterator
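
These two hunks go in the opposite direction: a Scala collection has to be passed where a Java API expects a java.util.List, and a java.util.Map returned by the Java consumer is read as a Scala map. Ascribing the Java or Scala target type, with JavaConversions._ imported, replaces the removed asList and asMap calls. A hedged sketch with a stand-in consumer of a java.util.List (sendAll is invented for illustration and is not a Kafka method):

    import scala.collection.JavaConversions._

    object ScalaToJavaSketch extends App {
      // Pretend Java API that wants a java.util.List, standing in for javaProducer.send.
      def sendAll(batch: java.util.List[String]): Unit =
        println("sending " + batch.size() + " records")

      val ms = Seq("m-0", "m-1", "m-2")
      // 2.8-style spelling: sendAll(asList(ms))
      // Cross-version spelling: ascribe to the Java type and let the implicit view convert.
      sendAll(ms.map(m => m.toUpperCase): java.util.List[String])
    }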

View File

@ -29,8 +29,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
def toMessageIterator(messageSet: MessageSet): Iterator[Message] = { def toMessageIterator(messageSet: MessageSet): Iterator[Message] = {
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
val messages = asIterable(messageSet) messageSet.map(m => m.message).iterator
messages.map(m => m.message).iterator
} }
@Test @Test
@ -44,7 +43,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
val m = createMessageSet(messages) val m = createMessageSet(messages)
// two iterators over the same set should give the same results // two iterators over the same set should give the same results
TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator)) TestUtils.checkEquals(m.iterator, m.iterator)
} }
@Test @Test
@ -52,7 +51,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
val m = createMessageSet(messages, DefaultCompressionCodec) val m = createMessageSet(messages, DefaultCompressionCodec)
// two iterators over the same set should give the same results // two iterators over the same set should give the same results
TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator)) TestUtils.checkEquals(m.iterator, m.iterator)
} }
@Test @Test

View File

@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
@ -30,7 +30,7 @@ import scala.Some
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
class LogTest extends JUnitSuite { class LogTest extends JUnitSuite {
var logDir: File = null var logDir: File = null
val time = new MockTime val time = new MockTime
var config: KafkaConfig = null var config: KafkaConfig = null
@ -46,7 +46,7 @@ class LogTest extends JUnitSuite {
def tearDown() { def tearDown() {
Utils.rm(logDir) Utils.rm(logDir)
} }
def createEmptyLogs(dir: File, offsets: Int*) { def createEmptyLogs(dir: File, offsets: Int*) {
for(offset <- offsets) { for(offset <- offsets) {
Log.logFilename(dir, offset).createNewFile() Log.logFilename(dir, offset).createNewFile()
@ -168,19 +168,19 @@ class LogTest extends JUnitSuite {
val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)) val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1))
assertEquals("Should be no more messages", 0, lastRead.size) assertEquals("Should be no more messages", 0, lastRead.size)
} }
/** Test the case where we have compressed batches of messages */ /** Test the case where we have compressed batches of messages */
@Test @Test
def testCompressedMessages() { def testCompressedMessages() {
/* this log should roll after every messageset */ /* this log should roll after every messageset */
val log = new Log(logDir, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) val log = new Log(logDir, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes))) log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message) def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message)
/* we should always get the first message in the compressed set when reading any offset in the set */ /* we should always get the first message in the compressed set when reading any offset in the set */
assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset) assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset) assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset)
@ -202,7 +202,7 @@ class LogTest extends JUnitSuite {
assertContains(makeRanges(5,8), 5) assertContains(makeRanges(5,8), 5)
assertContains(makeRanges(5,8), 6) assertContains(makeRanges(5,8), 6)
} }
@Test @Test
def testEdgeLogRollsStartingAtZero() { def testEdgeLogRollsStartingAtZero() {
// first test a log segment starting at 0 // first test a log segment starting at 0
@ -226,7 +226,7 @@ class LogTest extends JUnitSuite {
for(i <- 0 until numMessages) for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(i.toString.getBytes)) log.append(TestUtils.singleMessageSet(i.toString.getBytes))
val curOffset = log.logEndOffset val curOffset = log.logEndOffset
// time goes by; the log file is deleted // time goes by; the log file is deleted
log.markDeletedWhile(_ => true) log.markDeletedWhile(_ => true)
@ -262,7 +262,7 @@ class LogTest extends JUnitSuite {
case e:MessageSizeTooLargeException => // this is good case e:MessageSizeTooLargeException => // this is good
} }
} }
@Test @Test
def testLogRecoversToCorrectOffset() { def testLogRecoversToCorrectOffset() {
val numMessages = 100 val numMessages = 100
@ -276,15 +276,15 @@ class LogTest extends JUnitSuite {
val lastIndexOffset = log.segments.view.last.index.lastOffset val lastIndexOffset = log.segments.view.last.index.lastOffset
val numIndexEntries = log.segments.view.last.index.entries val numIndexEntries = log.segments.view.last.index.entries
log.close() log.close()
// test non-recovery case // test non-recovery case
log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096) log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset) assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset) assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries) assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
log.close() log.close()
// test // test
log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096) log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset) assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset) assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
@ -305,10 +305,10 @@ class LogTest extends JUnitSuite {
for (i<- 1 to msgPerSeg) for (i<- 1 to msgPerSeg)
log.append(set) log.append(set)
assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments) assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset) assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset)
val lastOffset = log.logEndOffset val lastOffset = log.logEndOffset
val size = log.size val size = log.size
log.truncateTo(log.logEndOffset) // keep the entire log log.truncateTo(log.logEndOffset) // keep the entire log
@ -326,7 +326,7 @@ class LogTest extends JUnitSuite {
for (i<- 1 to msgPerSeg) for (i<- 1 to msgPerSeg)
log.append(set) log.append(set)
assertEquals("Should be back to original offset", log.logEndOffset, lastOffset) assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
assertEquals("Should be back to original size", log.size, size) assertEquals("Should be back to original size", log.size, size)
log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1)) log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1))
@ -371,14 +371,14 @@ class LogTest extends JUnitSuite {
def testAppendWithoutOffsetAssignment() { def testAppendWithoutOffsetAssignment() {
for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) { for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) {
logDir.mkdir() logDir.mkdir()
var log = new Log(logDir, var log = new Log(logDir,
maxLogFileSize = 64*1024, maxLogFileSize = 64*1024,
maxMessageSize = config.messageMaxBytes, maxMessageSize = config.messageMaxBytes,
maxIndexSize = 1000, maxIndexSize = 1000,
indexIntervalBytes = 10000, indexIntervalBytes = 10000,
needsRecovery = true) needsRecovery = true)
val messages = List("one", "two", "three", "four", "five", "six") val messages = List("one", "two", "three", "four", "five", "six")
val ms = new ByteBufferMessageSet(compressionCodec = codec, val ms = new ByteBufferMessageSet(compressionCodec = codec,
offsetCounter = new AtomicLong(0), offsetCounter = new AtomicLong(0),
messages = messages.map(s => new Message(s.getBytes)):_*) messages = messages.map(s => new Message(s.getBytes)):_*)
val firstOffset = ms.toList.head.offset val firstOffset = ms.toList.head.offset
@ -391,7 +391,7 @@ class LogTest extends JUnitSuite {
log.delete() log.delete()
} }
} }
/** /**
* When we open a log any index segments without an associated log segment should be deleted. * When we open a log any index segments without an associated log segment should be deleted.
*/ */
@ -399,22 +399,22 @@ class LogTest extends JUnitSuite {
def testBogusIndexSegmentsAreRemoved() { def testBogusIndexSegmentsAreRemoved() {
val bogusIndex1 = Log.indexFilename(logDir, 0) val bogusIndex1 = Log.indexFilename(logDir, 0)
val bogusIndex2 = Log.indexFilename(logDir, 5) val bogusIndex2 = Log.indexFilename(logDir, 5)
val set = TestUtils.singleMessageSet("test".getBytes()) val set = TestUtils.singleMessageSet("test".getBytes())
val log = new Log(logDir, val log = new Log(logDir,
maxLogFileSize = set.sizeInBytes * 5, maxLogFileSize = set.sizeInBytes * 5,
maxMessageSize = config.messageMaxBytes, maxMessageSize = config.messageMaxBytes,
maxIndexSize = 1000, maxIndexSize = 1000,
indexIntervalBytes = 1, indexIntervalBytes = 1,
needsRecovery = false) needsRecovery = false)
assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0) assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
assertFalse("The second index file should have been deleted.", bogusIndex2.exists) assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
// check that we can append to the log // check that we can append to the log
for(i <- 0 until 10) for(i <- 0 until 10)
log.append(set) log.append(set)
log.delete() log.delete()
} }
@ -423,38 +423,38 @@ class LogTest extends JUnitSuite {
val set = TestUtils.singleMessageSet("test".getBytes()) val set = TestUtils.singleMessageSet("test".getBytes())
// create a log // create a log
var log = new Log(logDir, var log = new Log(logDir,
maxLogFileSize = set.sizeInBytes * 5, maxLogFileSize = set.sizeInBytes * 5,
maxMessageSize = config.messageMaxBytes, maxMessageSize = config.messageMaxBytes,
maxIndexSize = 1000, maxIndexSize = 1000,
indexIntervalBytes = 10000, indexIntervalBytes = 10000,
needsRecovery = true) needsRecovery = true)
// add enough messages to roll over several segments then close and re-open and attempt to truncate // add enough messages to roll over several segments then close and re-open and attempt to truncate
for(i <- 0 until 100) for(i <- 0 until 100)
log.append(set) log.append(set)
log.close() log.close()
log = new Log(logDir, log = new Log(logDir,
maxLogFileSize = set.sizeInBytes * 5, maxLogFileSize = set.sizeInBytes * 5,
maxMessageSize = config.messageMaxBytes, maxMessageSize = config.messageMaxBytes,
maxIndexSize = 1000, maxIndexSize = 1000,
indexIntervalBytes = 10000, indexIntervalBytes = 10000,
needsRecovery = true) needsRecovery = true)
log.truncateTo(3) log.truncateTo(3)
assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments) assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
assertEquals("Log end offset should be 3.", 3, log.logEndOffset) assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
} }
def assertContains(ranges: Array[Range], offset: Long) = { def assertContains(ranges: Array[Range], offset: Long) = {
Log.findRange(ranges, offset) match { Log.findRange(ranges, offset) match {
case Some(range) => case Some(range) =>
assertTrue(range + " does not contain " + offset, range.contains(offset)) assertTrue(range + " does not contain " + offset, range.contains(offset))
case None => fail("No range found, but expected to find " + offset) case None => fail("No range found, but expected to find " + offset)
} }
} }
class SimpleRange(val start: Long, val size: Long) extends Range class SimpleRange(val start: Long, val size: Long) extends Range
def makeRanges(breaks: Int*): Array[Range] = { def makeRanges(breaks: Int*): Array[Range] = {
val list = new ArrayList[Range] val list = new ArrayList[Range]
var prior = 0 var prior = 0
@ -464,5 +464,5 @@ class LogTest extends JUnitSuite {
} }
list.toArray(new Array[Range](list.size)) list.toArray(new Array[Range](list.size))
} }
} }

View File

@ -30,14 +30,15 @@ class KafkaTimerTest extends JUnit3Suite {
val clock = new ManualClock val clock = new ManualClock
val testRegistry = new MetricsRegistry(clock) val testRegistry = new MetricsRegistry(clock)
val metric = testRegistry.newTimer(this.getClass, "TestTimer") val metric = testRegistry.newTimer(this.getClass, "TestTimer")
val Epsilon = java.lang.Double.longBitsToDouble(0x3ca0000000000000L)
val timer = new KafkaTimer(metric) val timer = new KafkaTimer(metric)
timer.time { timer.time {
clock.addMillis(1000) clock.addMillis(1000)
} }
assertEquals(1, metric.count()) assertEquals(1, metric.count())
assertTrue((metric.max() - 1000).abs <= Double.Epsilon) assertTrue((metric.max() - 1000).abs <= Epsilon)
assertTrue((metric.min() - 1000).abs <= Double.Epsilon) assertTrue((metric.min() - 1000).abs <= Epsilon)
} }
private class ManualClock extends Clock { private class ManualClock extends Clock {
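
The Epsilon change above is a different kind of portability fix: scala.Double.Epsilon appears not to be available across all of the Scala versions this build now targets, so the test inlines the IEEE 754 bit pattern for 2 to the power -53, the usual machine epsilon for doubles. A quick check of what that literal encodes:

    object EpsilonSketch extends App {
      // 0x3ca0000000000000L is sign 0, biased exponent 970, mantissa 0,
      // which decodes to 1.0 * 2^(970 - 1023) = 2^-53.
      val epsilon = java.lang.Double.longBitsToDouble(0x3ca0000000000000L)
      println(epsilon)                      // 1.1102230246251565E-16
      println(epsilon == math.pow(2, -53))  // true
    }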

View File

@ -263,7 +263,7 @@ class AsyncProducerTest extends JUnit3Suite {
} }
catch { catch {
// should not throw any exception // should not throw any exception
case e => fail("Should not throw any exception") case e: Throwable => fail("Should not throw any exception")
} }
} }
@ -450,7 +450,8 @@ class AsyncProducerTest extends JUnit3Suite {
val topic = "topic1" val topic = "topic1"
val msgs = TestUtils.getMsgStrings(5) val msgs = TestUtils.getMsgStrings(5)
val scalaProducerData = msgs.map(m => new KeyedMessage[String, String](topic, m)) val scalaProducerData = msgs.map(m => new KeyedMessage[String, String](topic, m))
val javaProducerData = scala.collection.JavaConversions.asList(scalaProducerData) import scala.collection.JavaConversions._
val javaProducerData: java.util.List[KeyedMessage[String, String]] = scalaProducerData
val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]]) val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]])
mockScalaProducer.send(scalaProducerData.head) mockScalaProducer.send(scalaProducerData.head)

View File

@ -108,7 +108,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
fail("Test should fail because the broker list provided are not valid") fail("Test should fail because the broker list provided are not valid")
} catch { } catch {
case e: FailedToSendMessageException => case e: FailedToSendMessageException =>
case oe => fail("fails with exception", oe) case oe: Throwable => fail("fails with exception", oe)
} finally { } finally {
producer1.close() producer1.close()
} }
@ -121,7 +121,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
try{ try{
producer2.send(new KeyedMessage[String, String](topic, "test", "test1")) producer2.send(new KeyedMessage[String, String](topic, "test", "test1"))
} catch { } catch {
case e => fail("Should succeed sending the message", e) case e: Throwable => fail("Should succeed sending the message", e)
} finally { } finally {
producer2.close() producer2.close()
} }
@ -134,7 +134,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
try{ try{
producer3.send(new KeyedMessage[String, String](topic, "test", "test1")) producer3.send(new KeyedMessage[String, String](topic, "test", "test1"))
} catch { } catch {
case e => fail("Should succeed sending the message", e) case e: Throwable => fail("Should succeed sending the message", e)
} finally { } finally {
producer3.close() producer3.close()
} }
@ -191,7 +191,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
} }
catch { catch {
case se: FailedToSendMessageException => true case se: FailedToSendMessageException => true
case e => fail("Not expected", e) case e: Throwable => fail("Not expected", e)
} }
finally { finally {
producer2.close() producer2.close()
@ -225,7 +225,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
// on broker 0 // on broker 0
producer.send(new KeyedMessage[String, String](topic, "test", "test1")) producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
} catch { } catch {
case e => fail("Unexpected exception: " + e) case e: Throwable => fail("Unexpected exception: " + e)
} }
// kill the broker // kill the broker
@ -238,7 +238,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
fail("Should fail since no leader exists for the partition.") fail("Should fail since no leader exists for the partition.")
} catch { } catch {
case e : TestFailedException => throw e // catch and re-throw the failure message case e : TestFailedException => throw e // catch and re-throw the failure message
case e2 => // otherwise success case e2: Throwable => // otherwise success
} }
// restart server 1 // restart server 1
@ -287,7 +287,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
assertTrue("Message set should have 1 message", messageSet1.hasNext) assertTrue("Message set should have 1 message", messageSet1.hasNext)
assertEquals(new Message("test".getBytes), messageSet1.next.message) assertEquals(new Message("test".getBytes), messageSet1.next.message)
} catch { } catch {
case e => case e: Exception => producer.close; fail("Not expected", e) case e: Throwable => case e: Exception => producer.close; fail("Not expected", e)
} }
// stop IO threads and request handling, but leave networking operational // stop IO threads and request handling, but leave networking operational

View File

@ -136,7 +136,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0)) new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
} catch { } catch {
case e : java.io.IOException => // success case e : java.io.IOException => // success
case e2 => throw e2 case e2: Throwable => throw e2
} }
} }
@ -205,7 +205,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
Assert.fail("Should have received timeout exception since request handling is stopped.") Assert.fail("Should have received timeout exception since request handling is stopped.")
} catch { } catch {
case e: SocketTimeoutException => /* success */ case e: SocketTimeoutException => /* success */
case e => Assert.fail("Unexpected exception when expecting timeout: " + e) case e: Throwable => Assert.fail("Unexpected exception when expecting timeout: " + e)
} }
val t2 = SystemTime.milliseconds val t2 = SystemTime.milliseconds
// make sure we don't wait fewer than timeoutMs for a response // make sure we don't wait fewer than timeoutMs for a response

View File

@ -410,7 +410,7 @@ object TestUtils extends Logging {
ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)) ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
} catch { } catch {
case oe => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe) case oe: Throwable => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe)
} }
} }
} }

View File

@ -175,7 +175,7 @@ object ConsumerPerformance {
case _: InterruptedException => case _: InterruptedException =>
case _: ClosedByInterruptException => case _: ClosedByInterruptException =>
case _: ConsumerTimeoutException => case _: ConsumerTimeoutException =>
case e => throw e case e: Throwable => throw e
} }
totalMessagesRead.addAndGet(messagesRead) totalMessagesRead.addAndGet(messagesRead)
totalBytesRead.addAndGet(bytesRead) totalBytesRead.addAndGet(bytesRead)

View File

@ -41,7 +41,8 @@ object KafkaBuild extends Build {
</license> </license>
</licenses>, </licenses>,
scalacOptions ++= Seq("-deprecation", "-unchecked", "-g:none"), scalacOptions ++= Seq("-deprecation", "-unchecked", "-g:none"),
crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2"), crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2", "2.10.1"),
excludeFilter in unmanagedSources <<= scalaVersion(v => if (v.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala"),
scalaVersion := "2.8.0", scalaVersion := "2.8.0",
version := "0.8.0-beta1", version := "0.8.0-beta1",
publishTo := Some("Apache Maven Repo" at "https://repository.apache.org/service/local/staging/deploy/maven2"), publishTo := Some("Apache Maven Repo" at "https://repository.apache.org/service/local/staging/deploy/maven2"),
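
The build change above is what actually turns on the new target: 2.10.1 joins crossScalaVersions, and the added excludeFilter keeps version-specific sources (files suffixed _2.8.scala versus _2.9+.scala) out of whichever Scala line cannot compile them. A stripped-down sketch of the same two settings in an sbt Build of the same style; the project name is illustrative and only the two settings mirror the commit:

    import sbt._
    import Keys._

    object CrossBuildSketch extends Build {
      lazy val root = Project("cross-sketch", file("."), settings = Defaults.defaultSettings ++ Seq(
        crossScalaVersions := Seq("2.8.0", "2.8.2", "2.9.1", "2.9.2", "2.10.1"),
        // On 2.8.x hide the *_2.9+.scala sources; on 2.9/2.10 hide the *_2.8.scala ones.
        excludeFilter in unmanagedSources <<= scalaVersion(v =>
          if (v.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala")
      ))
    }

Running sbt +compile (or +package) then builds once per entry in crossScalaVersions, while sbt ++2.10.1 compile selects a single version.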