KAFKA-48 Patch to add "long poll" support to fetch requests.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1332413 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Edward Jay Kreps 2012-04-30 21:34:49 +00:00
parent d73355017a
commit 1bedde76c2
28 changed files with 746 additions and 304 deletions

View File

@ -30,9 +30,11 @@ brokerid=0
# The port the socket server listens on
port=9092
# The number of processor threads the socket server uses for receiving and answering requests.
# Defaults to the number of cores on the machine
num.threads=8
# The number of threads handling network requests
network.threads=2
# The number of threads doing disk I/O
io.threads=2
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer=1048576

View File

@ -76,6 +76,11 @@ case class OffsetDetail(topic: String, partitions: Seq[Int], offsets: Seq[Long],
object FetchRequest {
val CurrentVersion = 1.shortValue()
val DefaultCorrelationId = -1
val DefaultClientId = ""
val DefaultReplicaId = -1
val DefaultMaxWait = 0
val DefaultMinBytes = 0
def readFrom(buffer: ByteBuffer): FetchRequest = {
val versionId = buffer.getShort
@ -94,13 +99,13 @@ object FetchRequest {
}
case class FetchRequest( versionId: Short,
correlationId: Int,
clientId: String,
replicaId: Int,
maxWait: Int,
minBytes: Int,
offsetInfo: Seq[OffsetDetail] ) extends Request(RequestKeys.Fetch) {
case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
correlationId: Int = FetchRequest.DefaultCorrelationId,
clientId: String = FetchRequest.DefaultClientId,
replicaId: Int = FetchRequest.DefaultReplicaId,
maxWait: Int = FetchRequest.DefaultMaxWait,
minBytes: Int = FetchRequest.DefaultMinBytes,
offsetInfo: Seq[OffsetDetail] ) extends Request(RequestKeys.Fetch) {
// ensure that a topic "X" appears in at most one OffsetDetail
def validate() {
@ -138,13 +143,14 @@ case class FetchRequest( versionId: Short,
def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
}
class FetchRequestBuilder() {
private var correlationId = -1
private var correlationId = FetchRequest.DefaultCorrelationId
private val versionId = FetchRequest.CurrentVersion
private var clientId = ""
private var replicaId = -1 // sensible default
private var maxWait = -1 // sensible default
private var minBytes = -1 // sensible default
private var clientId = FetchRequest.DefaultClientId
private var replicaId = FetchRequest.DefaultReplicaId
private var maxWait = FetchRequest.DefaultMaxWait
private var minBytes = FetchRequest.DefaultMinBytes
private val requestMap = new HashMap[String, Tuple3[Buffer[Int], Buffer[Long], Buffer[Int]]]
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {

View File

@ -42,7 +42,7 @@ case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, init
def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages)
def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = {
def translatePartition(topic: String, randomSelector: String => Int): Int = {
if (partition == ProducerRequest.RandomPartition)
return randomSelector(topic)
else

View File

@ -110,7 +110,7 @@ case class ProducerRequest( versionId: Short,
}
}
def getNumTopicPartitions = data.foldLeft(0)(_ + _.partitionData.length)
def topicPartitionCount = data.foldLeft(0)(_ + _.partitionData.length)
def expectResponse = requiredAcks > 0
}

View File

@ -33,6 +33,8 @@ object ConsumerConfig {
val MaxRebalanceRetries = 4
val AutoOffsetReset = OffsetRequest.SmallestTimeString
val ConsumerTimeoutMs = -1
val MinFetchBytes = 1
val MaxFetchWaitMs = 3000
val MirrorTopicsWhitelist = ""
val MirrorTopicsBlacklist = ""
val MirrorConsumerNumThreads = 1
@ -52,7 +54,7 @@ class ConsumerConfig(props: Properties) extends ZKConfig(props) {
* Set this explicitly for only testing purpose. */
val consumerId: Option[String] = Option(Utils.getString(props, "consumerid", null))
/** the socket timeout for network requests */
/** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */
val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout)
/** the socket receive buffer for network requests */
@ -61,10 +63,6 @@ class ConsumerConfig(props: Properties) extends ZKConfig(props) {
/** the number of byes of messages to attempt to fetch */
val fetchSize = Utils.getInt(props, "fetch.size", FetchSize)
/** to avoid repeatedly polling a broker node which has no new data
we will backoff every time we get an empty set from the broker*/
val fetcherBackoffMs: Long = Utils.getInt(props, "fetcher.backoff.ms", DefaultFetcherBackoffMs)
/** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
val autoCommit = Utils.getBoolean(props, "autocommit.enable", AutoCommit)
@ -77,6 +75,12 @@ class ConsumerConfig(props: Properties) extends ZKConfig(props) {
/** max number of retries during rebalance */
val maxRebalanceRetries = Utils.getInt(props, "rebalance.retries.max", MaxRebalanceRetries)
/** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
val minFetchBytes = Utils.getInt(props, "min.fetch.bytes", MinFetchBytes)
/** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediate satisfy min.fetch.bytes */
val maxFetchWaitMs = Utils.getInt(props, "max.fetch.wait.ms", MaxFetchWaitMs)
/** backoff time between retries during rebalance */
val rebalanceBackoffMs = Utils.getInt(props, "rebalance.backoff.ms", zkSyncTimeMs)

View File

@ -18,6 +18,7 @@
package kafka.consumer
import java.io.IOException
import java.nio.channels.ClosedByInterruptException
import java.util.concurrent.CountDownLatch
import kafka.api.{FetchRequestBuilder, OffsetRequest}
import kafka.cluster.Broker
@ -33,7 +34,7 @@ class FetcherRunnable(val name: String,
val partitionTopicInfos: List[PartitionTopicInfo])
extends Thread(name) with Logging {
private val shutdownLatch = new CountDownLatch(1)
private val simpleConsumer = new SimpleConsumer(broker.host, broker.port, config.socketTimeoutMs,
private val simpleConsumer = new SimpleConsumer(broker.host, broker.port, config.maxFetchWaitMs + config.socketTimeoutMs,
config.socketBufferSize)
@volatile
private var stopped = false
@ -54,19 +55,19 @@ class FetcherRunnable(val name: String,
var reqId = 0
try {
while (!stopped) {
// TODO: fix up the max wait and min bytes
val builder = new FetchRequestBuilder().
correlationId(reqId).
clientId(config.consumerId.getOrElse(name)).
maxWait(0).
minBytes(0)
maxWait(config.maxFetchWaitMs).
minBytes(config.minFetchBytes)
partitionTopicInfos.foreach(pti =>
builder.addFetch(pti.topic, pti.partitionId, pti.getFetchOffset(), config.fetchSize)
)
val fetchRequest = builder.build()
trace("fetch request: " + fetchRequest)
val start = System.currentTimeMillis
val response = simpleConsumer.fetch(fetchRequest)
trace("Fetch completed in " + (System.currentTimeMillis - start) + " ms with max wait of " + config.maxFetchWaitMs)
var read = 0L
for(infopti <- partitionTopicInfos) {
@ -74,7 +75,7 @@ class FetcherRunnable(val name: String,
try {
var done = false
if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) {
info("offset for " + infopti + " out of range")
info("Offset for " + infopti + " out of range")
// see if we can fix this error
val resetOffset = resetConsumerOffsets(infopti.topic, infopti.partitionId)
if(resetOffset >= 0) {
@ -86,36 +87,35 @@ class FetcherRunnable(val name: String,
if (!done)
read += infopti.enqueue(messages, infopti.getFetchOffset)
} catch {
case e1: IOException =>
case e: IOException =>
// something is wrong with the socket, re-throw the exception to stop the fetcher
throw e1
case e2 =>
throw e
case e =>
if (!stopped) {
// this is likely a repeatable error, log it and trigger an exception in the consumer
error("error in FetcherRunnable for " + infopti, e2)
infopti.enqueueError(e2, infopti.getFetchOffset)
error("Error in fetcher for " + infopti, e)
infopti.enqueueError(e, infopti.getFetchOffset)
}
// re-throw the exception to stop the fetcher
throw e2
throw e
}
}
reqId = if(reqId == Int.MaxValue) 0 else reqId + 1
trace("fetched bytes: " + read)
if(read == 0) {
debug("backing off " + config.fetcherBackoffMs + " ms")
Thread.sleep(config.fetcherBackoffMs)
}
}
} catch {
case e: ClosedByInterruptException =>
// we interrupted ourselves, close quietly
debug("Fetch request interrupted, exiting...")
case e =>
if (stopped)
info("FecherRunnable " + this + " interrupted")
if(stopped)
info("Fetcher stopped...")
else
error("error in FetcherRunnable ", e)
error("Error in fetcher ", e)
}
info("stopping fetcher " + name + " to host " + broker.host)
info("Stopping fetcher " + name + " to host " + broker.host)
Utils.swallow(logger.info, simpleConsumer.close)
shutdownComplete()
}
@ -139,7 +139,7 @@ class FetcherRunnable(val name: String,
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
// reset manually in zookeeper
info("updating partition " + partitionId + " for topic " + topic + " with " +
info("Updating partition " + partitionId + " for topic " + topic + " with " +
(if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0))
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partitionId, offsets(0).toString)

View File

@ -66,7 +66,7 @@ class SimpleConsumer( val host: String,
response = blockingChannel.receive()
} catch {
case e : java.io.IOException =>
info("Reconnect in due to socket error: ", e)
info("Reconnect due to socket error: ", e)
// retry once
try {
reconnect()

View File

@ -92,7 +92,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
/**
* Return a message set which is a view into this set starting from the given offset and with the given size limit.
*/
def read(readOffset: Long, size: Long): MessageSet = {
def read(readOffset: Long, size: Long): FileMessageSet = {
new FileMessageSet(channel, this.offset + readOffset, scala.math.min(this.offset + readOffset + size, highWaterMark),
false, new AtomicBoolean(false))
}

View File

@ -22,7 +22,10 @@ import java.util.concurrent._
object RequestChannel {
val AllDone = new Request(1, 2, null, 0)
case class Request(processor: Int, requestKey: Any, request: Receive, start: Long)
case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsed: Long)
case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsed: Long) {
def this(request: Request, send: Send, ellapsed: Long) =
this(request.processor, request.requestKey, send, request.start, ellapsed)
}
}
class RequestChannel(val numProcessors: Int, val queueSize: Int) {

View File

@ -238,7 +238,7 @@ private[kafka] class Processor(val id: Int,
private def processNewResponses() {
var curr = requestChannel.receiveResponse(id)
while(curr != null) {
trace("Socket server received response to send: " + curr)
trace("Socket server received response to send, registering for write: " + curr)
val key = curr.requestKey.asInstanceOf[SelectionKey]
try {
key.interestOps(SelectionKey.OP_WRITE)

View File

@ -65,7 +65,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
if(serializerClass == null) {
serializerClass = "kafka.serializer.StringEncoder"
LogLog.warn("Using default encoder - kafka.serializer.StringEncoder")
LogLog.debug("Using default encoder - kafka.serializer.StringEncoder")
}
props.put("serializer.class", serializerClass)
val config : ProducerConfig = new ProducerConfig(props)

View File

@ -65,7 +65,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
val partitionedData = partitionAndCollate(messages)
val failedProduceRequests = new ListBuffer[ProducerData[K,Message]]
try {
for ( (brokerid, eventsPerBrokerMap) <- partitionedData ) {
for ((brokerid, eventsPerBrokerMap) <- partitionedData) {
if (logger.isTraceEnabled)
eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
.format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
@ -163,7 +163,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
throw new NoLeaderForPartitionException("No leader for some partition(s) on broker %d".format(brokerId))
if(messagesPerTopic.size > 0) {
val topics = new HashMap[String, ListBuffer[PartitionData]]()
for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) {
for(((topicName, partitionId), messagesSet) <- messagesPerTopic) {
topics.get(topicName) match {
case Some(x) => trace("found " + topicName)
case None => topics += topicName -> new ListBuffer[PartitionData]() //create a new listbuffer for this topic

View File

@ -19,55 +19,80 @@ package kafka.server
import java.io.IOException
import java.lang.IllegalStateException
import java.util.concurrent.atomic._
import kafka.admin.{CreateTopicCommand, AdminUtils}
import kafka.api._
import kafka.log._
import kafka.message._
import kafka.network._
import org.apache.log4j.Logger
import scala.collection.mutable.ListBuffer
import scala.collection._
import kafka.utils.{SystemTime, Logging}
import kafka.common.{FetchRequestFormatException, ErrorMapping}
import kafka.common._
import scala.math._
/**
* Logic to handle the various Kafka requests
*/
class KafkaApis(val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
private val requestLogger = Logger.getLogger("kafka.request.logger")
def handle(receive: Receive): Option[Send] = {
val apiId = receive.buffer.getShort()
/**
* Top-level method that handles all requests and multiplexes to the right api
*/
def handle(request: RequestChannel.Request) {
val apiId = request.request.buffer.getShort()
apiId match {
case RequestKeys.Produce => handleProducerRequest(receive)
case RequestKeys.Fetch => handleFetchRequest(receive)
case RequestKeys.Offsets => handleOffsetRequest(receive)
case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive)
case RequestKeys.Produce => handleProducerRequest(request)
case RequestKeys.Fetch => handleFetchRequest(request)
case RequestKeys.Offsets => handleOffsetRequest(request)
case RequestKeys.TopicMetadata => handleTopicMetadataRequest(request)
case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
}
}
def handleProducerRequest(receive: Receive): Option[Send] = {
/**
* Handle a produce request
*/
def handleProducerRequest(request: RequestChannel.Request) {
val produceRequest = ProducerRequest.readFrom(request.request.buffer)
val sTime = SystemTime.milliseconds
val request = ProducerRequest.readFrom(receive.buffer)
if(requestLogger.isTraceEnabled)
requestLogger.trace("Producer request " + request.toString)
val response = handleProducerRequest(request)
val response = produce(produceRequest)
debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
Some(new ProducerResponseSend(response))
requestChannel.sendResponse(new RequestChannel.Response(request, new ProducerResponseSend(response), -1))
// Now check any outstanding fetches this produce just unblocked
var satisfied = new mutable.ArrayBuffer[DelayedFetch]
for(topicData <- produceRequest.data) {
for(partition <- topicData.partitionData)
satisfied ++= fetchRequestPurgatory.update((topicData.topic, partition.partition), topicData)
}
// send any newly unblocked responses
for(fetchReq <- satisfied) {
val topicData = readMessageSets(fetchReq.fetch.offsetInfo)
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
}
}
private def handleProducerRequest(request: ProducerRequest): ProducerResponse = {
val requestSize = request.getNumTopicPartitions
/**
* Helper method for handling a parsed producer request
*/
private def produce(request: ProducerRequest): ProducerResponse = {
val requestSize = request.topicPartitionCount
val errors = new Array[Short](requestSize)
val offsets = new Array[Long](requestSize)
var msgIndex = -1
for( topicData <- request.data ) {
for( partitionData <- topicData.partitionData ) {
for(topicData <- request.data) {
for(partitionData <- topicData.partitionData) {
msgIndex += 1
val partition = partitionData.getTranslatedPartition(topicData.topic, logManager.chooseRandomPartition)
val partition = partitionData.translatePartition(topicData.topic, logManager.chooseRandomPartition)
try {
// TODO: need to handle ack's here! Will probably move to another method.
kafkaZookeeper.ensurePartitionOnThisBroker(topicData.topic, partition)
@ -82,7 +107,7 @@ class KafkaApis(val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper)
e match {
case _: IOException =>
fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
Runtime.getRuntime.halt(1)
System.exit(1)
case _ =>
errors(msgIndex) = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort
offsets(msgIndex) = -1
@ -93,8 +118,11 @@ class KafkaApis(val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper)
new ProducerResponse(ProducerResponse.CurrentVersion, request.correlationId, errors, offsets)
}
def handleFetchRequest(request: Receive): Option[Send] = {
val fetchRequest = FetchRequest.readFrom(request.buffer)
/**
* Handle a fetch request
*/
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = FetchRequest.readFrom(request.request.buffer)
if(requestLogger.isTraceEnabled)
requestLogger.trace("Fetch request " + fetchRequest.toString)
@ -104,13 +132,54 @@ class KafkaApis(val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper)
} catch {
case e:FetchRequestFormatException =>
val response = new FetchResponse(FetchResponse.CurrentVersion, fetchRequest.correlationId, Array.empty)
return Some(new FetchResponseSend(response, ErrorMapping.InvalidFetchRequestFormatCode))
val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.InvalidFetchRequestFormatCode), -1)
requestChannel.sendResponse(channelResponse)
}
val fetchedData = new ListBuffer[TopicData]()
// if there are enough bytes available right now we can answer the request, otherwise we have to punt
val availableBytes = availableFetchBytes(fetchRequest)
if(fetchRequest.maxWait <= 0 || availableBytes >= fetchRequest.minBytes) {
val topicData = readMessageSets(fetchRequest.offsetInfo)
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
} else {
// create a list of (topic, partition) pairs to use as keys for this delayed request
val keys: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _)))
val delayedFetch = new DelayedFetch(keys, request, fetchRequest, fetchRequest.maxWait, availableBytes)
fetchRequestPurgatory.watch(delayedFetch)
}
}
/**
* Calculate the number of available bytes for the given fetch request
*/
private def availableFetchBytes(fetchRequest: FetchRequest): Long = {
var totalBytes = 0L
for(offsetDetail <- fetchRequest.offsetInfo) {
val info = new ListBuffer[PartitionData]()
for(i <- 0 until offsetDetail.partitions.size) {
try {
val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i))
val available = maybeLog match {
case Some(log) => max(0, log.getHighwaterMark - offsetDetail.offsets(i))
case None => 0
}
totalBytes += math.min(offsetDetail.fetchSizes(i), available)
} catch {
case e: InvalidPartitionException =>
info("Invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
}
}
}
totalBytes
}
/**
* Read from all the offset details given and produce an array of topic datas
*/
private def readMessageSets(offsets: Seq[OffsetDetail]): Array[TopicData] = {
val fetchedData = new mutable.ArrayBuffer[TopicData]()
for(offsetDetail <- offsets) {
val info = new mutable.ArrayBuffer[PartitionData]()
val topic = offsetDetail.topic
val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
@ -122,10 +191,12 @@ class KafkaApis(val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper)
}
fetchedData.append(new TopicData(topic, info.toArray))
}
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, fetchedData.toArray )
Some(new FetchResponseSend(response, ErrorMapping.NoError))
fetchedData.toArray
}
/**
* Read from a single topic/partition at the given offset
*/
private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Int, MessageSet] = {
var response: Either[Int, MessageSet] = null
try {
@ -140,22 +211,28 @@ class KafkaApis(val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper)
response
}
def handleOffsetRequest(request: Receive): Option[Send] = {
val offsetRequest = OffsetRequest.readFrom(request.buffer)
/**
* Service the offset request API
*/
def handleOffsetRequest(request: RequestChannel.Request) {
val offsetRequest = OffsetRequest.readFrom(request.request.buffer)
if(requestLogger.isTraceEnabled)
requestLogger.trace("Offset request " + offsetRequest.toString)
val offsets = logManager.getOffsets(offsetRequest)
val response = new OffsetArraySend(offsets)
Some(response)
requestChannel.sendResponse(new RequestChannel.Response(request, response, -1))
}
def handleTopicMetadataRequest(request: Receive): Option[Send] = {
val metadataRequest = TopicMetadataRequest.readFrom(request.buffer)
/**
* Service the topic metadata request API
*/
def handleTopicMetadataRequest(request: RequestChannel.Request) {
val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer)
if(requestLogger.isTraceEnabled)
requestLogger.trace("Topic metadata request " + metadataRequest.toString())
val topicsMetadata = new ListBuffer[TopicMetadata]()
val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
val config = logManager.getServerConfig
val zkClient = kafkaZookeeper.getZookeeperClient
val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
@ -181,6 +258,37 @@ class KafkaApis(val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper)
}
}
info("Sending response for topic metadata request")
Some(new TopicMetadataSend(topicsMetadata))
requestChannel.sendResponse(new RequestChannel.Response(request, new TopicMetadataSend(topicsMetadata), -1))
}
/**
* A delayed fetch request
*/
class DelayedFetch(keys: Seq[Any], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long) extends DelayedRequest(keys, request, delayMs) {
val bytesAccumulated = new AtomicLong(initialSize)
}
/**
* A holding pen for fetch requests waiting to be satisfied
*/
class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, TopicData] {
/**
* A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
*/
def checkSatisfied(topicData: TopicData, delayedFetch: DelayedFetch): Boolean = {
val messageDataSize = topicData.partitionData.map(_.messages.sizeInBytes).sum
val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageDataSize)
accumulatedSize >= delayedFetch.fetch.minBytes
}
/**
* When a request expires just answer it with whatever data is present
*/
def expire(delayed: DelayedFetch) {
val topicData = readMessageSets(delayed.fetch.offsetInfo)
val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
}
}
}

View File

@ -17,13 +17,14 @@
package kafka.server
import org.apache.log4j._
import kafka.network._
import kafka.utils._
/**
* Thread that answers kafka requests.
* A thread that answers kafka requests.
*/
class KafkaRequestHandler(val requestChannel: RequestChannel, val handle: (Receive) => Option[Send]) extends Runnable with Logging {
class KafkaRequestHandler(val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
def run() {
while(true) {
@ -31,38 +32,23 @@ class KafkaRequestHandler(val requestChannel: RequestChannel, val handle: (Recei
trace("Processor " + Thread.currentThread.getName + " got request " + req)
if(req == RequestChannel.AllDone)
return
handle(req.request) match {
case Some(send) => {
val resp = new RequestChannel.Response(processor = req.processor,
requestKey = req.requestKey,
response = send,
start = req.start,
elapsed = -1)
requestChannel.sendResponse(resp)
trace("Processor " + Thread.currentThread.getName + " sent response " + resp)
}
case None =>
}
apis.handle(req)
}
}
def shutdown() {
requestChannel.sendRequest(RequestChannel.AllDone)
}
def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
}
/**
* Pool of request handling threads.
*/
class KafkaRequestHandlerPool(val requestChannel: RequestChannel, val handler: (Receive) => Option[Send], numThreads: Int) {
class KafkaRequestHandlerPool(val requestChannel: RequestChannel,
val apis: KafkaApis,
numThreads: Int) {
val threads = new Array[Thread](numThreads)
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {
runnables(i) = new KafkaRequestHandler(requestChannel, handler)
threads(i) = new Thread(runnables(i), "kafka-request-handler-" + i)
threads(i).setDaemon(true)
runnables(i) = new KafkaRequestHandler(requestChannel, apis)
threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
threads(i).start()
}

View File

@ -70,8 +70,8 @@ class KafkaServer(val config: KafkaConfig) extends Logging {
kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica)
requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel,
new KafkaApis(logManager, kafkaZookeeper).handle, config.numIoThreads)
val apis = new KafkaApis(socketServer.requestChannel, logManager, kafkaZookeeper)
requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
socketServer.startup
Mx4jLoader.maybeLoad

View File

@ -0,0 +1,278 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import scala.collection._
import java.util.LinkedList
import java.util.concurrent._
import java.util.concurrent.atomic._
import kafka.api._
import kafka.network._
import kafka.utils._
/**
* A request whose processing needs to be delayed for at most the given delayMs
* The associated keys are used for bookeeping, and represent the "trigger" that causes this request to check if it is satisfied,
* for example a key could be a (topic, partition) pair.
*/
class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) {
val satisfied = new AtomicBoolean(false)
}
/**
* A helper class for dealing with asynchronous requests with a timeout. A DelayedRequest has a request to delay
* and also a list of keys that can trigger the action. Implementations can add customized logic to control what it means for a given
* request to be satisfied. For example it could be that we are waiting for user-specified number of acks on a given (topic, partition)
* to be able to respond to a request or it could be that we are waiting for a given number of bytes to accumulate on a given request
* to be able to respond to that request (in the simple case we might wait for at least one byte to avoid busy waiting).
*
* For us the key is generally a (topic, partition) pair.
* By calling
* watch(delayedRequest)
* we will add triggers for each of the given keys. It is up to the user to then call
* val satisfied = update(key, request)
* when a request relevant to the given key occurs. This triggers bookeeping logic and returns back any requests satisfied by this
* new request.
*
* An implementation provides extends two helper functions
* def checkSatisfied(request: R, delayed: T): Boolean
* this function returns true if the given request (in combination with whatever previous requests have happened) satisfies the delayed
* request delayed. This method will likely also need to do whatever bookkeeping is necessary.
*
* The second function is
* def expire(delayed: T)
* this function handles delayed requests that have hit their time limit without being satisfied.
*
*/
abstract class RequestPurgatory[T <: DelayedRequest, R] {
/* a list of requests watching each key */
private val watchersForKey = new ConcurrentHashMap[Any, Watchers]
/* background thread expiring requests that have been waiting too long */
private val expiredRequestReaper = new ExpiredRequestReaper
private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
expirationThread.start()
/**
* Add a new delayed request watching the contained keys
*/
def watch(delayedRequest: T) {
for(key <- delayedRequest.keys) {
var lst = watchersFor(key)
lst.add(delayedRequest)
}
expiredRequestReaper.enqueue(delayedRequest)
}
/**
* Update any watchers and return a list of newly satisfied requests.
*/
def update(key: Any, request: R): Seq[T] = {
val w = watchersForKey.get(key)
if(w == null)
Seq.empty
else
w.collectSatisfiedRequests(request)
}
private def watchersFor(key: Any): Watchers = {
var lst = watchersForKey.get(key)
if(lst == null) {
watchersForKey.putIfAbsent(key, new Watchers)
lst = watchersForKey.get(key)
}
lst
}
/**
* Check if this request satisfied this delayed request
*/
protected def checkSatisfied(request: R, delayed: T): Boolean
/**
* Handle an expired delayed request
*/
protected def expire(delayed: T)
/**
* Shutdown the expirey thread
*/
def shutdown() {
expiredRequestReaper.shutdown()
}
/**
* A linked list of DelayedRequests watching some key with some associated bookeeping logic
*/
private class Watchers {
/* a few magic parameters to help do cleanup to avoid accumulating old watchers */
private val CleanupThresholdSize = 100
private val CleanupThresholdPrct = 0.5
private val requests = new LinkedList[T]
/* you can only change this if you have added something or marked something satisfied */
var liveCount = 0.0
def add(t: T) {
synchronized {
requests.add(t)
liveCount += 1
maybePurge()
}
}
private def maybePurge() {
if(requests.size > CleanupThresholdSize && liveCount / requests.size < CleanupThresholdPrct) {
val iter = requests.iterator()
while(iter.hasNext) {
val curr = iter.next
if(curr.satisfied.get())
iter.remove()
}
}
}
def decLiveCount() {
synchronized {
liveCount -= 1
}
}
def collectSatisfiedRequests(request: R): Seq[T] = {
val response = new mutable.ArrayBuffer[T]
synchronized {
val iter = requests.iterator()
while(iter.hasNext) {
val curr = iter.next
if(curr.satisfied.get) {
// another thread has satisfied this request, remove it
iter.remove()
} else {
if(checkSatisfied(request, curr)) {
iter.remove()
val updated = curr.satisfied.compareAndSet(false, true)
if(updated == true) {
response += curr
liveCount -= 1
expiredRequestReaper.satisfyRequest()
}
}
}
}
}
response
}
}
/**
* Runnable to expire requests that have sat unfullfilled past their deadline
*/
private class ExpiredRequestReaper extends Runnable with Logging {
/* a few magic parameters to help do cleanup to avoid accumulating old watchers */
private val CleanupThresholdSize = 100
private val CleanupThresholdPrct = 0.5
private val delayed = new DelayQueue[T]
private val running = new AtomicBoolean(true)
private val shutdownLatch = new CountDownLatch(1)
private val needsPurge = new AtomicBoolean(false)
/* The count of elements in the delay queue that are unsatisfied */
private val unsatisfied = new AtomicInteger(0)
/** Main loop for the expiry thread */
def run() {
while(running.get) {
try {
val curr = pollExpired()
expire(curr)
} catch {
case ie: InterruptedException =>
if(needsPurge.getAndSet(false)) {
val purged = purgeSatisfied()
debug("Forced purge of " + purged + " requests from delay queue.")
}
case e: Exception =>
error("Error in long poll expiry thread: ", e)
}
}
shutdownLatch.countDown()
}
/** Add a request to be expired */
def enqueue(t: T) {
delayed.add(t)
unsatisfied.incrementAndGet()
if(unsatisfied.get > CleanupThresholdSize && unsatisfied.get / delayed.size.toDouble < CleanupThresholdPrct)
forcePurge()
}
private def forcePurge() {
needsPurge.set(true)
expirationThread.interrupt()
}
/** Shutdown the expiry thread*/
def shutdown() {
debug("Shutting down request expiry thread")
running.set(false)
expirationThread.interrupt()
shutdownLatch.await()
}
/** Record the fact that we satisfied a request in the stats for the expiry queue */
def satisfyRequest(): Unit = unsatisfied.getAndDecrement()
/**
* Get the next expired event
*/
private def pollExpired(): T = {
while(true) {
val curr = delayed.take()
val updated = curr.satisfied.compareAndSet(false, true)
if(updated) {
unsatisfied.getAndDecrement()
for(key <- curr.keys)
watchersFor(key).decLiveCount()
return curr
}
}
throw new RuntimeException("This should not happen")
}
/**
* Delete all expired events from the delay queue
*/
private def purgeSatisfied(): Int = {
var purged = 0
val iter = delayed.iterator()
while(iter.hasNext) {
val curr = iter.next()
if(curr.satisfied.get) {
iter.remove()
purged += 1
}
}
purged
}
}
}

View File

@ -39,7 +39,7 @@ class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed {
def compareTo(d: Delayed): Int = {
val delayed = d.asInstanceOf[DelayedItem[T]]
val myEnd = createdMs + delayMs
val yourEnd = delayed.createdMs - delayed.delayMs
val yourEnd = delayed.createdMs + delayed.delayMs
if(myEnd < yourEnd) -1
else if(myEnd > yourEnd) 1

View File

@ -23,7 +23,6 @@ import java.lang.IllegalStateException
/**
* A scheduler for running jobs in the background
* TODO: ScheduledThreadPoolExecutor notriously swallows exceptions
*/
class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon: Boolean) extends Logging {
private val threadId = new AtomicLong(0)
@ -32,11 +31,7 @@ class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon:
def startUp = {
executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() {
def newThread(runnable: Runnable): Thread = {
val t = new Thread(runnable, baseThreadName + threadId.getAndIncrement)
t.setDaemon(isDaemon)
t
}
def newThread(runnable: Runnable): Thread = Utils.daemonThread(baseThreadName + threadId.getAndIncrement, runnable)
})
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
@ -57,12 +52,12 @@ class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon:
def shutdownNow() {
checkIfExecutorHasStarted
executor.shutdownNow()
info("force shutdown scheduler " + baseThreadName)
info("Forcing shutdown of scheduler " + baseThreadName)
}
def shutdown() {
checkIfExecutorHasStarted
executor.shutdown()
info("shutdown scheduler " + baseThreadName)
info("Shutdown scheduler " + baseThreadName)
}
}

View File

@ -97,6 +97,11 @@ object Utils extends Logging {
def newThread(name: String, runnable: Runnable, daemon: Boolean): Thread = {
val thread = new Thread(runnable, name)
thread.setDaemon(daemon)
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
def uncaughtException(t: Thread, e: Throwable) {
error("Uncaught exception in thread '" + t.getName + "':", e)
}
})
thread
}

View File

@ -18,7 +18,7 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.kafka=ERROR
log4j.logger.kafka=INFO
# zkclient can be verbose, during debugging it is common to adjust is separately
log4j.logger.org.I0Itec.zkclient.ZkClient=WARN

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka
import java.util.Properties
import kafka.consumer._
import kafka.producer._
import kafka.message._
object TestEndToEndLatency {
def main(args: Array[String]) {
if(args.length != 2) {
System.err.println("USAGE: java " + getClass().getName + " zookeeper_connect num_messages")
System.exit(1)
}
val zkConnect = args(0)
val numMessages = args(1).toInt
val topic = "test"
val consumerProps = new Properties()
consumerProps.put("groupid", topic)
consumerProps.put("auto.commit", "true")
consumerProps.put("autooffset.reset", "largest")
consumerProps.put("zk.connect", zkConnect)
consumerProps.put("socket.timeout.ms", 1201000.toString)
val config = new ConsumerConfig(consumerProps)
val connector = Consumer.create(config)
var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
val iter = stream.iterator
val producerProps = new Properties()
producerProps.put("zk.connect", zkConnect)
producerProps.put("producer.type", "sync")
val producer = new Producer[Any, Any](new ProducerConfig(producerProps))
val message = new Message("hello there beautiful".getBytes)
var totalTime = 0.0
var totalSize = 0L
for(i <- 0 until numMessages) {
var begin = System.nanoTime
producer.send(new ProducerData(topic, message))
val received = iter.next
val ellapsed = System.nanoTime - begin
// poor man's progress bar
if(i % 10000 == 0)
println(i + "\t" + ellapsed / 1000.0 / 1000.0)
totalTime += ellapsed
totalSize += received.size
}
println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0)) + "ms"
producer.close()
connector.shutdown()
System.exit(0)
}
}

View File

@ -35,6 +35,7 @@ import kafka.utils.TestUtils._
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
val RebalanceBackoffMs = 5000
var dirs : ZKGroupTopicDirs = null
val zookeeperConnect = TestZKUtils.zookeeperConnect
val numNodes = 2
@ -63,7 +64,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
}
def testBasic() {
val requestHandlerLogger = Logger.getLogger(classOf[KafkaApis])
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)
// test consumer timeout logic
@ -117,8 +118,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
zkConsumerConnector1.commitOffsets
// create a consumer
val consumerConfig2 = new ConsumerConfig(
TestUtils.createConsumerProperties(zkConnect, group, consumer2))
val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) {
override val rebalanceBackoffMs = RebalanceBackoffMs
}
val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> 1))
// send some messages to each broker
@ -202,8 +204,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
zkConsumerConnector1.commitOffsets
// create a consumer
val consumerConfig2 = new ConsumerConfig(
TestUtils.createConsumerProperties(zkConnect, group, consumer2))
val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) {
override val rebalanceBackoffMs = RebalanceBackoffMs
}
val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> 1))
// send some messages to each broker
@ -300,7 +303,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_2 = List( ("0", "group1_consumer0-0"),
("1", "group1_consumer0-0"))
assertEquals(expected_2, actual_2)
assertEquals(expected_2, actual_2)
zkConsumerConnector1.shutdown
@ -372,7 +375,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
// also check partition ownership
val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_1 = List( ("0", "group1_consumer1-0"))
assertEquals(expected_1, actual_1)
assertEquals(expected_1, actual_1)
val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
assertEquals(nMessages, receivedMessages1.size)

View File

@ -34,11 +34,11 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
val topic = "test_topic"
val group = "default_group"
val testConsumer = "consumer"
val brokerPort = 9892
val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, brokerPort)))
val numMessages = 10
val largeOffset = 10000
val smallOffset = -1
val BrokerPort = 9892
val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, BrokerPort)))
val NumMessages = 10
val LargeOffset = 10000
val SmallOffset = -1
val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
@ -55,165 +55,55 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
super.tearDown
}
def testEarliestOffsetResetForward() = {
def testResetToEarliestWhenOffsetTooHigh() =
assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", LargeOffset))
def testResetToEarliestWhenOffsetTooLow() =
assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset))
def testResetToLatestWhenOffsetTooHigh() =
assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset))
def testResetToLatestWhenOffsetTooLow() =
assertEquals(0, resetAndConsume(NumMessages, "largest", SmallOffset))
/* Produce the given number of messages, create a consumer with the given offset policy,
* then reset the offset to the given value and consume until we get no new messages.
* Returns the count of messages received.
*/
def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
val producer: Producer[String, Message] = TestUtils.createProducer(zkConnect)
for(i <- 0 until numMessages) {
for(i <- 0 until numMessages)
producer.send(new ProducerData[String, Message](topic, topic, new Message("test".getBytes())))
}
// update offset in zookeeper for consumer to jump "forward" in time
val dirs = new ZKGroupTopicDirs(group, topic)
var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
consumerProps.put("autooffset.reset", "smallest")
consumerProps.put("autooffset.reset", resetTo)
consumerProps.put("consumer.timeout.ms", "2000")
consumerProps.put("max.fetch.wait.ms", "0")
val consumerConfig = new ConsumerConfig(consumerProps)
TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", largeOffset)
info("Updated consumer offset to " + largeOffset)
TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", offset)
info("Updated consumer offset to " + offset)
Thread.sleep(500)
val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
val messageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))
val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head
var threadList = List[Thread]()
val nMessages : AtomicInteger = new AtomicInteger(0)
for ((topic, streamList) <- messageStreams)
for (i <- 0 until streamList.length)
threadList ::= new Thread("kafka-zk-consumer-" + i) {
override def run() {
try {
for (message <- streamList(i)) {
nMessages.incrementAndGet
}
}
catch {
case te: ConsumerTimeoutException => info("Consumer thread timing out..")
case _: InterruptedException =>
case _: ClosedByInterruptException =>
case e => throw e
}
}
}
for (thread <- threadList)
thread.start
threadList(0).join(2000)
info("Asserting...")
assertEquals(numMessages, nMessages.get)
consumerConnector.shutdown
}
def testEarliestOffsetResetBackward() = {
val producer: Producer[String, Message] = TestUtils.createProducer(zkConnect)
for(i <- 0 until numMessages) {
producer.send(new ProducerData[String, Message](topic, topic, new Message("test".getBytes())))
var received = 0
val iter = messageStream.iterator
try {
for (i <- 0 until numMessages) {
iter.next // will throw a timeout exception if the message isn't there
received += 1
}
} catch {
case e: ConsumerTimeoutException =>
info("consumer timed out after receiving " + received + " messages.")
}
// update offset in zookeeper for consumer to jump "forward" in time
val dirs = new ZKGroupTopicDirs(group, topic)
var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
consumerProps.put("autooffset.reset", "smallest")
consumerProps.put("consumer.timeout.ms", "2000")
val consumerConfig = new ConsumerConfig(consumerProps)
TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", smallOffset)
info("Updated consumer offset to " + smallOffset)
val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
val messageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))
var threadList = List[Thread]()
val nMessages : AtomicInteger = new AtomicInteger(0)
for ((topic, streamList) <- messageStreams)
for (i <- 0 until streamList.length)
threadList ::= new Thread("kafka-zk-consumer-" + i) {
override def run() {
try {
for (message <- streamList(i)) {
nMessages.incrementAndGet
}
}
catch {
case _: InterruptedException =>
case _: ClosedByInterruptException =>
case e => throw e
}
}
}
for (thread <- threadList)
thread.start
threadList(0).join(2000)
assertEquals(numMessages, nMessages.get)
consumerConnector.shutdown
received
}
def testLatestOffsetResetForward() = {
val producer: Producer[String, Message] = TestUtils.createProducer(zkConnect)
for(i <- 0 until numMessages) {
producer.send(new ProducerData[String, Message](topic, topic, new Message("test".getBytes())))
}
// update offset in zookeeper for consumer to jump "forward" in time
val dirs = new ZKGroupTopicDirs(group, topic)
var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
consumerProps.put("autooffset.reset", "largest")
consumerProps.put("consumer.timeout.ms", "2000")
val consumerConfig = new ConsumerConfig(consumerProps)
TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", largeOffset)
info("Updated consumer offset to " + largeOffset)
val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
val messageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))
var threadList = List[Thread]()
val nMessages : AtomicInteger = new AtomicInteger(0)
for ((topic, streamList) <- messageStreams)
for (i <- 0 until streamList.length)
threadList ::= new Thread("kafka-zk-consumer-" + i) {
override def run() {
try {
for (message <- streamList(i)) {
nMessages.incrementAndGet
}
}
catch {
case _: InterruptedException =>
case _: ClosedByInterruptException =>
case e => throw e
}
}
}
for (thread <- threadList)
thread.start
threadList(0).join(2000)
info("Asserting...")
assertEquals(0, nMessages.get)
consumerConnector.shutdown
}
}

