new consumer request format; patched by Prashanth Menon; reviewed by Jun Rao and Jay Kreps; KAFKA-240

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1243407 13f79535-47bb-0310-9956-ffa450edef68
Jun Rao 2012-02-13 03:58:37 +00:00
parent 21da30456f
commit a5fb217293
37 changed files with 1038 additions and 804 deletions
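
As a rough end-to-end sketch of the new consumer request format introduced by this patch (class names are taken from the diff below; the broker address, topic names, offsets, fetch sizes and client id are placeholders): a single FetchRequest built with FetchRequestBuilder can now carry several topic/partition entries, and SimpleConsumer.fetch returns a FetchResponse that is queried per topic and partition.

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

// Minimal sketch, assuming a broker on localhost:9092 with topics "topic1" and "topic2".
object NewFetchFormatSketch {
  def main(args: Array[String]) {
    val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024)
    try {
      val request = new FetchRequestBuilder()
        .correlationId(0)
        .clientId("example-client")           // hypothetical client id
        .addFetch("topic1", 0, 0L, 10000)     // topic, partition, offset, fetch size
        .addFetch("topic2", 0, 0L, 10000)
        .build()
      val response = consumer.fetch(request)  // one round trip replaces multifetch()
      for (messageAndOffset <- response.messageSet("topic1", 0))
        println("topic1/0 next offset: " + messageAndOffset.offset)
    } finally {
      consumer.close()
    }
  }
}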

View File

@ -16,27 +16,26 @@
*/
package kafka.etl;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.zip.CRC32;
import kafka.api.FetchRequest;
import kafka.javaapi.MultiFetchResponse;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.javaapi.message.MessageSet;
import kafka.message.MessageAndOffset;
import kafka.message.MessageSet;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
@SuppressWarnings({ "deprecation"})
public class KafkaETLContext {
@ -59,7 +58,8 @@ public class KafkaETLContext {
protected long _offset = Long.MAX_VALUE; /*current offset*/
protected long _count; /*current count*/
protected MultiFetchResponse _response = null; /*fetch response*/
protected int requestId = 0; /* the id of the next fetch request */
protected FetchResponse _response = null; /*fetch response*/
protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/
protected Iterator<ByteBufferMessageSet> _respIterator = null;
protected int _retry = 0;
@ -149,15 +149,19 @@ public class KafkaETLContext {
public boolean fetchMore () throws IOException {
if (!hasMore()) return false;
FetchRequest fetchRequest =
new FetchRequest(_request.getTopic(), _request.getPartition(), _offset, _bufferSize);
List<FetchRequest> array = new ArrayList<FetchRequest>();
array.add(fetchRequest);
FetchRequest fetchRequest = new FetchRequestBuilder()
.correlationId(requestId)
.clientId(_request.clientId())
.addFetch(_request.getTopic(), _request.getPartition(), _offset, _bufferSize)
.build();
long tempTime = System.currentTimeMillis();
_response = _consumer.multifetch(array);
if(_response != null)
_respIterator = _response.iterator();
_response = _consumer.fetch(fetchRequest);
if(_response != null) {
_respIterator = new ArrayList<ByteBufferMessageSet>(){{
add((ByteBufferMessageSet) _response.messageSet(_request.getTopic(), _request.getPartition()));
}}.iterator();
}
_requestTime += (System.currentTimeMillis() - tempTime);
return true;

View File

@ -29,6 +29,7 @@ public class KafkaETLRequest {
URI _uri;
int _partition;
long _offset = DEFAULT_OFFSET;
String _clientId = "KafkaHadoopETL";
public KafkaETLRequest() {
@ -83,11 +84,11 @@ public class KafkaETLRequest {
_offset = offset;
}
public String getTopic() { return _topic;}
public URI getURI () { return _uri;}
public int getPartition() { return _partition;}
public long getOffset() { return _offset;}
public String getTopic() { return _topic; }
public URI getURI () { return _uri; }
public int getPartition() { return _partition; }
public long getOffset() { return _offset; }
public String clientId() { return _clientId; }
public boolean isValidOffset() {
return _offset >= 0;

View File

@ -20,32 +20,149 @@ package kafka.api
import java.nio._
import kafka.network._
import kafka.utils._
import scala.collection.mutable.{HashMap, Buffer, ListBuffer}
object OffsetDetail {
def readFrom(buffer: ByteBuffer): OffsetDetail = {
val topic = Utils.readShortString(buffer, "UTF-8")
val partitionsCount = buffer.getInt
val partitions = new Array[Int](partitionsCount)
for (i <- 0 until partitions.length)
partitions(i) = buffer.getInt
val offsetsCount = buffer.getInt
val offsets = new Array[Long](offsetsCount)
for (i <- 0 until offsets.length)
offsets(i) = buffer.getLong
val fetchesCount = buffer.getInt
val fetchSizes = new Array[Int](fetchesCount)
for (i <- 0 until fetchSizes.length)
fetchSizes(i) = buffer.getInt
new OffsetDetail(topic, partitions, offsets, fetchSizes)
}
}
case class OffsetDetail(topic: String, partitions: Seq[Int], offsets: Seq[Long], fetchSizes: Seq[Int]) {
def writeTo(buffer: ByteBuffer) {
Utils.writeShortString(buffer, topic, "UTF-8")
if(partitions.size > Int.MaxValue || offsets.size > Int.MaxValue || fetchSizes.size > Int.MaxValue)
throw new IllegalArgumentException("Number of fetches in FetchRequest exceeds " + Int.MaxValue + ".")
buffer.putInt(partitions.length)
partitions.foreach(buffer.putInt(_))
buffer.putInt(offsets.length)
offsets.foreach(buffer.putLong(_))
buffer.putInt(fetchSizes.length)
fetchSizes.foreach(buffer.putInt(_))
}
def sizeInBytes(): Int = {
2 + topic.length() + // topic string
partitions.foldLeft(4)((s, _) => s + 4) + // each request partition (int)
offsets.foldLeft(4)((s, _) => s + 8) + // each request offset (long)
fetchSizes.foldLeft(4)((s,_) => s + 4) // each request fetch size
}
}
object FetchRequest {
val CurrentVersion = 1.shortValue()
def readFrom(buffer: ByteBuffer): FetchRequest = {
val topic = Utils.readShortString(buffer, "UTF-8")
val partition = buffer.getInt()
val offset = buffer.getLong()
val size = buffer.getInt()
new FetchRequest(topic, partition, offset, size)
val versionId = buffer.getShort
val correlationId = buffer.getInt
val clientId = Utils.readShortString(buffer, "UTF-8")
val replicaId = buffer.getInt
val maxWait = buffer.getInt
val minBytes = buffer.getInt
val offsetsCount = buffer.getInt
val offsetInfo = new Array[OffsetDetail](offsetsCount)
for(i <- 0 until offsetInfo.length)
offsetInfo(i) = OffsetDetail.readFrom(buffer)
new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetInfo)
}
}
class FetchRequest(val topic: String,
val partition: Int,
val offset: Long,
val maxSize: Int) extends Request(RequestKeys.Fetch) {
case class FetchRequest( versionId: Short,
correlationId: Int,
clientId: String,
replicaId: Int,
maxWait: Int,
minBytes: Int,
offsetInfo: Seq[OffsetDetail] ) extends Request(RequestKeys.Fetch) {
def writeTo(buffer: ByteBuffer) {
Utils.writeShortString(buffer, topic)
buffer.putInt(partition)
buffer.putLong(offset)
buffer.putInt(maxSize)
buffer.putShort(versionId)
buffer.putInt(correlationId)
Utils.writeShortString(buffer, clientId, "UTF-8")
buffer.putInt(replicaId)
buffer.putInt(maxWait)
buffer.putInt(minBytes)
buffer.putInt(offsetInfo.size)
for(topicDetail <- offsetInfo) {
topicDetail.writeTo(buffer)
}
}
def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4
override def toString(): String= "FetchRequest(topic:" + topic + ", part:" + partition +" offset:" + offset +
" maxSize:" + maxSize + ")"
def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
}
class FetchRequestBuilder() {
private var correlationId = -1
private val versionId = FetchRequest.CurrentVersion
private var clientId = ""
private var replicaId = -1 // sensible default
private var maxWait = -1 // sensible default
private var minBytes = -1 // sensible default
private val requestMap = new HashMap[String, Tuple3[Buffer[Int], Buffer[Long], Buffer[Int]]]
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
val topicData = requestMap.getOrElseUpdate(topic, (ListBuffer[Int](), ListBuffer[Long](), ListBuffer[Int]()))
topicData._1.append(partition)
topicData._2.append(offset)
topicData._3.append(fetchSize)
this
}
def correlationId(correlationId: Int): FetchRequestBuilder = {
this.correlationId = correlationId
this
}
def clientId(clientId: String): FetchRequestBuilder = {
this.clientId = clientId
this
}
def replicaId(replicaId: Int): FetchRequestBuilder = {
this.replicaId = replicaId
this
}
def maxWait(maxWait: Int): FetchRequestBuilder = {
this.maxWait = maxWait
this
}
def minBytes(minBytes: Int): FetchRequestBuilder = {
this.minBytes = minBytes
this
}
def build() = {
val offsetDetails = requestMap.map{ topicData =>
new OffsetDetail(topicData._1, topicData._2._1.toArray, topicData._2._2.toArray, topicData._2._3.toArray)
}
new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetDetails.toArray[OffsetDetail])
}
}
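
For reference, a minimal sketch of how the builder above produces a request and how the new wire format round-trips through writeTo/readFrom (mirroring the serialization test added elsewhere in this patch); the topic names, offsets, sizes and client id are illustrative only.

import java.nio.ByteBuffer
import kafka.api.{FetchRequest, FetchRequestBuilder}

object FetchRequestWireFormatSketch {
  def main(args: Array[String]) {
    // Two partitions of the same topic collapse into a single OffsetDetail block.
    val request = new FetchRequestBuilder()
      .correlationId(42)
      .clientId("wire-format-sketch")      // hypothetical client id
      .addFetch("topic1", 0, 0L, 10000)
      .addFetch("topic1", 1, 256L, 10000)
      .build()
    // sizeInBytes accounts for versionId, correlationId, clientId, replicaId,
    // maxWait, minBytes and every OffsetDetail, so the buffer is sized exactly.
    val buffer = ByteBuffer.allocate(request.sizeInBytes)
    request.writeTo(buffer)
    buffer.rewind()
    val roundTripped = FetchRequest.readFrom(buffer)
    assert(request == roundTripped)
  }
}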

View File

@ -0,0 +1,199 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio.ByteBuffer
import java.nio.channels.GatheringByteChannel
import kafka.common.ErrorMapping
import kafka.message.{MessageSet, ByteBufferMessageSet}
import kafka.network.{MultiSend, Send}
import kafka.utils.Utils
object PartitionData {
def readFrom(buffer: ByteBuffer): PartitionData = {
val partition = buffer.getInt
val error = buffer.getInt
val initialOffset = buffer.getLong
val messageSetSize = buffer.getInt
val messageSetBuffer = buffer.slice()
messageSetBuffer.limit(messageSetSize)
buffer.position(buffer.position + messageSetSize)
new PartitionData(partition, error, initialOffset, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
}
}
case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) {
val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue()
}
object TopicData {
def readFrom(buffer: ByteBuffer): TopicData = {
val topic = Utils.readShortString(buffer, "UTF-8")
val partitionCount = buffer.getInt
val partitions = new Array[PartitionData](partitionCount)
for(i <- 0 until partitions.length)
partitions(i) = PartitionData.readFrom(buffer)
new TopicData(topic, partitions.sortBy(_.partition))
}
def findPartition(data: Array[PartitionData], partition: Int): Option[PartitionData] = {
if(data == null || data.size == 0)
return None
var (low, high) = (0, data.size-1)
while(low <= high) {
val mid = (low + high) / 2
val found = data(mid)
if(found.partition == partition)
return Some(found)
else if(partition < found.partition)
high = mid - 1
else
low = mid + 1
}
None
}
}
case class TopicData(topic: String, partitionData: Array[PartitionData]) {
val sizeInBytes = 2 + topic.length + partitionData.foldLeft(4)(_ + _.sizeInBytes)
}
object FetchResponse {
def readFrom(buffer: ByteBuffer): FetchResponse = {
val versionId = buffer.getShort
val correlationId = buffer.getInt
val dataCount = buffer.getInt
val data = new Array[TopicData](dataCount)
for(i <- 0 until data.length)
data(i) = TopicData.readFrom(buffer)
new FetchResponse(versionId, correlationId, data)
}
}
case class FetchResponse(versionId: Short, correlationId: Int, data: Array[TopicData]) {
val sizeInBytes = 2 + 4 + data.foldLeft(4)(_ + _.sizeInBytes)
lazy val topicMap = data.groupBy(_.topic).mapValues(_.head)
def messageSet(topic: String, partition: Int): ByteBufferMessageSet = {
val messageSet = topicMap.get(topic) match {
case Some(topicData) =>
TopicData.findPartition(topicData.partitionData, partition).map(_.messages).getOrElse(MessageSet.Empty)
case None =>
MessageSet.Empty
}
messageSet.asInstanceOf[ByteBufferMessageSet]
}
}
// SENDS
class PartitionDataSend(val partitionData: PartitionData) extends Send {
private val messageSize = partitionData.messages.sizeInBytes
private var messagesSentSize = 0L
private val buffer = ByteBuffer.allocate(20)
buffer.putInt(partitionData.partition)
buffer.putInt(partitionData.error)
buffer.putLong(partitionData.initialOffset)
buffer.putInt(partitionData.messages.sizeInBytes.intValue())
buffer.rewind()
def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
def writeTo(channel: GatheringByteChannel): Int = {
var written = 0
if(buffer.hasRemaining)
written += channel.write(buffer)
if(!buffer.hasRemaining && messagesSentSize < messageSize) {
val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt
messagesSentSize += bytesSent
written += bytesSent
}
written
}
}
class TopicDataSend(val topicData: TopicData) extends Send {
val size = topicData.sizeInBytes
var sent = 0
private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4)
Utils.writeShortString(buffer, topicData.topic, "UTF-8")
buffer.putInt(topicData.partitionData.length)
buffer.rewind()
val sends = new MultiSend(topicData.partitionData.map(new PartitionDataSend(_)).toList) {
val expectedBytesToWrite = topicData.partitionData.foldLeft(0)(_ + _.sizeInBytes)
}
def complete = sent >= size
def writeTo(channel: GatheringByteChannel): Int = {
expectIncomplete()
var written = 0
if(buffer.hasRemaining)
written += channel.write(buffer)
if(!buffer.hasRemaining && !sends.complete) {
written += sends.writeCompletely(channel)
}
sent += written
written
}
}
class FetchResponseSend(val fetchResponse: FetchResponse,
val errorCode: Int = ErrorMapping.NoError) extends Send {
private val size = fetchResponse.sizeInBytes
private var sent = 0
private val buffer = ByteBuffer.allocate(16)
buffer.putInt(size + 2)
buffer.putShort(errorCode.shortValue())
buffer.putShort(fetchResponse.versionId)
buffer.putInt(fetchResponse.correlationId)
buffer.putInt(fetchResponse.data.length)
buffer.rewind()
val sends = new MultiSend(fetchResponse.data.map(new TopicDataSend(_)).toList) {
val expectedBytesToWrite = fetchResponse.data.foldLeft(0)(_ + _.sizeInBytes)
}
def complete = sent >= sendSize
def writeTo(channel: GatheringByteChannel):Int = {
expectIncomplete()
var written = 0
if(buffer.hasRemaining)
written += channel.write(buffer)
if(!buffer.hasRemaining && !sends.complete) {
written += sends.writeCompletely(channel)
}
sent += written
written
}
def sendSize = 4 + 2 + fetchResponse.sizeInBytes
}
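
A small in-memory sketch of the response structures defined above: PartitionData entries are grouped per topic (TopicData keeps them sorted by partition so findPartition can binary-search), and messageSet(topic, partition) falls back to MessageSet.Empty when a topic or partition is absent. The topic name and payload here are placeholders.

import kafka.api.{FetchRequest, FetchResponse, PartitionData, TopicData}
import kafka.common.ErrorMapping
import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

object FetchResponseStructureSketch {
  def main(args: Array[String]) {
    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes))
    val partition0 = new PartitionData(0, ErrorMapping.NoError, 0L, messages)
    val topicData = new TopicData("topic1", Array(partition0))
    val response = new FetchResponse(FetchRequest.CurrentVersion, 7, Array(topicData))
    println(response.messageSet("topic1", 0).sizeInBytes) // bytes of the known partition
    println(response.messageSet("topic1", 5).sizeInBytes) // 0: falls back to MessageSet.Empty
  }
}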

View File

@ -1,58 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio._
import kafka.network._
object MultiFetchRequest {
def readFrom(buffer: ByteBuffer): MultiFetchRequest = {
val count = buffer.getShort
val fetches = new Array[FetchRequest](count)
for(i <- 0 until fetches.length)
fetches(i) = FetchRequest.readFrom(buffer)
new MultiFetchRequest(fetches)
}
}
class MultiFetchRequest(val fetches: Array[FetchRequest]) extends Request(RequestKeys.MultiFetch) {
def writeTo(buffer: ByteBuffer) {
if(fetches.length > Short.MaxValue)
throw new IllegalArgumentException("Number of requests in MultiFetchRequest exceeds " + Short.MaxValue + ".")
buffer.putShort(fetches.length.toShort)
for(fetch <- fetches)
fetch.writeTo(buffer)
}
def sizeInBytes: Int = {
var size = 2
for(fetch <- fetches)
size += fetch.sizeInBytes
size
}
override def toString(): String = {
val buffer = new StringBuffer
for(fetch <- fetches) {
buffer.append(fetch.toString)
buffer.append(",")
}
buffer.toString
}
}

View File

@ -1,52 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio._
import collection.mutable
import kafka.utils.IteratorTemplate
import kafka.message._
class MultiFetchResponse(val buffer: ByteBuffer, val numSets: Int, val offsets: Array[Long]) extends Iterable[ByteBufferMessageSet] {
private val messageSets = new mutable.ListBuffer[ByteBufferMessageSet]
for(i <- 0 until numSets) {
val size = buffer.getInt()
val errorCode: Int = buffer.getShort()
val copy = buffer.slice()
val payloadSize = size - 2
copy.limit(payloadSize)
buffer.position(buffer.position + payloadSize)
messageSets += new ByteBufferMessageSet(copy, offsets(i), errorCode)
}
def iterator : Iterator[ByteBufferMessageSet] = {
new IteratorTemplate[ByteBufferMessageSet] {
val iter = messageSets.iterator
override def makeNext(): ByteBufferMessageSet = {
if(iter.hasNext)
iter.next
else
return allDone
}
}
}
override def toString() = this.messageSets.toString
}

View File

@ -20,8 +20,7 @@ package kafka.api
object RequestKeys {
val Produce: Short = 0
val Fetch: Short = 1
val MultiFetch: Short = 2
val MultiProduce: Short = 3
val Offsets: Short = 4
val TopicMetadata: Short = 5
val MultiProduce: Short = 2
val Offsets: Short = 3
val TopicMetadata: Short = 4
}

View File

@ -50,8 +50,7 @@ class ConsumerConfig(props: Properties) extends ZKConfig(props) {
/** consumer id: generated automatically if not set.
* Set this explicitly for only testing purpose. */
val consumerId: Option[String] = /** TODO: can be written better in scala 2.8 */
if (Utils.getString(props, "consumerid", null) != null) Some(Utils.getString(props, "consumerid")) else None
val consumerId: Option[String] = Option(Utils.getString(props, "consumerid", null))
/** the socket timeout for network requests */
val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout)

View File

@ -17,13 +17,14 @@
package kafka.consumer
import java.util.concurrent.CountDownLatch
import kafka.common.ErrorMapping
import kafka.cluster.{Partition, Broker}
import kafka.api.{OffsetRequest, FetchRequest}
import org.I0Itec.zkclient.ZkClient
import kafka.utils._
import java.io.IOException
import java.util.concurrent.CountDownLatch
import kafka.api.{FetchRequestBuilder, OffsetRequest}
import kafka.cluster.{Partition, Broker}
import kafka.common.ErrorMapping
import kafka.message.ByteBufferMessageSet
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
class FetcherRunnable(val name: String,
val zkClient : ZkClient,
@ -50,18 +51,26 @@ class FetcherRunnable(val name: String,
info(name + " start fetching topic: " + infopti.topic + " part: " + infopti.partition.partId + " offset: "
+ infopti.getFetchOffset + " from " + broker.host + ":" + broker.port)
var reqId = 0
try {
while (!stopped) {
val fetches = partitionTopicInfos.map(info =>
new FetchRequest(info.topic, info.partition.partId, info.getFetchOffset, config.fetchSize))
// TODO: fix up the max wait and min bytes
val builder = new FetchRequestBuilder().
correlationId(reqId).
clientId(config.consumerId.getOrElse(name)).
maxWait(0).
minBytes(0)
partitionTopicInfos.foreach(pti =>
builder.addFetch(pti.topic, pti.partition.partId, pti.getFetchOffset(), config.fetchSize)
)
trace("fetch request: " + fetches.toString)
val response = simpleConsumer.multifetch(fetches : _*)
val fetchRequest = builder.build()
trace("fetch request: " + fetchRequest)
val response = simpleConsumer.fetch(fetchRequest)
var read = 0L
for((messages, infopti) <- response.zip(partitionTopicInfos)) {
for(infopti <- partitionTopicInfos) {
val messages = response.messageSet(infopti.topic, infopti.partition.partId).asInstanceOf[ByteBufferMessageSet]
try {
var done = false
if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) {
@ -76,8 +85,7 @@ class FetcherRunnable(val name: String,
}
if (!done)
read += infopti.enqueue(messages, infopti.getFetchOffset)
}
catch {
} catch {
case e1: IOException =>
// something is wrong with the socket, re-throw the exception to stop the fetcher
throw e1
@ -91,6 +99,7 @@ class FetcherRunnable(val name: String,
throw e2
}
}
reqId = if(reqId == Int.MaxValue) 0 else reqId + 1
trace("fetched bytes: " + read)
if(read == 0) {
@ -98,8 +107,7 @@ class FetcherRunnable(val name: String,
Thread.sleep(config.fetcherBackoffMs)
}
}
}
catch {
} catch {
case e =>
if (stopped)
info("FecherRunnable " + this + " interrupted")

View File

@ -20,7 +20,6 @@ package kafka.consumer
import java.net._
import java.nio.channels._
import kafka.api._
import kafka.message._
import kafka.network._
import kafka.utils._
@ -72,7 +71,7 @@ class SimpleConsumer(val host: String,
* @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
* @return a set of fetched messages
*/
def fetch(request: FetchRequest): ByteBufferMessageSet = {
def fetch(request: FetchRequest): FetchResponse = {
lock synchronized {
val startTime = SystemTime.nanoseconds
getOrMakeConnection()
@ -88,51 +87,19 @@ class SimpleConsumer(val host: String,
channel = connect
sendRequest(request)
response = getResponse
}catch {
} catch {
case ioe: java.io.IOException => channel = null; throw ioe;
}
case e => throw e
}
val fetchResponse = FetchResponse.readFrom(response._1.buffer)
val fetchedSize = fetchResponse.sizeInBytes
val endTime = SystemTime.nanoseconds
SimpleConsumerStats.recordFetchRequest(endTime - startTime)
SimpleConsumerStats.recordConsumptionThroughput(response._1.buffer.limit)
new ByteBufferMessageSet(response._1.buffer, request.offset, response._2)
}
}
SimpleConsumerStats.recordConsumptionThroughput(fetchedSize)
/**
* Combine multiple fetch requests in one call.
*
* @param fetches a sequence of fetch requests.
* @return a sequence of fetch responses
*/
def multifetch(fetches: FetchRequest*): MultiFetchResponse = {
lock synchronized {
val startTime = SystemTime.nanoseconds
getOrMakeConnection()
var response: Tuple2[Receive,Int] = null
try {
sendRequest(new MultiFetchRequest(fetches.toArray))
response = getResponse
} catch {
case e : java.io.IOException =>
info("Reconnect in multifetch due to socket error: ", e)
// retry once
try {
channel = connect
sendRequest(new MultiFetchRequest(fetches.toArray))
response = getResponse
}catch {
case ioe: java.io.IOException => channel = null; throw ioe;
}
case e => throw e
}
val endTime = SystemTime.nanoseconds
SimpleConsumerStats.recordFetchRequest(endTime - startTime)
SimpleConsumerStats.recordConsumptionThroughput(response._1.buffer.limit)
// error code will be set on individual messageset inside MultiFetchResponse
new MultiFetchResponse(response._1.buffer, fetches.length, fetches.toArray.map(f => f.offset))
fetchResponse
}
}
@ -158,7 +125,7 @@ class SimpleConsumer(val host: String,
channel = connect
sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets))
response = getResponse
}catch {
} catch {
case ioe: java.io.IOException => channel = null; throw ioe;
}
}
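
For callers of the removed multifetch(), a migration sketch under the new API (the old call is shown only as a comment; the topic, partition values and client id are hypothetical): the separate per-partition FetchRequests become addFetch() calls on one builder, and each old response entry is now looked up on the single FetchResponse by topic and partition.

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

object MultifetchMigrationSketch {
  // before: consumer.multifetch(new FetchRequest("events", 0, 0L, 65536),
  //                             new FetchRequest("events", 1, 0L, 65536))
  def fetchTwoPartitions(consumer: SimpleConsumer): Long = {
    val request = new FetchRequestBuilder()
      .clientId("migration-sketch")        // hypothetical client id
      .addFetch("events", 0, 0L, 65536)
      .addFetch("events", 1, 0L, 65536)
      .build()
    val response = consumer.fetch(request)
    var bytes = 0L
    for (partition <- 0 to 1)
      bytes += response.messageSet("events", partition).validBytes
    bytes
  }
}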

View File

@ -32,8 +32,7 @@ private[kafka] object TopicCount extends Logging {
case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
case None => throw new RuntimeException("error constructing TopicCount : " + jsonString)
}
}
catch {
} catch {
case e =>
error("error parsing consumer json string " + jsonString, e)
throw e
@ -46,8 +45,7 @@ private[kafka] object TopicCount extends Logging {
private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
def getConsumerThreadIdsPerTopic()
: Map[String, Set[String]] = {
def getConsumerThreadIdsPerTopic(): Map[String, Set[String]] = {
val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
for ((topic, nConsumers) <- topicCountMap) {
val consumerSet = new mutable.HashSet[String]

View File

@ -105,8 +105,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def this(config: ConsumerConfig) = this(config, true)
def createMessageStreams[T](topicCountMap: Map[String,Int],
decoder: Decoder[T])
def createMessageStreams[T](topicCountMap: Map[String,Int], decoder: Decoder[T])
: Map[String,List[KafkaMessageStream[T]]] = {
consume(topicCountMap, decoder)
}
@ -138,8 +137,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
zkClient.close()
zkClient = null
}
}
catch {
} catch {
case e =>
fatal("error during consumer connector shutdown", e)
}
@ -147,8 +145,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
}
def consume[T](topicCountMap: scala.collection.Map[String,Int],
decoder: Decoder[T])
def consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T])
: Map[String,List[KafkaMessageStream[T]]] = {
debug("entering consume ")
if (topicCountMap == null)
@ -159,13 +156,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
var consumerUuid : String = null
config.consumerId match {
case Some(consumerId) // for testing only
=> consumerUuid = consumerId
case None // generate unique consumerId automatically
=> val uuid = UUID.randomUUID()
consumerUuid = "%s-%d-%s".format(
InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
uuid.getMostSignificantBits().toHexString.substring(0,8))
case Some(consumerId) => // for testing only
consumerUuid = consumerId
case None => // generate unique consumerId automatically
val uuid = UUID.randomUUID()
consumerUuid = "%s-%d-%s".format( InetAddress.getLocalHost.getHostName,
System.currentTimeMillis,
uuid.getMostSignificantBits().toHexString.substring(0,8) )
}
val consumerIdString = config.groupId + "_" + consumerUuid
val topicCount = new TopicCount(consumerIdString, topicCountMap)
@ -243,8 +240,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
try {
updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name,
newOffset.toString)
}
catch {
} catch {
case t: Throwable =>
// log it and let it go
warn("exception during commitOffsets", t)
@ -321,8 +317,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
ConsumerConfig.SocketBufferSize)
val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1)
producedOffset = offsets(0)
}
catch {
} catch {
case e =>
error("error in earliestOrLatestOffset() ", e)
}
@ -419,8 +414,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val cluster = getCluster(zkClient)
try {
done = rebalance(cluster)
}
catch {
} catch {
case e =>
/** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
* For example, a ZK node can disappear between the time we get all children and the time we try to get
@ -433,7 +427,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
info("end rebalancing consumer " + consumerIdString + " try #" + i)
if (done) {
return
}else {
} else {
/* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
* clear the cache */
info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
@ -529,7 +523,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
oldConsumersPerTopicMap = consumersPerTopicMap
updateFetcher(cluster, kafkaMessageStreams)
true
}else
} else
false
}
@ -611,8 +605,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
true
}
catch {
} catch {
case e: ZkNodeExistsException =>
// The node hasn't been deleted by the original owner. So wait a bit and retry.
info("waiting for the partition ownership to be deleted: " + partition)

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.javaapi
import kafka.api.TopicData
class FetchResponse( val versionId: Short,
val correlationId: Int,
val data: Array[TopicData] ) {
private val underlying = new kafka.api.FetchResponse(versionId, correlationId, data)
def messageSet(topic: String, partition: Int): kafka.javaapi.message.ByteBufferMessageSet = {
import Implicits._
underlying.messageSet(topic, partition)
}
}

View File

@ -28,9 +28,6 @@ private[javaapi] object Implicits extends Logging {
messageSet.getErrorCode)
}
implicit def toMultiFetchResponse(response: kafka.javaapi.MultiFetchResponse): kafka.api.MultiFetchResponse =
response.underlying
implicit def toJavaMultiFetchResponse(response: kafka.api.MultiFetchResponse): kafka.javaapi.MultiFetchResponse =
new kafka.javaapi.MultiFetchResponse(response.buffer, response.numSets, response.offsets)
implicit def toJavaFetchResponse(response: kafka.api.FetchResponse): kafka.javaapi.FetchResponse =
new kafka.javaapi.FetchResponse(response.versionId, response.correlationId, response.data)
}

View File

@ -1,45 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.javaapi
import kafka.utils.IteratorTemplate
import java.nio.ByteBuffer
import message.ByteBufferMessageSet
class MultiFetchResponse(buffer: ByteBuffer, numSets: Int, offsets: Array[Long]) extends java.lang.Iterable[ByteBufferMessageSet] {
val underlyingBuffer = ByteBuffer.wrap(buffer.array)
// this has the side effect of setting the initial position of buffer correctly
val errorCode = underlyingBuffer.getShort
import Implicits._
val underlying = new kafka.api.MultiFetchResponse(underlyingBuffer, numSets, offsets)
override def toString() = underlying.toString
def iterator : java.util.Iterator[ByteBufferMessageSet] = {
new IteratorTemplate[ByteBufferMessageSet] {
val iter = underlying.iterator
override def makeNext(): ByteBufferMessageSet = {
if(iter.hasNext)
iter.next
else
return allDone
}
}
}
}

View File

@ -17,10 +17,9 @@
package kafka.javaapi.consumer
import kafka.utils.threadsafe
import kafka.javaapi.message.ByteBufferMessageSet
import kafka.javaapi.MultiFetchResponse
import kafka.api.FetchRequest
import kafka.javaapi.FetchResponse
import kafka.utils.threadsafe
/**
* A consumer of kafka messages
@ -38,23 +37,11 @@ class SimpleConsumer(val host: String,
* @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
* @return a set of fetched messages
*/
def fetch(request: FetchRequest): ByteBufferMessageSet = {
def fetch(request: FetchRequest): FetchResponse = {
import kafka.javaapi.Implicits._
underlying.fetch(request)
}
/**
* Combine multiple fetch requests in one call.
*
* @param fetches a sequence of fetch requests.
* @return a sequence of fetch responses
*/
def multifetch(fetches: java.util.List[FetchRequest]): MultiFetchResponse = {
import scala.collection.JavaConversions._
import kafka.javaapi.Implicits._
underlying.multifetch(asBuffer(fetches): _*)
}
/**
* Get a list of valid offsets (up to maxSize) before the given time.
* The result is a list of offsets, in descending order.

View File

@ -28,7 +28,7 @@ import kafka.utils._
@nonthreadsafe
private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive with Logging {
private val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4)
private val sizeBuffer = ByteBuffer.allocate(4)
private var contentBuffer: ByteBuffer = null
def this() = this(Int.MaxValue)
@ -78,12 +78,10 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive
var buffer: ByteBuffer = null
try {
buffer = ByteBuffer.allocate(size)
}
catch {
case e: OutOfMemoryError => {
} catch {
case e: OutOfMemoryError =>
error("OOME with size " + size, e)
throw e
}
case e2 =>
throw e2
}

View File

@ -21,8 +21,8 @@ import java.util.concurrent._
object RequestChannel {
val AllDone = new Request(1, 2, null, 0)
case class Request(val processor: Int, requestKey: Any, request: Receive, start: Long)
case class Response(val processor: Int, requestKey: Any, response: Send, start: Long, ellapsed: Long)
case class Request(processor: Int, requestKey: Any, request: Receive, start: Long)
case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsed: Long)
}
class RequestChannel(val numProcessors: Int, val queueSize: Int) {

View File

@ -50,7 +50,7 @@ class SocketServerStats(val monitorDurationNs: Long, val time: Time) extends Soc
requestTypeId match {
case r if r == RequestKeys.Produce || r == RequestKeys.MultiProduce =>
produceTimeStats.recordRequestMetric(durationNs)
case r if r == RequestKeys.Fetch || r == RequestKeys.MultiFetch =>
case r if r == RequestKeys.Fetch =>
fetchTimeStats.recordRequestMetric(durationNs)
case _ => /* not collecting; let go */
}

View File

@ -66,14 +66,15 @@ trait Receive extends Transmission {
trait Send extends Transmission {
def writeTo(channel: GatheringByteChannel): Int
def writeCompletely(channel: GatheringByteChannel): Int = {
var written = 0
var totalWritten = 0
while(!complete) {
written = writeTo(channel)
val written = writeTo(channel)
trace(written + " bytes written.")
totalWritten += written
}
written
totalWritten
}
}
@ -99,9 +100,9 @@ abstract class MultiSend[S <: Send](val sends: List[S]) extends Send {
if (current == Nil) {
if (totalWritten != expectedBytesToWrite)
error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten)
return true
true
} else {
false
}
else
return false
}
}

View File

@ -17,17 +17,17 @@
package kafka.server
import org.apache.log4j.Logger
import kafka.log._
import kafka.network._
import kafka.message._
import java.io.IOException
import java.lang.IllegalStateException
import kafka.admin.{CreateTopicCommand, AdminUtils}
import kafka.api._
import kafka.common.ErrorMapping
import java.io.IOException
import kafka.log._
import kafka.message._
import kafka.network._
import kafka.utils.{SystemTime, Logging}
import collection.mutable.ListBuffer
import kafka.admin.{CreateTopicCommand, AdminUtils}
import java.lang.IllegalStateException
import org.apache.log4j.Logger
import scala.collection.mutable.ListBuffer
/**
* Logic to handle the various Kafka requests
@ -39,13 +39,12 @@ class KafkaApis(val logManager: LogManager) extends Logging {
def handle(receive: Receive): Option[Send] = {
val apiId = receive.buffer.getShort()
apiId match {
case RequestKeys.Produce => handleProducerRequest(receive)
case RequestKeys.Fetch => handleFetchRequest(receive)
case RequestKeys.MultiFetch => handleMultiFetchRequest(receive)
case RequestKeys.MultiProduce => handleMultiProducerRequest(receive)
case RequestKeys.Offsets => handleOffsetRequest(receive)
case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive)
case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
case RequestKeys.Produce => handleProducerRequest(receive)
case RequestKeys.Fetch => handleFetchRequest(receive)
case RequestKeys.MultiProduce => handleMultiProducerRequest(receive)
case RequestKeys.Offsets => handleOffsetRequest(receive)
case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive)
case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
}
}
@ -92,34 +91,37 @@ class KafkaApis(val logManager: LogManager) extends Logging {
val fetchRequest = FetchRequest.readFrom(request.buffer)
if(requestLogger.isTraceEnabled)
requestLogger.trace("Fetch request " + fetchRequest.toString)
Some(readMessageSet(fetchRequest))
}
def handleMultiFetchRequest(request: Receive): Option[Send] = {
val multiFetchRequest = MultiFetchRequest.readFrom(request.buffer)
if(requestLogger.isTraceEnabled)
requestLogger.trace("Multifetch request")
multiFetchRequest.fetches.foreach(req => requestLogger.trace(req.toString))
var responses = multiFetchRequest.fetches.map(fetch =>
readMessageSet(fetch)).toList
Some(new MultiMessageSetSend(responses))
val fetchedData = new ListBuffer[TopicData]()
var error: Int = ErrorMapping.NoError
for(offsetDetail <- fetchRequest.offsetInfo) {
val info = new ListBuffer[PartitionData]()
val topic = offsetDetail.topic
val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
case Left(err) => error = err; new PartitionData(partition, err, offset, MessageSet.Empty)
case Right(messages) => new PartitionData(partition, ErrorMapping.NoError, offset, messages)
}
info.append(partitionInfo)
}
fetchedData.append(new TopicData(topic, info.toArray))
}
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, fetchedData.toArray )
Some(new FetchResponseSend(response, error))
}
private def readMessageSet(fetchRequest: FetchRequest): MessageSetSend = {
var response: MessageSetSend = null
private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Int, MessageSet] = {
var response: Either[Int, MessageSet] = null
try {
trace("Fetching log segment for topic = " + fetchRequest.topic + " and partition = " + fetchRequest.partition)
val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition)
if (log != null)
response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
else
response = new MessageSetSend()
}
catch {
trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
val log = logManager.getLog(topic, partition)
response = Right(if(log != null) log.read(offset, maxSize) else MessageSet.Empty)
} catch {
case e =>
error("error when processing request " + fetchRequest, e)
response=new MessageSetSend(MessageSet.Empty, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
error("error when processing request " + (topic, partition, offset, maxSize), e)
response = Left(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
response
}

View File

@ -35,9 +35,9 @@ class KafkaRequestHandler(val requestChannel: RequestChannel, val handle: (Recei
case Some(send) => {
val resp = new RequestChannel.Response(processor = req.processor,
requestKey = req.requestKey,
response = send,
start = req.start,
ellapsed = -1)
response = send,
start = req.start,
elapsed = -1)
requestChannel.sendResponse(resp)
trace("Processor " + Thread.currentThread.getName + " sent response " + resp)
}

View File

@ -19,7 +19,7 @@ package kafka.tools
import java.net.URI
import joptsimple._
import kafka.api.FetchRequest
import kafka.api.FetchRequestBuilder
import kafka.utils._
import kafka.consumer._
@ -54,6 +54,11 @@ object SimpleConsumerShell extends Logging {
.describedAs("fetchsize")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1000000)
val clientIdOpt = parser.accepts("clientId", "The ID of this client.")
.withOptionalArg
.describedAs("clientId")
.ofType(classOf[String])
.defaultsTo("SimpleConsumerShell")
val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator")
.withOptionalArg
.describedAs("print offsets")
@ -79,7 +84,8 @@ object SimpleConsumerShell extends Logging {
val topic = options.valueOf(topicOpt)
val partition = options.valueOf(partitionOpt).intValue
val startingOffset = options.valueOf(offsetOpt).longValue
val fetchsize = options.valueOf(fetchsizeOpt).intValue
val fetchSize = options.valueOf(fetchsizeOpt).intValue
val clientId = options.valueOf(clientIdOpt).toString
val printOffsets = if(options.has(printOffsetOpt)) true else false
val printMessages = if(options.has(printMessageOpt)) true else false
@ -87,22 +93,27 @@ object SimpleConsumerShell extends Logging {
val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 64*1024)
val thread = Utils.newThread("kafka-consumer", new Runnable() {
def run() {
var reqId = 0
var offset = startingOffset
while(true) {
val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize)
val messageSets = consumer.multifetch(fetchRequest)
for (messages <- messageSets) {
debug("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset)
var consumed = 0
for(messageAndOffset <- messages) {
if(printMessages)
info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
offset = messageAndOffset.offset
if(printOffsets)
info("next offset = " + offset)
consumed += 1
}
val fetchRequest = new FetchRequestBuilder()
.correlationId(reqId)
.clientId(clientId)
.addFetch(topic, partition, offset, fetchSize)
.build()
val fetchResponse = consumer.fetch(fetchRequest)
val messageSet = fetchResponse.messageSet(topic, partition)
debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
var consumed = 0
for(messageAndOffset <- messageSet) {
if(printMessages)
info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
offset = messageAndOffset.offset
if(printOffsets)
info("next offset = " + offset)
consumed += 1
}
reqId += 1
}
}
}, false);

View File

@ -17,15 +17,17 @@
package kafka.integration
import kafka.server.KafkaConfig
import org.scalatest.junit.JUnit3Suite
import org.apache.log4j.Logger
import java.util.Properties
import kafka.consumer.SimpleConsumer
import kafka.api.{OffsetRequest, FetchRequest}
import junit.framework.Assert._
import java.util.Properties
import kafka.api.{FetchRequestBuilder, OffsetRequest}
import kafka.consumer.SimpleConsumer
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.log4j.Logger
import org.scalatest.junit.JUnit3Suite
class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness {
val topic = "MagicByte0"
@ -62,9 +64,10 @@ class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness
var messageCount: Int = 0
while(fetchOffset < lastOffset(0)) {
val fetched = simpleConsumer.fetch(new FetchRequest(topic, 0, fetchOffset, 10000))
fetched.foreach(m => fetchOffset = m.offset)
messageCount += fetched.size
val fetched = simpleConsumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, fetchOffset, 10000).build())
val fetchedMessages = fetched.messageSet(topic, 0)
fetchedMessages.foreach(m => fetchOffset = m.offset)
messageCount += fetchedMessages.size
}
assertEquals(100, messageCount)
}

View File

@ -17,15 +17,15 @@
package kafka.integration
import scala.collection._
import kafka.api.{FetchRequestBuilder, ProducerRequest}
import kafka.common.OffsetOutOfRangeException
import kafka.api.{ProducerRequest, FetchRequest}
import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
import kafka.utils.{TestUtils, Utils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.{TestUtils, Utils}
import scala.collection._
/**
* End to end tests of the primitive apis against a local server
@ -65,54 +65,60 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
new Message("hello".getBytes()), new Message("there".getBytes()))
producer.send(topic, sent)
sent.getBuffer.rewind
var fetched: ByteBufferMessageSet = null
while(fetched == null || fetched.validBytes == 0)
fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
TestUtils.checkEquals(sent.iterator, fetched.iterator)
var fetchedMessage: ByteBufferMessageSet = null
while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
fetchedMessage = fetched.messageSet(topic, 0)
}
TestUtils.checkEquals(sent.iterator, fetchedMessage.iterator)
// send an invalid offset
try {
val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
fetchedWithError.iterator
val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
fetchedWithError.messageSet(topic, 0).iterator
fail("Expected an OffsetOutOfRangeException exception to be thrown")
}
catch {
} catch {
case e: OffsetOutOfRangeException =>
}
}
def testProduceAndMultiFetch() {
// send some messages
val topics = List("test1", "test2", "test3");
// send some messages, with non-ordered topics
val topicOffsets = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics) {
val builder = new FetchRequestBuilder()
for( (topic, offset) <- topicOffsets) {
val set = new ByteBufferMessageSet(NoCompressionCodec,
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
messages += topic -> set
producer.send(topic, set)
set.getBuffer.rewind
fetches += new FetchRequest(topic, 0, 0, 10000)
messages += topic -> set
builder.addFetch(topic, offset, 0, 10000)
}
// wait a bit for produced message to be available
Thread.sleep(200)
val response = consumer.multifetch(fetches: _*)
for((topic, resp) <- topics.zip(response.toList))
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, offset) <- topicOffsets) {
val fetched = response.messageSet(topic, offset)
TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
}
}
{
// send some invalid offsets
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics)
fetches += new FetchRequest(topic, 0, -1, 10000)
val builder = new FetchRequestBuilder()
for( (topic, offset) <- topicOffsets )
builder.addFetch(topic, offset, -1, 10000)
val responses = consumer.multifetch(fetches: _*)
for(resp <- responses) {
val request = builder.build()
val responses = consumer.fetch(request)
for( (topic, offset) <- topicOffsets ) {
try {
resp.iterator
responses.messageSet(topic, offset).iterator
fail("Expected an OffsetOutOfRangeException exception to be thrown")
} catch {
case e: OffsetOutOfRangeException =>
@ -125,14 +131,14 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
// send some messages
val topics = List("test1", "test2", "test3");
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
val builder = new FetchRequestBuilder()
var produceList: List[ProducerRequest] = Nil
for(topic <- topics) {
val set = new ByteBufferMessageSet(NoCompressionCodec,
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
messages += topic -> set
produceList ::= new ProducerRequest(topic, 0, set)
fetches += new FetchRequest(topic, 0, 0, 10000)
builder.addFetch(topic, 0, 0, 10000)
}
producer.multiSend(produceList.toArray)
@ -141,23 +147,26 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
// wait a bit for produced message to be available
Thread.sleep(200)
val response = consumer.multifetch(fetches: _*)
for((topic, resp) <- topics.zip(response.toList))
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
val request = builder.build()
val response = consumer.fetch(request)
for(topic <- topics) {
val fetched = response.messageSet(topic, 0)
TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
}
}
def testMultiProduceResend() {
// send some messages
val topics = List("test1", "test2", "test3");
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
val builder = new FetchRequestBuilder()
var produceList: List[ProducerRequest] = Nil
for(topic <- topics) {
val set = new ByteBufferMessageSet(NoCompressionCodec,
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
messages += topic -> set
produceList ::= new ProducerRequest(topic, 0, set)
fetches += new FetchRequest(topic, 0, 0, 10000)
builder.addFetch(topic, 0, 0, 10000)
}
producer.multiSend(produceList.toArray)
@ -169,11 +178,13 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
// wait a bit for produced message to be available
Thread.sleep(750)
val response = consumer.multifetch(fetches: _*)
for((topic, resp) <- topics.zip(response.toList))
val request = builder.build()
val response = consumer.fetch(request)
for(topic <- topics) {
val topicMessages = response.messageSet(topic, 0)
TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).map(m => m.message).iterator,
messages(topic).map(m => m.message).iterator),
resp.map(m => m.message).iterator)
// TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator, messages(topic).iterator), resp.iterator)
topicMessages.iterator.map(_.message))
}
}
}

View File

@ -20,16 +20,14 @@ package kafka.log
import kafka.server.KafkaConfig
import java.io.File
import java.nio.ByteBuffer
import kafka.utils.Utils
import kafka.api.FetchRequest
import kafka.api.FetchRequestBuilder
import kafka.common.InvalidMessageSizeException
import kafka.utils.TestUtils
import kafka.consumer.{ZookeeperConsumerConnector, ConsumerConfig}
import org.scalatest.junit.JUnit3Suite
import kafka.integration.ProducerConsumerTestHarness
import kafka.integration.KafkaServerTestHarness
import org.apache.log4j.{Logger, Level}
import kafka.integration.{KafkaServerTestHarness, ProducerConsumerTestHarness}
import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.utils.{Utils, TestUtils}
import org.scalatest.junit.JUnit3Suite
import org.apache.log4j.{Logger, Level}
class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness {
val port = TestUtils.choosePort
@ -65,23 +63,21 @@ class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness wit
Thread.sleep(500)
// test SimpleConsumer
val messageSet = consumer.fetch(new FetchRequest(topic, partition, 0, 10000))
val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, 10000).build())
try {
for (msg <- messageSet)
for (msg <- response.messageSet(topic, partition))
fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
}
catch {
} catch {
case e: InvalidMessageSizeException => "This is good"
}
val messageSet2 = consumer.fetch(new FetchRequest(topic, partition, 0, 10000))
val response2 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, 10000).build())
try {
for (msg <- messageSet2)
for (msg <- response2.messageSet(topic, partition))
fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
}
catch {
} catch {
case e: InvalidMessageSizeException => println("This is good")
}
@ -95,8 +91,7 @@ class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness wit
for (message <- messageStreams(0))
fail("shouldn't reach here in ZookeeperConsumer since log file is corrupted.")
fail("shouldn't reach here in ZookeeperConsumer since log file is corrupted.")
}
catch {
} catch {
case e: InvalidMessageSizeException => "This is good"
case e: Exception => "This is not bad too !"
}

View File

@ -18,18 +18,19 @@
package kafka.integration
import scala.collection._
import junit.framework.Assert._
import kafka.api.{ProducerRequest, FetchRequest}
import kafka.common.{OffsetOutOfRangeException, InvalidPartitionException}
import kafka.server.{KafkaRequestHandler, KafkaConfig}
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import java.io.File
import java.util.Properties
import junit.framework.Assert._
import kafka.common.{ErrorMapping, OffsetOutOfRangeException, InvalidPartitionException}
import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.producer.{ProducerData, Producer, ProducerConfig}
import kafka.serializer.StringDecoder
import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet}
import java.io.File
import kafka.server.{KafkaRequestHandler, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import java.nio.ByteBuffer
import kafka.api.{FetchRequest, FetchRequestBuilder, ProducerRequest}
/**
* End to end tests of the primitive apis against a local server
@ -39,11 +40,28 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val port = TestUtils.choosePort
val props = TestUtils.createBrokerConfig(0, port)
val config = new KafkaConfig(props) {
override val flushInterval = 1
}
override val flushInterval = 1
}
val configs = List(config)
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
def testFetchRequestCanProperlySerialize() {
val request = new FetchRequestBuilder()
.correlationId(100)
.clientId("test-client")
.maxWait(10001)
.minBytes(4444)
.addFetch("topic1", 0, 0, 10000)
.addFetch("topic2", 1, 1024, 9999)
.addFetch("topic1", 1, 256, 444)
.build()
val serializedBuffer = ByteBuffer.allocate(request.sizeInBytes)
request.writeTo(serializedBuffer)
serializedBuffer.rewind()
val deserializedRequest = FetchRequest.readFrom(serializedBuffer)
assertEquals(request, deserializedRequest)
}
def testDefaultEncoderProducerAndFetch() {
val topic = "test-topic"
val props = new Properties()
@ -55,10 +73,18 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
Thread.sleep(200)
var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
assertTrue(fetched.iterator.hasNext)
val request = new FetchRequestBuilder()
.correlationId(100)
.clientId("test-client")
.addFetch(topic, 0, 0, 10000)
.build()
val fetched = consumer.fetch(request)
assertEquals("Returned correlationId doesn't match that in request.", 100, fetched.correlationId)
val fetchedMessageAndOffset = fetched.iterator.next
val messageSet = fetched.messageSet(topic, 0)
assertTrue(messageSet.iterator.hasNext)
val fetchedMessageAndOffset = messageSet.head
val stringDecoder = new StringDecoder
val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message)
assertEquals("test-message", fetchedStringMessage)
@ -76,10 +102,11 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
Thread.sleep(200)
var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
assertTrue(fetched.iterator.hasNext)
var fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val messageSet = fetched.messageSet(topic, 0)
assertTrue(messageSet.iterator.hasNext)
val fetchedMessageAndOffset = fetched.iterator.next
val fetchedMessageAndOffset = messageSet.head
val stringDecoder = new StringDecoder
val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message)
assertEquals("test-message", fetchedStringMessage)
@ -87,24 +114,27 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
def testProduceAndMultiFetch() {
// send some messages
val topics = List("test1", "test2", "test3");
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics) {
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(NoCompressionCodec,
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
messages += topic -> set
producer.send(topic, set)
set.getBuffer.rewind
fetches += new FetchRequest(topic, 0, 0, 10000)
builder.addFetch(topic, partition, 0, 10000)
}
// wait a bit for produced message to be available
Thread.sleep(700)
val response = consumer.multifetch(fetches: _*)
for((topic, resp) <- topics.zip(response.toList))
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val fetched = response.messageSet(topic, partition)
TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
}
}
// temporarily set request handler logger to a higher level
@ -112,34 +142,34 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
{
// send some invalid offsets
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics)
fetches += new FetchRequest(topic, 0, -1, 10000)
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics)
builder.addFetch(topic, partition, -1, 10000)
try {
val responses = consumer.multifetch(fetches: _*)
for(resp <- responses)
resp.iterator
fail("expect exception")
}
catch {
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics)
response.messageSet(topic, partition).iterator
fail("Expected exception when fetching message with invalid offset")
} catch {
case e: OffsetOutOfRangeException => "this is good"
}
}
{
// send some invalid partitions
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics)
fetches += new FetchRequest(topic, -1, 0, 10000)
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics)
builder.addFetch(topic, -1, 0, 10000)
try {
val responses = consumer.multifetch(fetches: _*)
for(resp <- responses)
resp.iterator
fail("expect exception")
}
catch {
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics)
response.messageSet(topic, -1).iterator
fail("Expected exception when fetching message with invalid partition")
} catch {
case e: InvalidPartitionException => "this is good"
}
}
@ -150,24 +180,27 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
def testProduceAndMultiFetchWithCompression() {
// send some messages
val topics = List("test1", "test2", "test3");
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics) {
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(DefaultCompressionCodec,
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
messages += topic -> set
producer.send(topic, set)
set.getBuffer.rewind
fetches += new FetchRequest(topic, 0, 0, 10000)
builder.addFetch(topic, partition, 0, 10000)
}
// wait a bit for produced message to be available
Thread.sleep(200)
val response = consumer.multifetch(fetches: _*)
for((topic, resp) <- topics.zip(response.toList))
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val fetched = response.messageSet(topic, partition)
TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
}
}
// temporarily set request handler logger to a higher level
@ -175,34 +208,34 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
{
// send some invalid offsets
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics)
fetches += new FetchRequest(topic, 0, -1, 10000)
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics)
builder.addFetch(topic, partition, -1, 10000)
try {
val responses = consumer.multifetch(fetches: _*)
for(resp <- responses)
resp.iterator
fail("expect exception")
}
catch {
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics)
response.messageSet(topic, partition).iterator
fail("Expected exception when fetching message with invalid offset")
} catch {
case e: OffsetOutOfRangeException => "this is good"
}
}
{
// send some invalid partitions
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics)
fetches += new FetchRequest(topic, -1, 0, 10000)
val builder = new FetchRequestBuilder()
for( (topic, _) <- topics)
builder.addFetch(topic, -1, 0, 10000)
try {
val responses = consumer.multifetch(fetches: _*)
for(resp <- responses)
resp.iterator
fail("expect exception")
}
catch {
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, _) <- topics)
response.messageSet(topic, -1).iterator
fail("Expected exception when fetching message with invalid partition")
} catch {
case e: InvalidPartitionException => "this is good"
}
}
@ -213,16 +246,16 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
def testMultiProduce() {
// send some messages
val topics = List("test1", "test2", "test3");
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
val builder = new FetchRequestBuilder()
var produceList: List[ProducerRequest] = Nil
for(topic <- topics) {
for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(NoCompressionCodec,
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
messages += topic -> set
produceList ::= new ProducerRequest(topic, 0, set)
fetches += new FetchRequest(topic, 0, 0, 10000)
builder.addFetch(topic, partition, 0, 10000)
}
producer.multiSend(produceList.toArray)
@ -231,23 +264,26 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
// wait a bit for produced message to be available
Thread.sleep(200)
val response = consumer.multifetch(fetches: _*)
for((topic, resp) <- topics.zip(response.toList))
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val fetched = response.messageSet(topic, partition)
TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
}
}
def testMultiProduceWithCompression() {
// send some messages
val topics = List("test1", "test2", "test3");
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
val builder = new FetchRequestBuilder()
var produceList: List[ProducerRequest] = Nil
for(topic <- topics) {
for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(DefaultCompressionCodec,
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
messages += topic -> set
produceList ::= new ProducerRequest(topic, 0, set)
fetches += new FetchRequest(topic, 0, 0, 10000)
builder.addFetch(topic, partition, 0, 10000)
}
producer.multiSend(produceList.toArray)
@ -256,15 +292,18 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
// wait a bit for produced message to be available
Thread.sleep(200)
val response = consumer.multifetch(fetches: _*)
for((topic, resp) <- topics.zip(response.toList))
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val fetched = response.messageSet(topic, partition)
TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
}
}
def testConsumerNotExistTopic() {
val newTopic = "new-topic"
val messageSetIter = consumer.fetch(new FetchRequest(newTopic, 0, 0, 10000)).iterator
assertTrue(messageSetIter.hasNext == false)
val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
val logFile = new File(config.logDir, newTopic + "-0")
assertTrue(!logFile.exists)
}
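
Taken together, the changes in this test reduce to a single pattern: build one FetchRequest covering every (topic, partition) of interest, issue one fetch, and address each message set in the response by topic and partition. A minimal standalone sketch of that pattern follows; the broker address, client id and topic names are illustrative assumptions, not values from this commit.

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

object FetchExample {
  def main(args: Array[String]) {
    // host, port, socket timeout and buffer size are placeholders
    val consumer = new SimpleConsumer("localhost", 9092, 1000000, 64 * 1024)

    // one request now carries all (topic, partition, offset, fetchSize) entries,
    // replacing the old one-FetchRequest-per-topic multifetch call
    val request = new FetchRequestBuilder()
      .correlationId(1)
      .clientId("example-client")
      .addFetch("topicA", 0, 0L, 10000)
      .addFetch("topicB", 0, 0L, 10000)
      .build()

    val response = consumer.fetch(request)

    // message sets are addressed per (topic, partition) in the response
    for(messageAndOffset <- response.messageSet("topicA", 0))
      println(messageAndOffset.offset)

    consumer.close()
  }
}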

View File

@ -18,15 +18,15 @@
package kafka.javaapi.integration
import scala.collection._
import kafka.api.FetchRequest
import kafka.api.FetchRequestBuilder
import kafka.common.{InvalidPartitionException, OffsetOutOfRangeException}
import kafka.javaapi.ProducerRequest
import kafka.javaapi.message.ByteBufferMessageSet
import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message}
import kafka.server.{KafkaRequestHandler, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import kafka.javaapi.message.ByteBufferMessageSet
import kafka.javaapi.ProducerRequest
import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message}
import kafka.utils.TestUtils
/**
* End to end tests of the primitive apis against a local server
@ -43,39 +43,42 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
// send some messages
val topic = "test"
// send an empty messageset first
val sent2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
messages = getMessageList(Seq.empty[Message]: _*))
// send an empty messageset first
val sent2 = new ByteBufferMessageSet(NoCompressionCodec, getMessageList(Seq.empty[Message]: _*))
producer.send(topic, sent2)
Thread.sleep(200)
sent2.getBuffer.rewind
var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
TestUtils.checkEquals(sent2.iterator, fetched2.iterator)
val fetched2 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val fetchedMessage2 = fetched2.messageSet(topic, 0)
TestUtils.checkEquals(sent2.iterator, fetchedMessage2.iterator)
// send some messages
val sent3 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
messages = getMessageList(new Message("hello".getBytes()),
new Message("there".getBytes())))
val sent3 = new ByteBufferMessageSet(NoCompressionCodec,
getMessageList(
new Message("hello".getBytes()),new Message("there".getBytes())))
producer.send(topic, sent3)
Thread.sleep(200)
sent3.getBuffer.rewind
var fetched3: ByteBufferMessageSet = null
while(fetched3 == null || fetched3.validBytes == 0)
fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
TestUtils.checkEquals(sent3.iterator, fetched3.iterator)
var messageSet: ByteBufferMessageSet = null
while(messageSet == null || messageSet.validBytes == 0) {
val fetched3 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
messageSet = fetched3.messageSet(topic, 0).asInstanceOf[ByteBufferMessageSet]
}
TestUtils.checkEquals(sent3.iterator, messageSet.iterator)
// temporarily set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.FATAL)
// send an invalid offset
try {
val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
fetchedWithError.iterator
fail("expect exception")
}
catch {
val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
val messageWithError = fetchedWithError.messageSet(topic, 0)
messageWithError.iterator
fail("Fetch with invalid offset should throw an exception when iterating over response")
} catch {
case e: OffsetOutOfRangeException => "this is good"
}
@ -87,39 +90,42 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
// send some messages
val topic = "test"
// send an empty messageset first
val sent2 = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
messages = getMessageList(Seq.empty[Message]: _*))
// send an empty messageset first
val sent2 = new ByteBufferMessageSet(DefaultCompressionCodec, getMessageList(Seq.empty[Message]: _*))
producer.send(topic, sent2)
Thread.sleep(200)
sent2.getBuffer.rewind
var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
TestUtils.checkEquals(sent2.iterator, fetched2.iterator)
val fetched2 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val message2 = fetched2.messageSet(topic, 0)
TestUtils.checkEquals(sent2.iterator, message2.iterator)
// send some messages
val sent3 = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
messages = getMessageList(new Message("hello".getBytes()),
new Message("there".getBytes())))
val sent3 = new ByteBufferMessageSet(DefaultCompressionCodec,
getMessageList(
new Message("hello".getBytes()), new Message("there".getBytes())))
producer.send(topic, sent3)
Thread.sleep(200)
sent3.getBuffer.rewind
var fetched3: ByteBufferMessageSet = null
while(fetched3 == null || fetched3.validBytes == 0)
fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
TestUtils.checkEquals(sent3.iterator, fetched3.iterator)
var fetchedMessage: ByteBufferMessageSet = null
while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
val fetched3 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
fetchedMessage = fetched3.messageSet(topic, 0).asInstanceOf[ByteBufferMessageSet]
}
TestUtils.checkEquals(sent3.iterator, fetchedMessage.iterator)
// temporarily set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.FATAL)
// send an invalid offset
try {
val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
fetchedWithError.iterator
fail("expect exception")
}
catch {
val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
val messageWithError = fetchedWithError.messageSet(topic, 0)
messageWithError.iterator
fail("Fetch with invalid offset should throw an exception when iterating over response")
} catch {
case e: OffsetOutOfRangeException => "this is good"
}
@ -129,31 +135,27 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
def testProduceAndMultiFetch() {
// send some messages
val topics = List("test1", "test2", "test3");
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics) {
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
messages = getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
set.getBuffer.rewind
fetches += new FetchRequest(topic, 0, 0, 10000)
builder.addFetch(topic, partition, 0, 10000)
}
// wait a bit for produced message to be available
Thread.sleep(200)
val response = consumer.multifetch(getFetchRequestList(fetches: _*))
val iter = response.iterator
for(topic <- topics) {
if (iter.hasNext) {
val resp = iter.next
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
}
else
fail("fewer responses than expected")
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val messageSet = response.messageSet(topic, partition)
TestUtils.checkEquals(messages(topic).iterator, messageSet.iterator)
}
}
@ -162,37 +164,41 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
{
// send some invalid offsets
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics)
fetches += new FetchRequest(topic, 0, -1, 10000)
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics)
builder.addFetch(topic, partition, -1, 10000)
try {
val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
val iter = responses.iterator
while (iter.hasNext)
iter.next.iterator
fail("expect exception")
}
catch {
case e: OffsetOutOfRangeException => "this is good"
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
try {
val iter = response.messageSet(topic, partition).iterator
while (iter.hasNext)
iter.next
fail("MessageSet for invalid offset should throw exception")
} catch {
case e: OffsetOutOfRangeException => "this is good"
}
}
}
{
// send some invalid partitions
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics)
fetches += new FetchRequest(topic, -1, 0, 10000)
val builder = new FetchRequestBuilder()
for( (topic, _) <- topics)
builder.addFetch(topic, -1, 0, 10000)
try {
val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
val iter = responses.iterator
while (iter.hasNext)
iter.next.iterator
fail("expect exception")
}
catch {
case e: InvalidPartitionException => "this is good"
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, _) <- topics) {
try {
val iter = response.messageSet(topic, -1).iterator
while (iter.hasNext)
iter.next
fail("MessageSet for invalid partition should throw exception")
} catch {
case e: InvalidPartitionException => "this is good"
}
}
}
@ -202,31 +208,31 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
def testProduceAndMultiFetchWithCompression() {
// send some messages
val topics = List("test1", "test2", "test3");
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics) {
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
messages = getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
set.getBuffer.rewind
fetches += new FetchRequest(topic, 0, 0, 10000)
builder.addFetch(topic, partition, 0, 10000)
}
// wait a bit for produced message to be available
Thread.sleep(200)
val response = consumer.multifetch(getFetchRequestList(fetches: _*))
val iter = response.iterator
for(topic <- topics) {
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val iter = response.messageSet(topic, partition).iterator
if (iter.hasNext) {
val resp = iter.next
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
}
else
TestUtils.checkEquals(messages(topic).iterator, iter)
} else {
fail("fewer responses than expected")
}
}
}
@ -235,37 +241,41 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
{
// send some invalid offsets
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics)
fetches += new FetchRequest(topic, 0, -1, 10000)
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics)
builder.addFetch(topic, partition, -1, 10000)
try {
val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
val iter = responses.iterator
while (iter.hasNext)
iter.next.iterator
fail("expect exception")
}
catch {
case e: OffsetOutOfRangeException => "this is good"
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
try {
val iter = response.messageSet(topic, partition).iterator
while (iter.hasNext)
iter.next
fail("Expected exception when fetching invalid offset")
} catch {
case e: OffsetOutOfRangeException => "this is good"
}
}
}
{
// send some invalid partitions
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics)
fetches += new FetchRequest(topic, -1, 0, 10000)
val builder = new FetchRequestBuilder()
for( (topic, _) <- topics)
builder.addFetch(topic, -1, 0, 10000)
try {
val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
val iter = responses.iterator
while (iter.hasNext)
iter.next.iterator
fail("expect exception")
}
catch {
case e: InvalidPartitionException => "this is good"
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, _) <- topics) {
try {
val iter = response.messageSet(topic, -1).iterator
while (iter.hasNext)
iter.next
fail("Expected exception when fetching invalid partition")
} catch {
case e: InvalidPartitionException => "this is good"
}
}
}
@ -275,79 +285,75 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
def testProduceAndMultiFetchJava() {
// send some messages
val topics = List("test1", "test2", "test3");
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches : java.util.ArrayList[FetchRequest] = new java.util.ArrayList[FetchRequest]
for(topic <- topics) {
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
messages = getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
set.getBuffer.rewind
fetches.add(new FetchRequest(topic, 0, 0, 10000))
builder.addFetch(topic, partition, 0, 10000)
}
// wait a bit for produced message to be available
Thread.sleep(200)
val response = consumer.multifetch(fetches)
val iter = response.iterator
for(topic <- topics) {
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val iter = response.messageSet(topic, partition).iterator
if (iter.hasNext) {
val resp = iter.next
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
}
else
TestUtils.checkEquals(messages(topic).iterator, iter)
} else {
fail("fewer responses than expected")
}
}
}
}
def testProduceAndMultiFetchJavaWithCompression() {
// send some messages
val topics = List("test1", "test2", "test3");
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches : java.util.ArrayList[FetchRequest] = new java.util.ArrayList[FetchRequest]
for(topic <- topics) {
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
messages = getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
set.getBuffer.rewind
fetches.add(new FetchRequest(topic, 0, 0, 10000))
builder.addFetch(topic, partition, 0, 10000)
}
// wait a bit for produced message to be available
Thread.sleep(200)
val response = consumer.multifetch(fetches)
val iter = response.iterator
for(topic <- topics) {
if (iter.hasNext) {
val resp = iter.next
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
}
else
fail("fewer responses than expected")
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val iter = response.messageSet(topic, partition).iterator
TestUtils.checkEquals(messages(topic).iterator, iter)
}
}
}
def testMultiProduce() {
// send some messages
val topics = List("test1", "test2", "test3");
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
val builder = new FetchRequestBuilder()
var produceList: List[ProducerRequest] = Nil
for(topic <- topics) {
for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
messages = getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
messages += topic -> set
produceList ::= new ProducerRequest(topic, 0, set)
fetches += new FetchRequest(topic, 0, 0, 10000)
builder.addFetch(topic, partition, 0, 10000)
}
producer.multiSend(produceList.toArray)
@ -356,31 +362,31 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
// wait a bit for produced message to be available
Thread.sleep(200)
val response = consumer.multifetch(getFetchRequestList(fetches: _*))
val iter = response.iterator
for(topic <- topics) {
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val iter = response.messageSet(topic, partition).iterator
if (iter.hasNext) {
val resp = iter.next
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
}
else
TestUtils.checkEquals(messages(topic).iterator, iter)
} else {
fail("fewer responses than expected")
}
}
}
def testMultiProduceWithCompression() {
// send some messages
val topics = List("test1", "test2", "test3");
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
val builder = new FetchRequestBuilder()
var produceList: List[ProducerRequest] = Nil
for(topic <- topics) {
for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
messages = getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
messages += topic -> set
produceList ::= new ProducerRequest(topic, 0, set)
fetches += new FetchRequest(topic, 0, 0, 10000)
builder.addFetch(topic, partition, 0, 10000)
}
producer.multiSend(produceList.toArray)
@ -389,15 +395,15 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
// wait a bit for produced message to be available
Thread.sleep(200)
val response = consumer.multifetch(getFetchRequestList(fetches: _*))
val iter = response.iterator
for(topic <- topics) {
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val iter = response.messageSet(topic, partition).iterator
if (iter.hasNext) {
val resp = iter.next
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
}
else
TestUtils.checkEquals(messages(topic).iterator, iter)
} else {
fail("fewer responses than expected")
}
}
}
@ -406,10 +412,4 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
messages.foreach(m => messageList.add(m))
messageList
}
private def getFetchRequestList(fetches: FetchRequest*): java.util.List[FetchRequest] = {
val fetchReqs = new java.util.ArrayList[FetchRequest]()
fetches.foreach(f => fetchReqs.add(f))
fetchReqs
}
}
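
The invalid-offset and invalid-partition blocks above also show how error handling moves from the whole multifetch response to individual message sets. Below is a sketch of that behaviour, assuming (as the test above does) that the error for a bad offset surfaces only when that partition's message set is iterated; the topic names and offsets are placeholders.

import kafka.api.FetchRequestBuilder
import kafka.common.OffsetOutOfRangeException
import kafka.consumer.SimpleConsumer

object FetchErrorExample {
  // consumer is assumed to be an already connected kafka.consumer.SimpleConsumer
  def fetchWithOneBadOffset(consumer: SimpleConsumer) {
    val request = new FetchRequestBuilder()
      .addFetch("good-topic", 0, 0L, 10000)   // valid offset
      .addFetch("bad-topic", 0, -1L, 10000)   // invalid offset
      .build()
    val response = consumer.fetch(request)

    // the partition with the valid offset can still be consumed
    for(messageAndOffset <- response.messageSet("good-topic", 0))
      println(messageAndOffset.offset)

    // the invalid one fails only when its message set is walked
    try {
      val iter = response.messageSet("bad-topic", 0).iterator
      while(iter.hasNext)
        iter.next
    } catch {
      case e: OffsetOutOfRangeException => // expected, reported per partition
    }
  }
}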

View File

@ -22,7 +22,6 @@ import kafka.utils._
import kafka.server.{KafkaConfig, KafkaServer}
import junit.framework.Assert._
import java.util.{Random, Properties}
import kafka.api.{FetchRequest, OffsetRequest}
import collection.mutable.WrappedArray
import kafka.consumer.SimpleConsumer
import org.junit.{After, Before, Test}
@ -30,6 +29,7 @@ import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import org.apache.log4j._
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import kafka.api.{FetchRequestBuilder, OffsetRequest}
object LogOffsetTest {
val random = new Random()
@ -66,9 +66,8 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
@Test
def testEmptyLogs() {
val messageSet: ByteBufferMessageSet = simpleConsumer.fetch(
new FetchRequest("test", 0, 0, 300 * 1024))
assertFalse(messageSet.iterator.hasNext)
val fetchResponse = simpleConsumer.fetch(new FetchRequestBuilder().addFetch("test", 0, 0, 300 * 1024).build())
assertFalse(fetchResponse.messageSet("test", 0).iterator.hasNext)
val name = "test"
val logFile = new File(logDir, name + "-0")
@ -119,9 +118,9 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
// try to fetch using latest offset
val messageSet: ByteBufferMessageSet = simpleConsumer.fetch(
new FetchRequest(topic, 0, consumerOffsets.head, 300 * 1024))
assertFalse(messageSet.iterator.hasNext)
val fetchResponse = simpleConsumer.fetch(
new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build())
assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext)
}
@Test

View File

@ -17,23 +17,21 @@
package kafka.log4j
import org.apache.log4j.spi.LoggingEvent
import org.apache.log4j.{PropertyConfigurator, Logger}
import java.util.Properties
import java.io.File
import kafka.consumer.SimpleConsumer
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.TestZKUtils
import kafka.zk.EmbeddedZookeeper
import junit.framework.Assert._
import kafka.api.FetchRequest
import kafka.serializer.Encoder
import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer
import kafka.message.Message
import kafka.producer.async.MissingConfigException
import kafka.serializer.Encoder
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{TestUtils, TestZKUtils, Utils, Logging}
import kafka.zk.{EmbeddedZookeeper, ZooKeeperTestHarness}
import org.apache.log4j.spi.LoggingEvent
import org.apache.log4j.{PropertyConfigurator, Logger}
import org.junit.{After, Before, Test}
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.{TestUtils, Utils, Logging}
class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
@ -172,10 +170,10 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
Thread.sleep(2500)
var offset = 0L
val messages = simpleConsumerBl.fetch(new FetchRequest("test-topic", 0, offset, 1024*1024))
val response = simpleConsumerBl.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, offset, 1024*1024).build())
val fetchedMessage = response.messageSet("test-topic", 0)
var count = 0
for(message <- messages) {
for(message <- fetchedMessage) {
count = count + 1
offset += message.offset
}
@ -192,14 +190,16 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
Thread.sleep(500)
val messages = simpleConsumerZk.fetch(new FetchRequest("test-topic", 0, 0L, 1024*1024))
val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build())
val fetchMessage = response.messageSet("test-topic", 0)
var count = 0
for(message <- messages) {
for(message <- fetchMessage) {
count = count + 1
}
val messagesFromOtherBroker = simpleConsumerBl.fetch(new FetchRequest("test-topic", 0, 0L, 1024*1024))
val response2 = simpleConsumerBl.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build())
val messagesFromOtherBroker = response2.messageSet("test-topic", 0)
for(message <- messagesFromOtherBroker) {
count = count + 1

View File

@ -17,18 +17,18 @@
package kafka.producer
import org.apache.log4j.{Logger, Level}
import kafka.zk.EmbeddedZookeeper
import org.junit.{After, Before, Test}
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import kafka.utils.{TestUtils, TestZKUtils, Utils}
import kafka.api.FetchRequest
import java.util.Properties
import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer
import kafka.message.Message
import kafka.serializer.Encoder
import kafka.consumer.SimpleConsumer
import java.util.Properties
import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
import kafka.utils.{TestUtils, TestZKUtils, Utils}
import kafka.zk.EmbeddedZookeeper
import org.apache.log4j.{Logger, Level}
import org.junit.{After, Before, Test}
import org.scalatest.junit.JUnitSuite
class ProducerTest extends JUnitSuite {
private val topic = "test-topic"
@ -106,12 +106,14 @@ class ProducerTest extends JUnitSuite {
producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
Thread.sleep(100)
// cross check if brokers got the messages
val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
assertTrue("Message set should have 1 message", messageSet1.hasNext)
assertEquals(new Message("test1".getBytes), messageSet1.next.message)
val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
assertTrue("Message set should have 1 message", messageSet2.hasNext)
assertEquals(new Message("test1".getBytes), messageSet2.next.message)
val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
val messageSet1 = response1.messageSet("new-topic", 0)
assertTrue("Message set should have 1 message", messageSet1.iterator.hasNext)
assertEquals(new Message("test1".getBytes), messageSet1.head.message)
val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
val messageSet2 = response2.messageSet("new-topic", 0)
assertTrue("Message set should have 1 message", messageSet2.iterator.hasNext)
assertEquals(new Message("test1".getBytes), messageSet2.head.message)
} catch {
case e: Exception => fail("Not expected", e)
}
@ -142,11 +144,12 @@ class ProducerTest extends JUnitSuite {
producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
Thread.sleep(100)
// cross check if brokers got the messages
val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
assertTrue("Message set should have 1 message", messageSet1.hasNext)
assertEquals(new Message("test1".getBytes), messageSet1.next.message)
assertTrue("Message set should have another message", messageSet1.hasNext)
assertEquals(new Message("test1".getBytes), messageSet1.next.message)
val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
val messageSet1Iter = response1.messageSet("new-topic", 0).iterator
assertTrue("Message set should have 1 message", messageSet1Iter.hasNext)
assertEquals(new Message("test1".getBytes), messageSet1Iter.next.message)
assertTrue("Message set should have another message", messageSet1Iter.hasNext)
assertEquals(new Message("test1".getBytes), messageSet1Iter.next.message)
} catch {
case e: Exception => fail("Not expected")
}
@ -174,9 +177,10 @@ class ProducerTest extends JUnitSuite {
producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
Thread.sleep(100)
// cross check if brokers got the messages
val messageSet1 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
assertTrue("Message set should have 1 message", messageSet1.hasNext)
assertEquals(new Message("test".getBytes), messageSet1.next.message)
val response1 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
val messageSet1 = response1.messageSet("new-topic", 0)
assertTrue("Message set should have 1 message", messageSet1.iterator.hasNext)
assertEquals(new Message("test".getBytes), messageSet1.head.message)
// shutdown server2
server2.shutdown
@ -197,9 +201,10 @@ class ProducerTest extends JUnitSuite {
Thread.sleep(100)
// cross check if brokers got the messages
val messageSet2 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
assertTrue("Message set should have 1 message", messageSet2.hasNext)
assertEquals(new Message("test".getBytes), messageSet2.next.message)
val response2 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
val messageSet2 = response2.messageSet("new-topic", 0)
assertTrue("Message set should have 1 message", messageSet2.iterator.hasNext)
assertEquals(new Message("test".getBytes), messageSet2.head.message)
} catch {
case e: Exception => fail("Not expected", e)

View File

@ -17,7 +17,6 @@
package kafka.server
import java.io.File
import kafka.api.FetchRequest
import kafka.producer.{SyncProducer, SyncProducerConfig}
import kafka.consumer.SimpleConsumer
import java.util.Properties
@ -27,6 +26,7 @@ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.{TestUtils, Utils}
import kafka.api.{FetchResponse, FetchRequestBuilder, FetchRequest}
class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
val port = TestUtils.choosePort
@ -82,11 +82,13 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
server.startup()
// bring the server back again and read the messages
var fetched: ByteBufferMessageSet = null
while(fetched == null || fetched.validBytes == 0)
fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
TestUtils.checkEquals(sent1.iterator, fetched.iterator)
val newOffset = fetched.validBytes
var fetchedMessage: ByteBufferMessageSet = null
while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
fetchedMessage = fetched.messageSet(topic, 0)
}
TestUtils.checkEquals(sent1.iterator, fetchedMessage.iterator)
val newOffset = fetchedMessage.validBytes
// send some more messages
producer.send(topic, sent2)
@ -94,10 +96,12 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
Thread.sleep(200)
fetched = null
while(fetched == null || fetched.validBytes == 0)
fetched = consumer.fetch(new FetchRequest(topic, 0, newOffset, 10000))
TestUtils.checkEquals(sent2.map(m => m.message).iterator, fetched.map(m => m.message).iterator)
fetchedMessage = null
while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
fetchedMessage = fetched.messageSet(topic, 0)
}
TestUtils.checkEquals(sent2.map(m => m.message).iterator, fetchedMessage.map(m => m.message).iterator)
server.shutdown()
Utils.rm(server.config.logDir)
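
The read-back after restart relies on re-issuing the fetch until the broker has flushed data for the partition. A minimal sketch of that polling idiom with the new request format follows; the retry cap and fetch size are illustrative assumptions (the test itself loops without a cap).

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer
import kafka.message.ByteBufferMessageSet

object FetchPollExample {
  // re-issue the fetch until data is available for (topic, partition) or maxTries is reached
  def fetchUntilNonEmpty(consumer: SimpleConsumer, topic: String, partition: Int,
                         offset: Long, maxTries: Int = 100): ByteBufferMessageSet = {
    var messages: ByteBufferMessageSet = null
    var tries = 0
    while((messages == null || messages.validBytes == 0) && tries < maxTries) {
      val response = consumer.fetch(
        new FetchRequestBuilder().addFetch(topic, partition, offset, 10000).build())
      messages = response.messageSet(topic, partition)
      tries += 1
    }
    messages
  }
}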

View File

@ -186,23 +186,21 @@ object TestUtils {
length += 1
assertEquals(expected.next, actual.next)
}
if (expected.hasNext)
{
// check if the expected iterator is longer
if (expected.hasNext) {
var length1 = length;
while (expected.hasNext)
{
while (expected.hasNext) {
expected.next
length1 += 1
}
assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true);
}
if (actual.hasNext)
{
// check if the actual iterator was longer
if (actual.hasNext) {
var length2 = length;
while (actual.hasNext)
{
while (actual.hasNext) {
actual.next
length2 += 1
}

View File

@ -77,7 +77,7 @@ class ZKLoadBalanceTest extends JUnit3Suite with ZooKeeperTestHarness {
// wait a bit to make sure rebalancing logic is triggered
Thread.sleep(1000)
Thread.sleep(1500)
// check Partition Owner Registry
val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_3 = List( ("200-0", "group1_consumer1-0"),

View File

@ -28,4 +28,5 @@ public interface KafkaProperties
final static int reconnectInterval = 10000;
final static String topic2 = "topic2";
final static String topic3 = "topic3";
final static String clientId = "SimpleConsumerDemoClient";
}

View File

@ -16,71 +16,76 @@
*/
package kafka.examples;
import java.util.ArrayList;
import java.util.List;
import kafka.javaapi.MultiFetchResponse;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.javaapi.message.MessageSet;
import kafka.message.MessageAndOffset;
import scala.collection.Iterator;
import kafka.api.FetchRequest;
import kafka.message.Message;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SimpleConsumerDemo
{
private static void printMessages(ByteBufferMessageSet messageSet)
{
public class SimpleConsumerDemo {
private static void printMessages(ByteBufferMessageSet messageSet) {
for (MessageAndOffset messageAndOffset : messageSet) {
System.out.println(ExampleUtils.getMessage(messageAndOffset.message()));
}
}
private static void generateData()
{
private static void generateData() {
Producer producer2 = new Producer(KafkaProperties.topic2);
producer2.start();
Producer producer3 = new Producer(KafkaProperties.topic3);
producer3.start();
try
{
try {
Thread.sleep(1000);
}
catch (InterruptedException e)
{
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args)
{
public static void main(String[] args) {
generateData();
SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL,
KafkaProperties.kafkaServerPort,
KafkaProperties.connectionTimeOut,
KafkaProperties.kafkaProducerBufferSize);
System.out.println("Testing single fetch");
FetchRequest req = new FetchRequest(KafkaProperties.topic2, 0, 0L, 100);
ByteBufferMessageSet messageSet = simpleConsumer.fetch(req);
printMessages(messageSet);
FetchRequest req = new FetchRequestBuilder()
.correlationId(0)
.clientId(KafkaProperties.clientId)
.addFetch(KafkaProperties.topic2, 0, 0L, 100)
.build();
FetchResponse fetchResponse = simpleConsumer.fetch(req);
printMessages((ByteBufferMessageSet) fetchResponse.messageSet(KafkaProperties.topic2, 0));
System.out.println("Testing single multi-fetch");
req = new FetchRequest(KafkaProperties.topic2, 0, 0L, 100);
List<FetchRequest> list = new ArrayList<FetchRequest>();
list.add(req);
req = new FetchRequest(KafkaProperties.topic3, 0, 0L, 100);
list.add(req);
MultiFetchResponse response = simpleConsumer.multifetch(list);
Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>() {{
put(KafkaProperties.topic2, new ArrayList<Integer>(){{ add(0); }});
put(KafkaProperties.topic3, new ArrayList<Integer>(){{ add(0); }});
}};
req = new FetchRequestBuilder()
.correlationId(0)
.clientId(KafkaProperties.clientId)
.addFetch(KafkaProperties.topic2, 0, 0L, 100)
.addFetch(KafkaProperties.topic3, 0, 0L, 100)
.build();
fetchResponse = simpleConsumer.fetch(req);
int fetchReq = 0;
for (ByteBufferMessageSet resMessageSet : response )
{
System.out.println("Response from fetch request no: " + ++fetchReq);
printMessages(resMessageSet);
for ( Map.Entry<String, List<Integer>> entry : topicMap.entrySet() ) {
String topic = entry.getKey();
for ( Integer partition : entry.getValue()) {
System.out.println("Response from fetch request no: " + ++fetchReq);
printMessages((ByteBufferMessageSet) fetchResponse.messageSet(topic, partition));
}
}
}
}

View File

@ -18,13 +18,12 @@
package kafka.perf
import java.net.URI
import joptsimple._
import kafka.utils._
import kafka.server._
import kafka.consumer.SimpleConsumer
import org.apache.log4j.Logger
import kafka.api.{OffsetRequest, FetchRequest}
import java.text.SimpleDateFormat
import kafka.api.{FetchRequestBuilder, OffsetRequest}
import kafka.consumer.SimpleConsumer
import kafka.utils._
import org.apache.log4j.Logger
import kafka.message.ByteBufferMessageSet
/**
* Performance test for the simple consumer
@ -56,12 +55,20 @@ object SimpleConsumerPerformance {
var lastReportTime: Long = startMs
var lastBytesRead = 0L
var lastMessagesRead = 0L
var reqId = 0
while(!done) {
val messages = consumer.fetch(new FetchRequest(config.topic, config.partition, offset, config.fetchSize))
// TODO: add in the maxWait and minBytes for performance
val request = new FetchRequestBuilder()
.correlationId(reqId)
.clientId(config.clientId)
.addFetch(config.topic, config.partition, offset, config.fetchSize)
.build()
val fetchResponse = consumer.fetch(request)
var messagesRead = 0
var bytesRead = 0
for(message <- messages) {
val messageSet = fetchResponse.messageSet(config.topic, config.partition)
for (message <- messageSet) {
messagesRead += 1
bytesRead += message.message.payloadSize
}
@ -69,7 +76,8 @@ object SimpleConsumerPerformance {
if(messagesRead == 0 || totalMessagesRead > config.numMessages)
done = true
else
offset += messages.validBytes
// we only did one fetch so we find the offset for the first (head) messageset
offset += messageSet.validBytes
totalBytesRead += bytesRead
totalMessagesRead += messagesRead
@ -89,6 +97,7 @@ object SimpleConsumerPerformance {
lastMessagesRead = totalMessagesRead
consumedInterval = 0
}
reqId += 1
}
val reportTime = System.currentTimeMillis
val elapsed = (reportTime - startMs) / 1000.0
@ -119,6 +128,11 @@ object SimpleConsumerPerformance {
.describedAs("bytes")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1024*1024)
val clientIdOpt = parser.accepts("clientId", "The ID of this client.")
.withOptionalArg
.describedAs("clientId")
.ofType(classOf[String])
.defaultsTo("SimpleConsumerPerformanceClient")
val options = parser.parse(args : _*)
@ -139,5 +153,6 @@ object SimpleConsumerPerformance {
val showDetailedStats = options.has(showDetailedStatsOpt)
val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
val hideHeader = options.has(hideHeaderOpt)
val clientId = options.valueOf(clientIdOpt).toString
}
}
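
The TODO in the fetch loop above notes that maxWait and minBytes are not yet passed through. The FetchRequestBuilder already exposes both (see testFetchRequestCanProperlySerialize), so a sketch of what such a request could look like is below; the concrete values and the topic name are placeholder assumptions.

import kafka.api.FetchRequestBuilder

object LongPollFetchExample {
  def main(args: Array[String]) {
    val request = new FetchRequestBuilder()
      .correlationId(0)
      .clientId("SimpleConsumerPerformanceClient")
      .maxWait(1000)        // intended to let the broker hold the request for up to 1000 ms...
      .minBytes(64 * 1024)  // ...or until roughly 64 KB of messages are available
      .addFetch("test", 0, 0L, 1024 * 1024)
      .build()
    println(request.sizeInBytes)
  }
}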