View File

@ -82,8 +82,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness {
new OffsetDetail("topic1", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
new OffsetDetail("topic2", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000))
)
val request = new FetchRequest( versionId = FetchRequest.CurrentVersion, correlationId = 0, clientId = "",
replicaId = -1, maxWait = -1, minBytes = -1, offsetInfo = offsets)
val request = new FetchRequest(offsetInfo = offsets)
try {
consumer.fetch(request)
fail("FetchRequest should throw FetchRequestFormatException due to duplicate topics")
@ -172,7 +171,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness {
{
// send some invalid offsets
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics)
for((topic, partition) <- topics)
builder.addFetch(topic, partition, -1, 10000)
try {
@ -189,7 +188,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness {
{
// send some invalid partitions
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics)
for((topic, partition) <- topics)
builder.addFetch(topic, -1, 0, 10000)
try {

View File

@ -24,7 +24,7 @@ import java.nio.ByteBuffer
import kafka.log.LogManager
import junit.framework.Assert._
import org.easymock.EasyMock
import kafka.network.BoundedByteBufferReceive
import kafka.network._
import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
import kafka.cluster.Broker
import kafka.server.{KafkaZooKeeper, KafkaApis, KafkaConfig}
@ -93,7 +93,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
serializedMetadataRequest.rewind()
// create the kafka request handler
val kafkaRequestHandler = new KafkaApis(logManager, kafkaZookeeper)
val requestChannel = new RequestChannel(2, 5)
val apis = new KafkaApis(requestChannel, logManager, kafkaZookeeper)
// mock the receive API to return the request buffer as created above
val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
@ -101,23 +102,18 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
EasyMock.replay(receivedRequest)
// call the API (to be tested) to get metadata
val metadataResponse = kafkaRequestHandler.handleTopicMetadataRequest(receivedRequest)
apis.handleTopicMetadataRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1))
val metadataResponse = requestChannel.receiveResponse(0).response.asInstanceOf[TopicMetadataSend].metadata
// verify the topic metadata returned
metadataResponse match {
case Some(metadata) =>
val responseBuffer = metadata.asInstanceOf[TopicMetadataSend].metadata
val topicMetadata = TopicMetadataRequest.deserializeTopicsMetadataResponse(responseBuffer)
assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
val partitionMetadata = topicMetadata.head.partitionsMetadata
assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
assertEquals(brokers, partitionMetadata.head.replicas)
assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
case None =>
fail("Metadata response expected")
}
// check assertions
val topicMetadata = TopicMetadataRequest.deserializeTopicsMetadataResponse(metadataResponse)
assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
val partitionMetadata = topicMetadata.head.partitionsMetadata
assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
assertEquals(brokers, partitionMetadata.head.replicas)
assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
// verify the expected calls to log manager occurred in the right order
EasyMock.verify(logManager)

View File

@ -46,7 +46,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val nMessages = 2
def testBasic() {
val requestHandlerLogger = Logger.getLogger(classOf[KafkaApis])
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)
var actualMessages: List[Message] = Nil

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import scala.collection._
import org.junit.{After, Before, Test}
import junit.framework.Assert._
import kafka.server._
import kafka.message._
import kafka.api._
import kafka.utils.TestUtils
class RequestPurgatoryTest {
val producerRequest1 = TestUtils.produceRequest("test", new ByteBufferMessageSet(new Message("hello1".getBytes)))
val producerRequest2 = TestUtils.produceRequest("test", new ByteBufferMessageSet(new Message("hello2".getBytes)))
var purgatory: MockRequestPurgatory = null
@Before
def setup() {
purgatory = new MockRequestPurgatory()
}
@After
def teardown() {
purgatory.shutdown()
}
@Test
def testRequestSatisfaction() {
val r1 = new DelayedRequest(Array("test1"), null, 100000L)
val r2 = new DelayedRequest(Array("test2"), null, 100000L)
assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1", producerRequest1).size)
purgatory.watch(r1)
assertEquals("Still nothing satisfied", 0, purgatory.update("test1", producerRequest1).size)
purgatory.watch(r2)
assertEquals("Still nothing satisfied", 0, purgatory.update("test2", producerRequest2).size)
purgatory.satisfied += r1
assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1", producerRequest1))
assertEquals("Nothing satisfied", 0, purgatory.update("test1", producerRequest2).size)
purgatory.satisfied += r2
assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2", producerRequest2))
assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size)
}
@Test
def testRequestExpirey() {
val expiration = 20L
val r1 = new DelayedRequest(Array("test1"), null, expiration)
val r2 = new DelayedRequest(Array("test1"), null, 200000L)
val start = System.currentTimeMillis
purgatory.watch(r1)
purgatory.watch(r2)
purgatory.awaitExpiration(r1)
val ellapsed = System.currentTimeMillis - start
println(ellapsed)
assertTrue("r1 expired", purgatory.expired.contains(r1))
assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2))
assertTrue("Time for expiration was about 20ms", (ellapsed - expiration).abs < 10L)
}
class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {
val satisfied = mutable.Set[DelayedRequest]()
val expired = mutable.Set[DelayedRequest]()
def awaitExpiration(delayed: DelayedRequest) = {
delayed synchronized {
delayed.wait()
}
}
def checkSatisfied(request: ProducerRequest, delayed: DelayedRequest): Boolean = satisfied.contains(delayed)
def expire(delayed: DelayedRequest) {
expired += delayed
delayed synchronized {
delayed.notify()
}
}
}
}

View File

@ -416,6 +416,7 @@ object TestUtils extends Logging {
leaderLock.unlock()
}
}
}
object TestZKUtils {