KAFKA-544 Store the key given to the producer in the message. Expose this key in the consumer. Patch reviewed by Jun.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1410055 13f79535-47bb-0310-9956-ffa450edef68
Edward Jay Kreps 2012-11-15 22:15:14 +00:00
parent 22e032fdd4
commit f4ccf21d5d
54 changed files with 735 additions and 679 deletions
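
At a glance, the producer side of this change replaces ProducerData with KeyedMessage, and the key is now serialized into the message itself rather than being used only for partitioning. A minimal Scala sketch of the new producer call, assuming a broker at localhost:9092 and an illustrative topic and key (the consumer side is sketched further down, next to the ConsumerConnector trait):

import java.util.Properties
import kafka.producer.{Producer, ProducerConfig, KeyedMessage}

val props = new Properties()
props.put("broker.list", "localhost:9092")                        // illustrative broker address
props.put("serializer.class", "kafka.serializer.StringEncoder")   // key.serializer.class defaults to this
val producer = new Producer[String, String](new ProducerConfig(props))

// The key ("user-42") is stored in the message and exposed to consumers.
producer.send(new KeyedMessage[String, String]("clicks", "user-42", "page=/home"))
producer.close()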

View File

@ -27,9 +27,9 @@ import kafka.etl.KafkaETLKey;
import kafka.etl.KafkaETLRequest;
import kafka.etl.Props;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
import kafka.producer.KeyedMessage;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@ -81,18 +81,17 @@ public class DataGenerator {
public void run() throws Exception {
List<Message> list = new ArrayList<Message>();
List<KeyedMessage> list = new ArrayList<KeyedMessage>();
for (int i = 0; i < _count; i++) {
Long timestamp = RANDOM.nextLong();
if (timestamp < 0) timestamp = -timestamp;
byte[] bytes = timestamp.toString().getBytes("UTF8");
Message message = new Message(bytes);
list.add(message);
list.add(new KeyedMessage<Integer, Message>(_topic, null, message));
}
// send events
System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
ProducerData<Integer, Message> pd = new ProducerData<Integer, Message>(_topic, null, list);
_producer.send(pd);
_producer.send(list);
// close the producer
_producer.close();

View File

@ -21,7 +21,7 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.KeyedMessage;
import kafka.message.Message;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
@ -33,7 +33,7 @@ public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<Nul
protected Producer<Integer, Message> producer;
protected String topic;
protected List<ProducerData<Integer, Message>> msgList = new LinkedList<ProducerData<Integer, Message>>();
protected List<KeyedMessage<Integer, Message>> msgList = new LinkedList<KeyedMessage<Integer, Message>>();
protected int totalSize = 0;
protected int queueSize;
@ -57,7 +57,7 @@ public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<Nul
public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
{
Message msg = new Message(value.getBytes());
msgList.add(new ProducerData<Integer, Message>(this.topic, msg));
msgList.add(new KeyedMessage<Integer, Message>(this.topic, msg));
totalSize += msg.size();
// MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch

View File

@ -24,7 +24,7 @@ import java.util.Properties
import java.util.Random
import java.io.PrintStream
import kafka.message._
import kafka.serializer.StringDecoder
import kafka.serializer._
import kafka.utils._
import kafka.metrics.KafkaMetricsReporter
@ -179,7 +179,7 @@ object ConsoleConsumer extends Logging {
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
formatter.init(formatterArgs)
try {
val stream = connector.createMessageStreamsByFilter(filterSpec).get(0)
val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0)
val iter = if(maxMessages >= 0)
stream.slice(0, maxMessages)
else
@ -187,7 +187,7 @@ object ConsoleConsumer extends Logging {
for(messageAndTopic <- iter) {
try {
formatter.writeTo(messageAndTopic.message, System.out)
formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
} catch {
case e =>
if (skipMessageOnError)
@ -251,36 +251,14 @@ object MessageFormatter {
}
trait MessageFormatter {
def writeTo(message: Message, output: PrintStream)
def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
def init(props: Properties) {}
def close() {}
}
class DecodedMessageFormatter extends MessageFormatter {
var topicStr: String = _
val decoder = new StringDecoder()
override def init(props: Properties) {
topicStr = props.getProperty("topic")
if (topicStr != null)
topicStr = topicStr + ":"
else
topicStr = ""
}
def writeTo(message: Message, output: PrintStream) {
try {
output.println(topicStr + decoder.toEvent(message) + ":payloadsize:" + message.payloadSize)
} catch {
case e => e.printStackTrace()
}
}
}
class NewlineMessageFormatter extends MessageFormatter {
def writeTo(message: Message, output: PrintStream) {
val payload = message.payload
output.write(payload.array, payload.arrayOffset, payload.limit)
def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
output.write(value)
output.write('\n')
}
}
@ -296,8 +274,8 @@ class ChecksumMessageFormatter extends MessageFormatter {
topicStr = ""
}
def writeTo(message: Message, output: PrintStream) {
val chksum = message.checksum
def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
val chksum = new Message(value, key).checksum
output.println(topicStr + "checksum:" + chksum)
}
}
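
Since formatters now receive the raw key and value bytes, a custom formatter only needs the new three-argument writeTo. A hypothetical example, written as if it lived alongside the formatters above (so MessageFormatter and java.io.PrintStream are already in scope); the tab-separated output format is illustrative:

class KeyValueMessageFormatter extends MessageFormatter {
  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
    // messages produced without a key arrive here with key == null
    val keyStr = if(key == null) "null" else new String(key, "UTF-8")
    output.println(keyStr + "\t" + new String(value, "UTF-8"))
  }
}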

View File

@ -19,38 +19,53 @@ package kafka.consumer
import scala.collection._
import kafka.utils.Logging
import kafka.serializer.{DefaultDecoder, Decoder}
import kafka.serializer._
/**
* Main interface for consumer
*/
trait ConsumerConnector {
/**
* Create a list of MessageStreams for each topic.
*
* @param topicCountMap a map of (topic, #streams) pair
* @param decoder Decoder to decode each Message to type T
* @return a map of (topic, list of KafkaStream) pairs.
* The number of items in the list is #streams. Each stream supports
* an iterator over message/metadata pairs.
*/
def createMessageStreams[T](topicCountMap: Map[String,Int],
decoder: Decoder[T] = new DefaultDecoder)
: Map[String,List[KafkaStream[T]]]
def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]]
/**
* Create a list of MessageStreams for each topic.
*
* @param topicCountMap a map of (topic, #streams) pair
* @param keyDecoder Decoder to decode the key portion of the message
* @param valueDecoder Decoder to decode the value portion of the message
* @return a map of (topic, list of KafkaStream) pairs.
* The number of items in the list is #streams. Each stream supports
* an iterator over message/metadata pairs.
*/
def createMessageStreams[K,V](topicCountMap: Map[String,Int],
keyDecoder: Decoder[K],
valueDecoder: Decoder[V])
: Map[String,List[KafkaStream[K,V]]]
/**
* Create a list of message streams for all topics that match a given filter.
*
* @param topicFilter Either a Whitelist or Blacklist TopicFilter object.
* @param numStreams Number of streams to return
* @param decoder Decoder to decode each Message to type T
* @param keyDecoder Decoder to decode the key portion of the message
* @param valueDecoder Decoder to decode the value portion of the message
* @return a list of KafkaStream each of which provides an
* iterator over message/metadata pairs over allowed topics.
*/
def createMessageStreamsByFilter[T](topicFilter: TopicFilter,
numStreams: Int = 1,
decoder: Decoder[T] = new DefaultDecoder)
: Seq[KafkaStream[T]]
def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter,
numStreams: Int = 1,
keyDecoder: Decoder[K] = new DefaultDecoder(),
valueDecoder: Decoder[V] = new DefaultDecoder())
: Seq[KafkaStream[K,V]]
/**
* Commit the offsets of all broker partitions connected by this connector.
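
A minimal sketch of consuming with the new two-decoder API, assuming the usual Consumer.create factory (not part of this diff), a consumer.properties file with the group and zookeeper settings, and string keys and values:

import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.serializer.StringDecoder
import kafka.utils.Utils

val connector = Consumer.create(new ConsumerConfig(Utils.loadProps("consumer.properties")))
val streams = connector.createMessageStreams(Map("clicks" -> 1), new StringDecoder(), new StringDecoder())

// Iterating a KafkaStream blocks for new data; each element now carries the key.
for(stream <- streams("clicks"); msg <- stream)
  println("key=%s value=%s partition=%d offset=%d".format(msg.key, msg.message, msg.partition, msg.offset))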

View File

@ -17,7 +17,7 @@
package kafka.consumer
import kafka.utils.{IteratorTemplate, Logging}
import kafka.utils.{IteratorTemplate, Logging, Utils}
import java.util.concurrent.{TimeUnit, BlockingQueue}
import kafka.serializer.Decoder
import java.util.concurrent.atomic.AtomicReference
@ -30,17 +30,18 @@ import kafka.common.{KafkaException, MessageSizeTooLargeException}
* The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
*
*/
class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val decoder: Decoder[T],
val enableShallowIterator: Boolean)
extends IteratorTemplate[MessageAndMetadata[T]] with Logging {
class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
val enableShallowIterator: Boolean)
extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
private var currentTopicInfo:PartitionTopicInfo = null
private var currentTopicInfo: PartitionTopicInfo = null
private var consumedOffset: Long = -1L
override def next(): MessageAndMetadata[T] = {
override def next(): MessageAndMetadata[K, V] = {
val item = super.next()
if(consumedOffset < 0)
throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
@ -52,7 +53,7 @@ class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
item
}
protected def makeNext(): MessageAndMetadata[T] = {
protected def makeNext(): MessageAndMetadata[K, V] = {
var currentDataChunk: FetchedDataChunk = null
// if we don't have an iterator, get one
var localCurrent = current.get()
@ -103,7 +104,10 @@ class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
item.message.ensureValid() // validate checksum of message to ensure it is valid
new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic)
val keyBuffer = item.message.key
val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer))
val value = valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset)
}
def clearCurrentChunk() {

View File

@ -22,19 +22,20 @@ import java.util.concurrent.BlockingQueue
import kafka.serializer.Decoder
import kafka.message.MessageAndMetadata
class KafkaStream[T](private val queue: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val decoder: Decoder[T],
val enableShallowIterator: Boolean)
extends Iterable[MessageAndMetadata[T]] with java.lang.Iterable[MessageAndMetadata[T]] {
class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
val enableShallowIterator: Boolean)
extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
private val iter: ConsumerIterator[T] =
new ConsumerIterator[T](queue, consumerTimeoutMs, decoder, enableShallowIterator)
private val iter: ConsumerIterator[K,V] =
new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator)
/**
* Create an iterator over messages in the stream.
*/
def iterator(): ConsumerIterator[T] = iter
def iterator(): ConsumerIterator[K,V] = iter
/**
* This method clears the queue being iterated during the consumer rebalancing. This is mainly

View File

@ -28,13 +28,14 @@ import java.net.InetAddress
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
import java.util.UUID
import kafka.serializer.Decoder
import kafka.serializer._
import kafka.utils.ZkUtils._
import kafka.common._
import kafka.client.ClientUtils
import com.yammer.metrics.core.Gauge
import kafka.api.OffsetRequest
import kafka.metrics._
import kafka.producer.ProducerConfig
/**
@ -121,16 +122,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def this(config: ConsumerConfig) = this(config, true)
def createMessageStreams[T](topicCountMap: Map[String,Int], decoder: Decoder[T])
: Map[String,List[KafkaStream[T]]] = {
def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]] =
createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
: Map[String, List[KafkaStream[K,V]]] = {
if (messageStreamCreated.getAndSet(true))
throw new RuntimeException(this.getClass.getSimpleName +
" can create message streams at most once")
consume(topicCountMap, decoder)
consume(topicCountMap, keyDecoder, valueDecoder)
}
def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) = {
val wildcardStreamsHandler = new WildcardStreamsHandler[T](topicFilter, numStreams, decoder)
def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter,
numStreams: Int,
keyDecoder: Decoder[K] = new DefaultDecoder(),
valueDecoder: Decoder[V] = new DefaultDecoder()) = {
val wildcardStreamsHandler = new WildcardStreamsHandler[K,V](topicFilter, numStreams, keyDecoder, valueDecoder)
wildcardStreamsHandler.streams
}
@ -173,8 +180,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
}
def consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T])
: Map[String,List[KafkaStream[T]]] = {
def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
: Map[String,List[KafkaStream[K,V]]] = {
debug("entering consume ")
if (topicCountMap == null)
throw new RuntimeException("topicCountMap is null")
@ -187,8 +194,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
threadIdSet.map(_ => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
val stream = new KafkaStream[T](
queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
val stream = new KafkaStream[K,V](
queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator)
(queue, stream)
})
).flatten.toList
@ -197,7 +204,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
registerConsumerInZK(dirs, consumerIdString, topicCount)
reinitializeConsumer(topicCount, queuesAndStreams)
loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[T]]]]
loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}
// this API is used by unit tests only
@ -293,7 +300,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
class ZKRebalancerListener(val group: String, val consumerIdString: String,
val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_]]])
val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
extends IZkChildListener {
private var isWatcherTriggered = false
private val lock = new ReentrantLock
@ -473,7 +480,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
private def closeFetchersForQueues(cluster: Cluster,
messageStreams: Map[String,List[KafkaStream[_]]],
messageStreams: Map[String,List[KafkaStream[_,_]]],
queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
fetcher match {
@ -496,7 +503,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
messageStreams: Map[String,List[KafkaStream[_]]]) {
messageStreams: Map[String,List[KafkaStream[_,_]]]) {
// Clear all but the currently iterated upon chunk in the consumer thread's queue
queuesTobeCleared.foreach(_.clear)
@ -510,7 +517,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_]]],
private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_,_]]],
relevantTopicThreadIdsMap: Map[String, Set[String]]) {
// only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
// after this rebalancing attempt
@ -610,17 +617,17 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
}
private def reinitializeConsumer[T](
private def reinitializeConsumer[K,V](
topicCount: TopicCount,
queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[T])]) {
queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
val dirs = new ZKGroupDirs(config.groupId)
// listener to consumer and partition changes
if (loadBalancerListener == null) {
val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[T]]]
val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
loadBalancerListener = new ZKRebalancerListener(
config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_]]]])
config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
}
// register listener for session expired event
@ -690,9 +697,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
loadBalancerListener.syncedRebalance()
}
class WildcardStreamsHandler[T](topicFilter: TopicFilter,
class WildcardStreamsHandler[K,V](topicFilter: TopicFilter,
numStreams: Int,
decoder: Decoder[T])
keyDecoder: Decoder[K],
valueDecoder: Decoder[V])
extends TopicEventHandler[String] {
if (messageStreamCreated.getAndSet(true))
@ -702,8 +710,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private val wildcardQueuesAndStreams = (1 to numStreams)
.map(e => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
val stream = new KafkaStream[T](
queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
val stream = new KafkaStream[K,V](queue,
config.consumerTimeoutMs,
keyDecoder,
valueDecoder,
config.enableShallowIterator)
(queue, stream)
}).toList
@ -760,7 +771,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
}
def streams: Seq[KafkaStream[T]] =
def streams: Seq[KafkaStream[K,V]] =
wildcardQueuesAndStreams.map(_._2)
}
}

View File

@ -20,7 +20,6 @@ package kafka.javaapi.consumer;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.message.Message;
import kafka.serializer.Decoder;
import java.util.List;
@ -36,10 +35,10 @@ public interface ConsumerConnector {
* The number of items in the list is #streams. Each stream supports
* an iterator over message/metadata pairs.
*/
public <T> Map<String, List<KafkaStream<T>>> createMessageStreams(
Map<String, Integer> topicCountMap, Decoder<T> decoder);
public Map<String, List<KafkaStream<Message>>> createMessageStreams(
Map<String, Integer> topicCountMap);
public <K,V> Map<String, List<KafkaStream<K,V>>>
createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
/**
* Create a list of MessageAndTopicStreams containing messages of type T.
@ -47,16 +46,17 @@ public interface ConsumerConnector {
* @param topicFilter a TopicFilter that specifies which topics to
* subscribe to (encapsulates a whitelist or a blacklist).
* @param numStreams the number of message streams to return.
* @param decoder a decoder that converts from Message to T
* @param keyDecoder a decoder that decodes the message key
* @param valueDecoder a decoder that decodes the message itself
* @return a list of KafkaStream. Each stream supports an
* iterator over its MessageAndMetadata elements.
*/
public <T> List<KafkaStream<T>> createMessageStreamsByFilter(
TopicFilter topicFilter, int numStreams, Decoder<T> decoder);
public List<KafkaStream<Message>> createMessageStreamsByFilter(
TopicFilter topicFilter, int numStreams);
public List<KafkaStream<Message>> createMessageStreamsByFilter(
TopicFilter topicFilter);
public <K,V> List<KafkaStream<K,V>>
createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);
/**
* Commit the offsets of all broker partitions connected by this connector.

View File

@ -17,7 +17,7 @@
package kafka.javaapi.consumer
import kafka.message.Message
import kafka.serializer.{DefaultDecoder, Decoder}
import kafka.serializer._
import kafka.consumer._
import scala.collection.JavaConversions.asList
@ -59,7 +59,7 @@ import scala.collection.JavaConversions.asList
*/
private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val enableFetcher: Boolean) // for testing only
val enableFetcher: Boolean) // for testing only
extends ConsumerConnector {
private val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
@ -67,17 +67,18 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def this(config: ConsumerConfig) = this(config, true)
// for java client
def createMessageStreams[T](
def createMessageStreams[K,V](
topicCountMap: java.util.Map[String,java.lang.Integer],
decoder: Decoder[T])
: java.util.Map[String,java.util.List[KafkaStream[T]]] = {
keyDecoder: Decoder[K],
valueDecoder: Decoder[V])
: java.util.Map[String,java.util.List[KafkaStream[K,V]]] = {
import scala.collection.JavaConversions._
val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
val scalaReturn = underlying.consume(scalaTopicCountMap, decoder)
val ret = new java.util.HashMap[String,java.util.List[KafkaStream[T]]]
val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
for ((topic, streams) <- scalaReturn) {
var javaStreamList = new java.util.ArrayList[KafkaStream[T]]
var javaStreamList = new java.util.ArrayList[KafkaStream[K,V]]
for (stream <- streams)
javaStreamList.add(stream)
ret.put(topic, javaStreamList)
@ -85,19 +86,17 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
ret
}
def createMessageStreams(
topicCountMap: java.util.Map[String,java.lang.Integer])
: java.util.Map[String,java.util.List[KafkaStream[Message]]] =
createMessageStreams(topicCountMap, new DefaultDecoder)
def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] =
createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) =
asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder))
def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) =
asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder))
def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder)
createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder())
def createMessageStreamsByFilter(topicFilter: TopicFilter) =
createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder)
createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder())
def commitOffsets() {
underlying.commitOffsets

View File

@ -18,6 +18,7 @@
package kafka.javaapi.producer
import kafka.producer.ProducerConfig
import kafka.producer.KeyedMessage
class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
{
@ -27,20 +28,17 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
* synchronous or the asynchronous producer
* @param producerData the producer data object that encapsulates the topic, key and message data
*/
def send(producerData: kafka.javaapi.producer.ProducerData[K,V]) {
import collection.JavaConversions._
underlying.send(new kafka.producer.ProducerData[K,V](producerData.getTopic, producerData.getKey,
asBuffer(producerData.getData)))
def send(message: KeyedMessage[K,V]) {
underlying.send(message)
}
/**
* Use this API to send data to multiple topics
* @param producerData list of producer data objects that encapsulate the topic, key and message data
*/
def send(producerData: java.util.List[kafka.javaapi.producer.ProducerData[K,V]]) {
def send(messages: java.util.List[KeyedMessage[K,V]]) {
import collection.JavaConversions._
underlying.send(asBuffer(producerData).map(pd => new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey,
asBuffer(pd.getData))): _*)
underlying.send(asBuffer(messages):_*)
}
/**

View File

@ -17,5 +17,5 @@
package kafka.message
case class MessageAndMetadata[T](message: T, topic: String = "")
case class MessageAndMetadata[K, V](key: K, message: V, topic: String, partition: Int, offset: Long)

View File

@ -59,6 +59,14 @@ class BlockingChannel( val host: String,
writeChannel = channel
readChannel = Channels.newChannel(channel.socket().getInputStream)
connected = true
// settings may not match what we requested above
val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d)."
debug(msg.format(channel.socket.getSoTimeout,
readTimeoutMs,
channel.socket.getReceiveBufferSize,
readBufferSize,
channel.socket.getSendBufferSize,
writeBufferSize))
}
}

View File

@ -111,7 +111,7 @@ object ConsoleProducer {
do {
message = reader.readMessage()
if(message != null)
producer.send(new ProducerData(topic, message))
producer.send(new KeyedMessage(topic, message))
} while(message != null)
}

View File

@ -19,7 +19,9 @@ package kafka.producer
import kafka.utils.Utils
private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
import kafka.utils._
private class DefaultPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
private val random = new java.util.Random
def partition(key: T, numPartitions: Int): Int = {

View File

@ -87,8 +87,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
}
else this.layout.format(event)
LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
val messageData : ProducerData[String, String] =
new ProducerData[String, String](topic, message)
val messageData = new KeyedMessage[String, String](topic, message)
producer.send(messageData);
}

View File

@ -18,24 +18,13 @@
package kafka.producer
/**
* Represents the data to be sent using the Producer send API
* @param topic the topic under which the message is to be published
* @param key the key used by the partitioner to pick a broker partition
* @param data variable length data to be published as Kafka messages under topic
*/
case class ProducerData[K,V](topic: String,
key: K,
data: Seq[V]) {
* A topic, key, and value
*/
case class KeyedMessage[K, V](val topic: String, val key: K, val message: V) {
if(topic == null)
throw new IllegalArgumentException("Topic cannot be null.")
def this(t: String, d: Seq[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)
def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], message)
def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = List(d))
def this(t: String, k: K, d: V) = this(topic = t, key = k, data = List(d))
def getTopic: String = topic
def getKey: K = key
def getData: Seq[V] = data
def hasKey = key != null
}
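
The auxiliary constructor leaves the key null, which is exactly what hasKey tests and what the producer uses to decide whether to write a key into the message. A brief illustration with placeholder topic and values:

import kafka.producer.KeyedMessage

val keyed   = new KeyedMessage[String, String]("clicks", "user-42", "page=/home")
val keyless = new KeyedMessage[String, String]("clicks", "page=/home")  // key defaults to null

keyed.hasKey    // true  -> the key bytes are written into the message
keyless.hasKey  // false -> the message is produced without a key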

View File

@ -16,6 +16,13 @@
*/
package kafka.producer
/**
* A partitioner controls the mapping between user-provided keys and kafka partitions. Users can implement a custom
* partitioner to change this mapping.
*
* Implementations will be constructed via reflection and are required to have a constructor that takes a single
* VerifiableProperties instance--this allows passing configuration properties into the partitioner implementation.
*/
trait Partitioner[T] {
/**
* Uses the key to calculate a partition bucket id for routing

View File

@ -33,7 +33,7 @@ extends Logging {
if (config.batchSize > config.queueSize)
throw new InvalidConfigException("Batch size can't be larger than queue size.")
private val queue = new LinkedBlockingQueue[ProducerData[K,V]](config.queueSize)
private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueSize)
private val random = new Random
private var sync: Boolean = true
@ -43,9 +43,12 @@ extends Logging {
case "async" =>
sync = false
val asyncProducerID = random.nextInt(Int.MaxValue)
producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, queue,
eventHandler, config.queueTime, config.batchSize)
producerSendThread.start
producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID,
queue,
eventHandler,
config.queueTime,
config.batchSize)
producerSendThread.start()
case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
}
@ -54,8 +57,9 @@ extends Logging {
def this(config: ProducerConfig) =
this(config,
new DefaultEventHandler[K,V](config,
Utils.createObject[Partitioner[K]](config.partitionerClass),
Utils.createObject[Encoder[V]](config.serializerClass),
Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
Utils.createObject[Encoder[V]](config.serializerClass, config.props),
Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
new ProducerPool(config)))
/**
@ -63,36 +67,36 @@ extends Logging {
* synchronous or the asynchronous producer
* @param producerData the producer data object that encapsulates the topic, key and message data
*/
def send(producerData: ProducerData[K,V]*) {
def send(messages: KeyedMessage[K,V]*) {
if (hasShutdown.get)
throw new ProducerClosedException
recordStats(producerData: _*)
recordStats(messages)
sync match {
case true => eventHandler.handle(producerData)
case false => asyncSend(producerData: _*)
case true => eventHandler.handle(messages)
case false => asyncSend(messages)
}
}
private def recordStats(producerData: ProducerData[K,V]*) {
for (data <- producerData) {
ProducerTopicStat.getProducerTopicStat(data.getTopic).messageRate.mark(data.getData.size)
ProducerTopicStat.getProducerAllTopicStat.messageRate.mark(data.getData.size)
private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
for (message <- messages) {
ProducerTopicStat.getProducerTopicStat(message.topic).messageRate.mark()
ProducerTopicStat.getProducerAllTopicStat.messageRate.mark()
}
}
private def asyncSend(producerData: ProducerData[K,V]*) {
for (data <- producerData) {
private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
for (message <- messages) {
val added = config.enqueueTimeoutMs match {
case 0 =>
queue.offer(data)
queue.offer(message)
case _ =>
try {
config.enqueueTimeoutMs < 0 match {
case true =>
queue.put(data)
queue.put(message)
true
case _ =>
queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
queue.offer(message, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
}
}
catch {
@ -102,10 +106,10 @@ extends Logging {
}
if(!added) {
AsyncProducerStats.droppedMessageRate.mark()
error("Event queue is full of unsent messages, could not send event: " + data.toString)
throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + data.toString)
error("Event queue is full of unsent messages, could not send event: " + message.toString)
throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
}else {
trace("Added to send queue an event: " + data.toString)
trace("Added to send queue an event: " + message.toString)
trace("Remaining queue size: " + queue.remainingCapacity)
}
}

View File

@ -38,6 +38,10 @@ trait AsyncProducerConfig {
/** the number of messages batched at the producer */
val batchSize = props.getInt("batch.size", 200)
/** the serializer class for events */
/** the serializer class for values */
val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder")
/** the serializer class for keys (defaults to the same as for values) */
val keySerializerClass = props.getString("key.serializer.class", serializerClass)
}
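
Since key.serializer.class falls back to serializer.class, only producers whose key and value types differ need to set both. A hypothetical configuration for string keys and raw byte values (broker address is illustrative):

import java.util.Properties
import kafka.producer.ProducerConfig

val props = new Properties()
props.put("broker.list", "localhost:9092")
props.put("serializer.class", "kafka.serializer.DefaultEncoder")     // values stay as raw bytes
props.put("key.serializer.class", "kafka.serializer.StringEncoder")  // keys are encoded as UTF-8 strings
val config = new ProducerConfig(props)  // suitable for a Producer[String, Array[Byte]]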

View File

@ -23,7 +23,7 @@ import kafka.producer._
import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging}
import scala.collection.{Seq, Map}
import scala.collection.mutable.{ListBuffer, HashMap}
import scala.collection.mutable.{ArrayBuffer, HashMap}
import java.util.concurrent.atomic._
import kafka.api.{TopicMetadata, ProducerRequest}
@ -31,6 +31,7 @@ import kafka.api.{TopicMetadata, ProducerRequest}
class DefaultEventHandler[K,V](config: ProducerConfig,
private val partitioner: Partitioner[K],
private val encoder: Encoder[V],
private val keyEncoder: Encoder[K],
private val producerPool: ProducerPool,
private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
extends EventHandler[K,V] with Logging {
@ -41,13 +42,14 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
private val lock = new Object()
def handle(events: Seq[ProducerData[K,V]]) {
def handle(events: Seq[KeyedMessage[K,V]]) {
lock synchronized {
val serializedData = serialize(events)
serializedData.foreach{
pd => val dataSize = pd.data.foldLeft(0)(_ + _.payloadSize)
ProducerTopicStat.getProducerTopicStat(pd.topic).byteRate.mark(dataSize)
ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
keyed =>
val dataSize = keyed.message.payloadSize
ProducerTopicStat.getProducerTopicStat(keyed.topic).byteRate.mark(dataSize)
ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
}
var outstandingProduceRequests = serializedData
var remainingRetries = config.producerRetries + 1
@ -57,7 +59,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
// back off and update the topic metadata cache before attempting another send operation
Thread.sleep(config.producerRetryBackoffMs)
// get topics of the outstanding produce requests and refresh metadata for those
Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic).toSet))
Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet))
remainingRetries -= 1
ProducerStats.resendRate.mark()
}
@ -70,24 +72,23 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
}
private def dispatchSerializedData(messages: Seq[ProducerData[K,Message]]): Seq[ProducerData[K, Message]] = {
private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
val partitionedDataOpt = partitionAndCollate(messages)
partitionedDataOpt match {
case Some(partitionedData) =>
val failedProduceRequests = new ListBuffer[ProducerData[K,Message]]
val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
try {
for ((brokerid, eventsPerBrokerMap) <- partitionedData) {
for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
if (logger.isTraceEnabled)
eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
.format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
messagesPerBrokerMap.foreach(partitionAndEvent =>
trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
val failedTopicPartitions = send(brokerid, messageSetPerBroker)
failedTopicPartitions.foreach(topicPartition => {
eventsPerBrokerMap.get(topicPartition) match {
messagesPerBrokerMap.get(topicPartition) match {
case Some(data) => failedProduceRequests.appendAll(data)
case None => // nothing
}
})
}
@ -100,63 +101,61 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
}
def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
val serializedProducerData = new ListBuffer[ProducerData[K,Message]]
events.foreach {e =>
val serializedMessages = new ListBuffer[Message]
for (d <- e.getData) {
try {
serializedMessages += encoder.toMessage(d)
} catch {
case t =>
ProducerStats.serializationErrorRate.mark()
if (isSync)
throw t
else {
// currently, if in async mode, we just log the serialization error. We need to revisit
// this when doing kafka-496
error("Error serializing message ", t)
}
}
def serialize(events: Seq[KeyedMessage[K,V]]): Seq[KeyedMessage[K,Message]] = {
val serializedMessages = new ArrayBuffer[KeyedMessage[K,Message]](events.size)
events.map{e =>
try {
if(e.hasKey)
serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = e.key, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message)))
else
serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
} catch {
case t =>
ProducerStats.serializationErrorRate.mark()
if (isSync) {
throw t
} else {
// currently, if in async mode, we just log the serialization error. We need to revisit
// this when doing kafka-496
error("Error serializing message ", t)
}
}
if (serializedMessages.size > 0)
serializedProducerData += new ProducerData[K,Message](e.getTopic, e.getKey, serializedMessages)
}
serializedProducerData
serializedMessages
}
def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[TopicAndPartition, Seq[ProducerData[K,Message]]]]] = {
val ret = new HashMap[Int, Map[TopicAndPartition, Seq[ProducerData[K,Message]]]]
def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
val ret = new HashMap[Int, Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
try {
for (event <- events) {
val topicPartitionsList = getPartitionListForTopic(event)
for (message <- messages) {
val topicPartitionsList = getPartitionListForTopic(message)
val totalNumPartitions = topicPartitionsList.length
val partitionIndex = getPartition(event.getKey, totalNumPartitions)
val partitionIndex = getPartition(message.key, totalNumPartitions)
val brokerPartition = topicPartitionsList(partitionIndex)
// postpone the failure until the send operation, so that requests for other brokers are handled correctly
val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)
var dataPerBroker: HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]] = null
var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
ret.get(leaderBrokerId) match {
case Some(element) =>
dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]]]
dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
case None =>
dataPerBroker = new HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]]
dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
ret.put(leaderBrokerId, dataPerBroker)
}
val topicAndPartition = TopicAndPartition(event.getTopic, brokerPartition.partitionId)
var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
dataPerBroker.get(topicAndPartition) match {
case Some(element) =>
dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]]
dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
case None =>
dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]]
dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
}
dataPerTopicPartition.append(event)
dataPerTopicPartition.append(message)
}
Some(ret)
}catch { // Swallow recoverable exceptions and return None so that they can be retried.
@ -166,13 +165,14 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
}
private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[PartitionAndLeader] = {
debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic)
private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
debug("Getting the number of broker partitions registered for topic: " + m.topic)
val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic)
debug("Broker partitions registered for topic: %s are %s"
.format(pd.getTopic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
.format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
val totalNumPartitions = topicPartitionsList.length
if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
if(totalNumPartitions == 0)
throw new NoBrokersForPartitionException("Partition key = " + m.key)
topicPartitionsList
}
@ -236,7 +236,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
}
private def groupMessagesToSet(eventsPerTopicAndPartition: Map[TopicAndPartition, Seq[ProducerData[K,Message]]]) = {
private def groupMessagesToSet(messagesPerTopicAndPartition: Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
/** enforce the compressed.topics config here.
* If the compression codec is anything other than NoCompressionCodec,
* Enable compression only for specified topics if any
@ -244,32 +244,29 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
* If the compression codec is NoCompressionCodec, compression is disabled for all topics
*/
val messagesPerTopicPartition = eventsPerTopicAndPartition.map { e =>
val topicAndPartition = e._1
val produceData = e._2
val messages = new ListBuffer[Message]
produceData.foreach(p => messages.appendAll(p.getData))
val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
val rawMessages = messages.map(_.message)
( topicAndPartition,
config.compressionCodec match {
case NoCompressionCodec =>
trace("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
case _ =>
config.compressedTopics.size match {
case 0 =>
trace("Sending %d messages with compression codec %d to %s"
.format(messages.size, config.compressionCodec.codec, topicAndPartition))
new ByteBufferMessageSet(config.compressionCodec, messages: _*)
new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
case _ =>
if(config.compressedTopics.contains(topicAndPartition.topic)) {
trace("Sending %d messages with compression codec %d to %s"
.format(messages.size, config.compressionCodec.codec, topicAndPartition))
new ByteBufferMessageSet(config.compressionCodec, messages: _*)
new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
}
else {
trace("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
.format(messages.size, topicAndPartition, config.compressedTopics.toString))
new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
}
}
}

View File

@ -16,7 +16,7 @@
*/
package kafka.producer.async
import kafka.producer.ProducerData
import kafka.producer.KeyedMessage
/**
* Handler that dispatches the batched data from the queue.
@ -27,7 +27,7 @@ trait EventHandler[K,V] {
* Callback to dispatch the batched data and send it to a Kafka server
* @param events the data sent to the producer
*/
def handle(events: Seq[ProducerData[K,V]])
def handle(events: Seq[KeyedMessage[K,V]])
/**
* Cleans up and shuts down the event handler

View File

@ -20,28 +20,25 @@ package kafka.producer.async
import kafka.utils.{SystemTime, Logging}
import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
import collection.mutable.ListBuffer
import kafka.producer.ProducerData
import kafka.producer.KeyedMessage
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
class ProducerSendThread[K,V](val threadName: String,
val queue: BlockingQueue[ProducerData[K,V]],
val queue: BlockingQueue[KeyedMessage[K,V]],
val handler: EventHandler[K,V],
val queueTime: Long,
val batchSize: Int) extends Thread(threadName) with Logging with KafkaMetricsGroup {
private val shutdownLatch = new CountDownLatch(1)
private val shutdownCommand = new ProducerData[K,V](null, null.asInstanceOf[K], null.asInstanceOf[Seq[V]])
private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
newGauge(
"ProducerQueueSize-" + getId,
new Gauge[Int] {
def getValue = queue.size
}
)
newGauge("ProducerQueueSize-" + getId,
new Gauge[Int] {
def getValue = queue.size
})
override def run {
try {
processEvents
}catch {
@ -60,7 +57,7 @@ class ProducerSendThread[K,V](val threadName: String,
private def processEvents() {
var lastSend = SystemTime.milliseconds
var events = new ListBuffer[ProducerData[K,V]]
var events = new ListBuffer[KeyedMessage[K,V]]
var full: Boolean = false
// drain the queue until you get a shutdown command
@ -72,12 +69,8 @@ class ProducerSendThread[K,V](val threadName: String,
// returns a null object
val expired = currentQueueItem == null
if(currentQueueItem != null) {
if(currentQueueItem.getKey == null)
trace("Dequeued item for topic %s, no partition key, data: %s"
.format(currentQueueItem.getTopic, currentQueueItem.getData.toString))
else
trace("Dequeued item for topic %s, partition key: %s, data: %s"
.format(currentQueueItem.getTopic, currentQueueItem.getKey.toString, currentQueueItem.getData.toString))
trace("Dequeued item for topic %s, partition key: %s, data: %s"
.format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
events += currentQueueItem
}
@ -85,12 +78,14 @@ class ProducerSendThread[K,V](val threadName: String,
full = events.size >= batchSize
if(full || expired) {
if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..")
if(full) debug("Batch full. Sending..")
if(expired)
debug(elapsed + " ms elapsed. Queue time reached. Sending..")
if(full)
debug("Batch full. Sending..")
// if either queue time has reached or batch size has reached, dispatch to event handler
tryToHandle(events)
lastSend = SystemTime.milliseconds
events = new ListBuffer[ProducerData[K,V]]
events = new ListBuffer[KeyedMessage[K,V]]
}
}
// send the last batch of events
@ -100,7 +95,7 @@ class ProducerSendThread[K,V](val threadName: String,
.format(queue.size))
}
def tryToHandle(events: Seq[ProducerData[K,V]]) {
def tryToHandle(events: Seq[KeyedMessage[K,V]]) {
val size = events.size
try {
debug("Handling " + size + " events")

View File

@ -17,21 +17,44 @@
package kafka.serializer
import kafka.message.Message
import kafka.message._
import kafka.utils.VerifiableProperties
/**
* A decoder is a method of turning byte arrays into objects.
* An implementation is required to provide a constructor that
* takes a VerifiableProperties instance.
*/
trait Decoder[T] {
def toEvent(message: Message):T
def fromBytes(bytes: Array[Byte]): T
}
class DefaultDecoder extends Decoder[Message] {
def toEvent(message: Message):Message = message
/**
* The default implementation does nothing, just returns the same byte array it takes in.
*/
class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[Byte]] {
def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
}
class StringDecoder extends Decoder[String] {
def toEvent(message: Message):String = {
val buf = message.payload
val arr = new Array[Byte](buf.remaining)
buf.get(arr)
new String(arr)
/**
* Decode messages without any key
*/
class KeylessMessageDecoder(props: VerifiableProperties = null) extends Decoder[Message] {
def fromBytes(bytes: Array[Byte]) = new Message(bytes)
}
/**
* The string encoder translates strings into bytes. It uses UTF8 by default but takes
* an optional property serializer.encoding to control this.
*/
class StringDecoder(props: VerifiableProperties = null) extends Decoder[String] {
val encoding =
if(props == null)
"UTF8"
else
props.getString("serializer.encoding", "UTF8")
def fromBytes(bytes: Array[Byte]): String = {
new String(bytes, encoding)
}
}
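
Because decoders are constructed reflectively, a user-supplied implementation needs the single VerifiableProperties constructor described above. A hypothetical decoder that turns an 8-byte big-endian payload into a Long:

import java.nio.ByteBuffer
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

// The (unused) props parameter satisfies the reflective-construction contract.
class LongDecoder(props: VerifiableProperties = null) extends Decoder[Long] {
  def fromBytes(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong
}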

View File

@ -17,16 +17,44 @@
package kafka.serializer
import kafka.message.Message
import kafka.utils.VerifiableProperties
import kafka.message._
import kafka.utils.Utils
/**
* An encoder is a method of turning objects into byte arrays.
* An implementation is required to provide a constructor that
* takes a VerifiableProperties instance.
*/
trait Encoder[T] {
def toMessage(event: T):Message
def toBytes(t: T): Array[Byte]
}
class DefaultEncoder extends Encoder[Message] {
override def toMessage(event: Message):Message = event
/**
* The default implementation is a no-op, it just returns the same array it takes in
*/
class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] {
override def toBytes(value: Array[Byte]): Array[Byte] = value
}
class StringEncoder extends Encoder[String] {
override def toMessage(event: String):Message = new Message(event.getBytes)
class NullEncoder[T](props: VerifiableProperties = null) extends Encoder[T] {
override def toBytes(value: T): Array[Byte] = null
}
/**
* The string encoder takes an optional parameter serializer.encoding which controls
* the character set used in encoding the string into bytes.
*/
class StringEncoder(props: VerifiableProperties = null) extends Encoder[String] {
val encoding =
if(props == null)
"UTF8"
else
props.getString("serializer.encoding", "UTF8")
override def toBytes(s: String): Array[Byte] =
if(s == null)
null
else
s.getBytes(encoding)
}
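
The encoding side follows the same contract. A hypothetical counterpart to the LongDecoder sketched earlier, writing a Long as 8 big-endian bytes:

import java.nio.ByteBuffer
import kafka.serializer.Encoder
import kafka.utils.VerifiableProperties

class LongEncoder(props: VerifiableProperties = null) extends Encoder[Long] {
  override def toBytes(value: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(value).array()
}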

View File

@ -19,8 +19,8 @@ package kafka.tools;
import joptsimple.*;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.utils.Utils;
import scala.collection.Iterator;
@ -282,7 +282,7 @@ public class KafkaMigrationTool
((ByteBuffer)payload_07).get(bytes);
Message message_08 = new Message(bytes);
logger.debug(String.format("Send kafka 08 message of size %d to topic %s", message_08.size(), topic));
ProducerData<String, Message> producerData = new ProducerData((String)topic, message_08);
KeyedMessage<String, Message> producerData = new KeyedMessage((String)topic, null, message_08);
Producer nextProducer = producerCircularIterator.next();
nextProducer.send(producerData);
}

View File

@ -20,10 +20,11 @@ package kafka.tools
import kafka.message.Message
import joptsimple.OptionParser
import kafka.utils.{Utils, CommandLineUtils, Logging}
import kafka.producer.{ProducerData, ProducerConfig, Producer}
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import scala.collection.JavaConversions._
import java.util.concurrent.CountDownLatch
import kafka.consumer._
import kafka.serializer._
object MirrorMaker extends Logging {
@ -92,7 +93,7 @@ object MirrorMaker extends Logging {
val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
val config = new ProducerConfig(
Utils.loadProps(options.valueOf(producerConfigOpt)))
new Producer[Null, Message](config)
new Producer[Array[Byte], Array[Byte]](config)
})
val threads = {
@ -113,11 +114,9 @@ object MirrorMaker extends Logging {
new Blacklist(options.valueOf(blacklistOpt))
val streams =
connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue()))
connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder()))
streams.flatten.zipWithIndex.map(streamAndIndex => {
new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2)
})
streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2))
}
threads.foreach(_.start())
@ -125,8 +124,8 @@ object MirrorMaker extends Logging {
threads.foreach(_.awaitShutdown())
}
class MirrorMakerThread(stream: KafkaStream[Message],
producers: Seq[Producer[Null, Message]],
class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
producers: Seq[Producer[Array[Byte], Array[Byte]]],
threadId: Int)
extends Thread with Logging {
@ -140,16 +139,14 @@ object MirrorMaker extends Logging {
try {
for (msgAndMetadata <- stream) {
val producer = producerSelector.next()
val pd = new ProducerData[Null, Message](
val pd = new KeyedMessage[Array[Byte], Array[Byte]](
msgAndMetadata.topic, msgAndMetadata.message)
producer.send(pd)
}
}
catch {
} catch {
case e =>
fatal("%s stream unexpectedly exited.", e)
}
finally {
} finally {
shutdownLatch.countDown()
info("Stopped thread %s.".format(threadName))
}
@ -158,8 +155,7 @@ object MirrorMaker extends Logging {
def awaitShutdown() {
try {
shutdownLatch.await()
}
catch {
} catch {
case e: InterruptedException => fatal(
"Shutdown of thread %s interrupted. This might leak data!"
.format(threadName))

View File

@ -62,7 +62,7 @@ object ProducerShell {
done = true
} else {
val message = line.trim
producer.send(new ProducerData[String, String](topic, message))
producer.send(new KeyedMessage[String, String](topic, message))
println("Sent: %s (%d bytes)".format(line, message.getBytes.length))
}
}

View File

@ -20,7 +20,7 @@ package kafka.tools
import joptsimple.OptionParser
import java.util.concurrent.{Executors, CountDownLatch}
import java.util.Properties
import kafka.producer.{ProducerData, ProducerConfig, Producer}
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import kafka.consumer._
import kafka.utils.{Logging, ZkUtils}
import kafka.api.OffsetRequest
@ -136,7 +136,7 @@ object ReplayLogProducer extends Logging {
val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
}
class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging {
class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {
val shutdownLatch = new CountDownLatch(1)
val props = new Properties()
props.put("broker.list", config.brokerList)
@ -150,7 +150,7 @@ object ReplayLogProducer extends Logging {
props.put("producer.type", "async")
val producerConfig = new ProducerConfig(props)
val producer = new Producer[Message, Message](producerConfig)
val producer = new Producer[Array[Byte], Array[Byte]](producerConfig)
override def run() {
info("Starting consumer thread..")
@ -163,7 +163,7 @@ object ReplayLogProducer extends Logging {
stream
for (messageAndMetadata <- iter) {
try {
producer.send(new ProducerData[Message, Message](config.outputTopic, messageAndMetadata.message))
producer.send(new KeyedMessage[Array[Byte], Array[Byte]](config.outputTopic, messageAndMetadata.message))
if (config.delayedMSBtwSend > 0 && (messageCount + 1) % config.batchSize == 0)
Thread.sleep(config.delayedMSBtwSend)
messageCount += 1

View File

@ -19,10 +19,12 @@ package kafka.tools
import joptsimple._
import kafka.utils._
import kafka.producer.ProducerConfig
import kafka.consumer._
import kafka.client.ClientUtils
import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
import kafka.cluster.Broker
import java.util.Properties
import scala.collection.JavaConversions._
/**
@ -194,7 +196,9 @@ object SimpleConsumerShell extends Logging {
offset = messageAndOffset.nextOffset
if(printOffsets)
System.out.println("next offset = " + offset)
formatter.writeTo(messageAndOffset.message, System.out)
val message = messageAndOffset.message
val key = if(message.hasKey) Utils.readBytes(message.key) else null
formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
} catch {
case e =>
if (skipMessageOnError)

View File

@ -134,18 +134,24 @@ object Utils extends Logging {
thread
}
/**
* Read the given byte buffer into a byte array
*/
def readBytes(buffer: ByteBuffer): Array[Byte] = readBytes(buffer, 0, buffer.limit)
/**
* Read a byte array from the given offset and size in the buffer
* TODO: Should use System.arraycopy
*/
def readBytes(buffer: ByteBuffer, offset: Int, size: Int): Array[Byte] = {
val bytes = new Array[Byte](size)
var i = 0
while(i < size) {
bytes(i) = buffer.get(offset + i)
i += 1
val dest = new Array[Byte](size)
if(buffer.hasArray) {
System.arraycopy(buffer.array, buffer.arrayOffset() + offset, dest, 0, size)
} else {
buffer.mark()
buffer.get(dest)
buffer.reset()
}
bytes
dest
}
/**
@ -204,7 +210,7 @@ object Utils extends Logging {
* @param buffer The buffer to translate
* @param encoding The encoding to use in translating bytes to characters
*/
def readString(buffer: ByteBuffer, encoding: String): String = {
def readString(buffer: ByteBuffer, encoding: String = Charset.defaultCharset.toString): String = {
val bytes = new Array[Byte](buffer.remaining)
buffer.get(bytes)
new String(bytes, encoding)
@ -446,16 +452,10 @@ object Utils extends Logging {
/**
* Create an instance of the class with the given class name
*/
def createObject[T<:AnyRef](className: String): T = {
className match {
case null => null.asInstanceOf[T]
case _ =>
val clazz = Class.forName(className)
val clazzT = clazz.asInstanceOf[Class[T]]
val constructors = clazzT.getConstructors
require(constructors.length == 1)
constructors.head.newInstance().asInstanceOf[T]
}
def createObject[T<:AnyRef](className: String, args: AnyRef*): T = {
val klass = Class.forName(className).asInstanceOf[Class[T]]
val constructor = klass.getConstructor(args.map(_.getClass): _*)
constructor.newInstance(args: _*).asInstanceOf[T]
}
/**

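In Utils, readBytes now bulk-copies with System.arraycopy when the buffer is array-backed, readString gains a default charset, and createObject can forward constructor arguments reflectively. A brief usage sketch of the two buffer helpers; the object name and sample string are illustrative.

import java.nio.ByteBuffer
import kafka.utils.Utils

object BufferHelperSketch {
  def main(args: Array[String]) {
    val buffer = ByteBuffer.wrap("hello, kafka".getBytes("UTF-8"))
    // Copy of the whole backing array; the buffer's position is left untouched.
    val copy: Array[Byte] = Utils.readBytes(buffer)
    // Relative read using the new default-charset overload (ASCII here, so any common default decodes it).
    val text = Utils.readString(buffer)
    println(copy.length + " bytes decoded to: " + text)
  }
}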
View File

@ -55,7 +55,7 @@ object TestEndToEndLatency {
var totalTime = 0.0
for(i <- 0 until numMessages) {
var begin = System.nanoTime
producer.send(new ProducerData(topic, message))
producer.send(new KeyedMessage(topic, message))
val received = iter.next
val elapsed = System.nanoTime - begin
// poor man's progress bar

View File

@ -44,7 +44,7 @@ object TestKafkaAppender extends Logging {
}
}
class AppenderStringSerializer extends Encoder[AnyRef] {
def toMessage(event: AnyRef):Message = new Message(event.asInstanceOf[String].getBytes)
class AppenderStringSerializer(encoding: String = "UTF-8") extends Encoder[AnyRef] {
def toBytes(event: AnyRef): Array[Byte] = event.toString.getBytes(encoding)
}
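Encoders now return raw bytes from toBytes rather than building a Message themselves, and the producer takes separate encoders for values (serializer.class) and keys (key.serializer.class). A minimal custom value encoder under the new contract; the class name is illustrative.

import kafka.serializer.Encoder

// Illustrative encoder: serialize any event via toString in a configurable charset.
class ToStringEncoder(encoding: String = "UTF-8") extends Encoder[AnyRef] {
  def toBytes(event: AnyRef): Array[Byte] = event.toString.getBytes(encoding)
}

A producer would pick this up by setting serializer.class (or key.serializer.class) to the class name, as the updated tests in this patch do.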

View File

@ -56,13 +56,13 @@ object TestZKConsumerOffsets {
}
}
private class ConsumerThread(stream: KafkaStream[Message]) extends Thread {
private class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread {
val shutdownLatch = new CountDownLatch(1)
override def run() {
println("Starting consumer thread..")
for (messageAndMetadata <- stream) {
println("consumed: " + Utils.readString(messageAndMetadata.message.payload, "UTF-8"))
println("consumed: " + new String(messageAndMetadata.message, "UTF-8"))
}
shutdownLatch.countDown
println("thread shutdown !" )

View File

@ -26,10 +26,10 @@ import junit.framework.Assert._
import kafka.message._
import kafka.server._
import kafka.utils.TestUtils._
import kafka.utils.{TestZKUtils, TestUtils}
import kafka.utils._
import kafka.admin.CreateTopicCommand
import org.junit.Test
import kafka.serializer.DefaultDecoder
import kafka.serializer._
import kafka.cluster.{Broker, Cluster}
import org.scalatest.junit.JUnit3Suite
import kafka.integration.KafkaServerTestHarness
@ -46,13 +46,14 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
val topic = "topic"
val group = "group1"
val consumer0 = "consumer0"
val consumedOffset = 5
val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
val queue = new LinkedBlockingQueue[FetchedDataChunk]
val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
c.brokerId,
0,
queue,
new AtomicLong(5),
new AtomicLong(consumedOffset),
new AtomicLong(0),
new AtomicInteger(0)))
val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
@ -65,24 +66,25 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testConsumerIteratorDeduplicationDeepIterator() {
val messages = 0.until(10).map(x => new Message((configs(0).brokerId * 5 + x).toString.getBytes)).toList
val messageStrings = (0 until 10).map(_.toString).toList
val messages = messageStrings.map(s => new Message(s.getBytes))
val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, new AtomicLong(0), messages:_*)
topicInfos(0).enqueue(messageSet)
assertEquals(1, queue.size)
queue.put(ZookeeperConsumerConnector.shutdownCommand)
val iter: ConsumerIterator[Message] = new ConsumerIterator[Message](queue, consumerConfig.consumerTimeoutMs,
new DefaultDecoder, false)
var receivedMessages: List[Message] = Nil
for (i <- 0 until 5) {
assertTrue(iter.hasNext)
receivedMessages ::= iter.next.message
}
val iter = new ConsumerIterator[String, String](queue,
consumerConfig.consumerTimeoutMs,
new StringDecoder(),
new StringDecoder(),
enableShallowIterator = false)
var receivedMessages = (0 until 5).map(i => iter.next.message).toList
assertTrue(!iter.hasNext)
assertFalse(iter.hasNext)
assertEquals(1, queue.size) // This is only the shutdown command.
assertEquals(5, receivedMessages.size)
assertEquals(receivedMessages.sortWith((s,t) => s.checksum < t.checksum), messages.takeRight(5).sortWith((s,t) => s.checksum < t.checksum))
val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload))
assertEquals(unconsumed, receivedMessages)
}
}

View File

@ -25,11 +25,11 @@ import scala.collection._
import org.scalatest.junit.JUnit3Suite
import org.apache.log4j.{Level, Logger}
import kafka.message._
import kafka.serializer.StringDecoder
import kafka.serializer._
import kafka.admin.CreateTopicCommand
import org.I0Itec.zkclient.ZkClient
import kafka.utils._
import kafka.producer.{ProducerConfig, ProducerData, Producer}
import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
import java.util.{Collections, Properties}
import kafka.utils.TestUtils._
@ -73,7 +73,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
override val consumerTimeoutMs = 200
}
val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> 1))
val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// no messages to consume, we should hit timeout;
// also the iterator should support re-entrant, so loop it twice
@ -90,9 +90,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
zkConsumerConnector0.shutdown
// send some messages to each broker
val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
// wait to make sure the topic and partition have a leader for the successful case
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
@ -101,11 +100,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
assertEquals(sentMessages1.size, receivedMessages1.size)
assertEquals(sentMessages1, receivedMessages1)
assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
// also check partition ownership
val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
@ -121,19 +119,16 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
override val rebalanceBackoffMs = RebalanceBackoffMs
}
val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> 1))
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// send some messages to each broker
val sentMessages2_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
assertEquals(sentMessages2, receivedMessages2)
val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2)
assertEquals(sentMessages2.sorted, receivedMessages2.sorted)
// also check partition ownership
val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
@ -147,18 +142,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
// send some messages to each broker
val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
assertEquals(sentMessages3.size, receivedMessages3.size)
assertEquals(sentMessages3, receivedMessages3)
val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2)
assertEquals(sentMessages3.sorted, receivedMessages3.sorted)
// also check partition ownership
val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
@ -176,9 +167,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
requestHandlerLogger.setLevel(Level.FATAL)
// send some messages to each broker
val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec)
val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
@ -187,10 +177,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val consumerConfig1 = new ConsumerConfig(
TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
assertEquals(sentMessages1.size, receivedMessages1.size)
assertEquals(sentMessages1, receivedMessages1)
assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
// also check partition ownership
val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
@ -206,19 +195,16 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
override val rebalanceBackoffMs = RebalanceBackoffMs
}
val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> 1))
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// send some messages to each broker
val sentMessages2_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec)
val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
assertEquals(sentMessages2, receivedMessages2)
val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2)
assertEquals(sentMessages2.sorted, receivedMessages2.sorted)
// also check partition ownership
val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
@ -230,20 +216,16 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val consumerConfig3 = new ConsumerConfig(
TestUtils.createConsumerProperties(zkConnect, group, consumer3))
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
// send some messages to each broker
val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec)
val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
assertEquals(sentMessages3.size, receivedMessages3.size)
assertEquals(sentMessages3, receivedMessages3)
val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2)
assertEquals(sentMessages3.sorted, receivedMessages3.sorted)
// also check partition ownership
val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
@ -258,17 +240,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
def testCompressionSetConsumption() {
// send some messages to each broker
val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec)
val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
val sentMessages = (sentMessages1 ++ sentMessages2).sortWith((s,t) => s.checksum < t.checksum)
val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
val receivedMessages = getMessages(400, topicMessageStreams1)
val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum)
val sortedSentMessages = sentMessages.sortWith((s,t) => s.checksum < t.checksum)
assertEquals(sortedSentMessages, sortedReceivedMessages)
assertEquals(sentMessages.sorted, receivedMessages.sorted)
// also check partition ownership
val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
@ -284,10 +263,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
requestHandlerLogger.setLevel(Level.FATAL)
// send some messages to each broker
val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec)
val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec)
val sentMessages = (sentMessages1 ++ sentMessages2).map(m => Utils.readString(m.payload, "UTF-8")).
sortWith((s, t) => s.compare(t) == -1)
val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec)
val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@ -297,8 +274,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector =
new ZookeeperConsumerConnector(consumerConfig, true)
val topicMessageStreams =
zkConsumerConnector.createMessageStreams(Predef.Map(topic -> 1), new StringDecoder)
zkConsumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
var receivedMessages: List[String] = Nil
for ((topic, messageStreams) <- topicMessageStreams) {
@ -312,8 +288,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
}
}
}
receivedMessages = receivedMessages.sortWith((s, t) => s.compare(t) == -1)
assertEquals(sentMessages, receivedMessages)
assertEquals(sentMessages.sorted, receivedMessages.sorted)
zkConsumerConnector.shutdown()
requestHandlerLogger.setLevel(Level.ERROR)
@ -331,7 +306,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
val topicRegistry = zkConsumerConnector1.getTopicRegistry
assertEquals(1, topicRegistry.map(r => r._1).size)
assertEquals(topic, topicRegistry.map(r => r._1).head)
@ -346,54 +321,61 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
assertEquals(expected_1, actual_1)
val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
assertEquals(nMessages, receivedMessages1.size)
assertEquals(sentMessages1.sortWith((s,t) => s.checksum < t.checksum), receivedMessages1)
assertEquals(sentMessages1, receivedMessages1)
}
def sendMessagesToBrokerPartition(config: KafkaConfig, topic: String, partition: Int, numMessages: Int, compression: CompressionCodec = NoCompressionCodec): List[Message] = {
def sendMessagesToBrokerPartition(config: KafkaConfig,
topic: String,
partition: Int,
numMessages: Int,
compression: CompressionCodec = NoCompressionCodec): List[String] = {
val header = "test-%d-%d".format(config.brokerId, partition)
val props = new Properties()
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
props.put("compression.codec", compression.codec.toString)
val producer: Producer[Int, Message] = new Producer[Int, Message](new ProducerConfig(props))
val ms = 0.until(numMessages).map(x =>
new Message((header + config.brokerId + "-" + partition + "-" + x).getBytes)).toArray
producer.send(new ProducerData[Int, Message](topic, partition, ms))
props.put("key.serializer.class", classOf[IntEncoder].getName.toString)
props.put("serializer.class", classOf[StringEncoder].getName.toString)
val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props))
val ms = 0.until(numMessages).map(x => header + config.brokerId + "-" + partition + "-" + x)
producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, config.brokerId, topic, partition))
producer
producer.close()
ms.toList
}
def sendMessages(config: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec, numParts: Int): List[Message]= {
var messages: List[Message] = Nil
def sendMessages(config: KafkaConfig,
messagesPerNode: Int,
header: String,
compression: CompressionCodec,
numParts: Int): List[String]= {
var messages: List[String] = Nil
val props = new Properties()
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
val producer: Producer[Int, Message] = new Producer[Int, Message](new ProducerConfig(props))
props.put("key.serializer.class", classOf[IntEncoder].getName.toString)
props.put("serializer.class", classOf[StringEncoder].getName)
val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props))
for (partition <- 0 until numParts) {
val ms = 0.until(messagesPerNode).map(x =>
new Message((header + config.brokerId + "-" + partition + "-" + x).getBytes)).toArray
for (message <- ms)
messages ::= message
producer.send(new ProducerData[Int, Message](topic, partition, ms))
val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + "-" + partition + "-" + x)
producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
messages ++= ms
debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, config.brokerId, topic, partition))
}
producer.close()
messages.reverse
messages
}
def sendMessages(messagesPerNode: Int, header: String, compression: CompressionCodec = NoCompressionCodec): List[Message]= {
var messages: List[Message] = Nil
for(conf <- configs) {
def sendMessages(messagesPerNode: Int, header: String, compression: CompressionCodec = NoCompressionCodec): List[String]= {
var messages: List[String] = Nil
for(conf <- configs)
messages ++= sendMessages(conf, messagesPerNode, header, compression, numParts)
}
messages.sortWith((s,t) => s.checksum < t.checksum)
messages
}
def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaStream[Message]]]): List[Message]= {
var messages: List[Message] = Nil
def getMessages(nMessagesPerThread: Int,
topicMessageStreams: Map[String,List[KafkaStream[String, String]]]): List[String]= {
var messages: List[String] = Nil
for((topic, messageStreams) <- topicMessageStreams) {
for (messageStream <- messageStreams) {
val iterator = messageStream.iterator
@ -401,11 +383,11 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
assertTrue(iterator.hasNext)
val message = iterator.next.message
messages ::= message
debug("received message: " + Utils.readString(message.payload, "UTF-8"))
debug("received message: " + message)
}
}
}
messages.sortWith((s,t) => s.checksum < t.checksum)
messages.reverse
}
def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {

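On the consumer side, this test now requests streams typed by a key decoder and a value decoder and compares decoded strings rather than Message checksums. A hedged sketch of that pattern outside the test harness, assuming an already built connector with the decoder-taking createMessageStreams shown here; the object and method names are illustrative.

import kafka.consumer.{ConsumerConnector, KafkaStream}
import kafka.serializer.StringDecoder

object TypedConsumeSketch {
  // Open one String-typed stream for the topic and return the first n decoded values,
  // much like the test's getMessages helper does.
  def firstMessages(connector: ConsumerConnector, topic: String, n: Int): List[String] = {
    val streams: List[KafkaStream[String, String]] =
      connector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())(topic)
    val iterator = streams.head.iterator
    (0 until n).map(_ => iterator.next.message).toList
  }
}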
View File

@ -25,7 +25,8 @@ import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import kafka.utils.TestUtils
import kafka.message.Message
import kafka.producer.{Producer, ProducerData}
import kafka.serializer._
import kafka.producer.{Producer, KeyedMessage}
class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
@ -69,10 +70,11 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
* Returns the count of messages received.
*/
def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
val producer: Producer[String, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs))
val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
new DefaultEncoder(), new StringEncoder())
for(i <- 0 until numMessages)
producer.send(new ProducerData[String, Message](topic, topic, new Message("test".getBytes())))
producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
// update offset in zookeeper for consumer to jump "forward" in time
val dirs = new ZKGroupTopicDirs(group, topic)

View File

@ -27,7 +27,8 @@ import kafka.message._
import kafka.server._
import org.scalatest.junit.JUnit3Suite
import kafka.consumer._
import kafka.producer.{ProducerData, Producer}
import kafka.serializer._
import kafka.producer.{KeyedMessage, Producer}
import kafka.utils.TestUtils._
import kafka.utils.TestUtils
import kafka.admin.CreateTopicCommand
@ -38,7 +39,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
val configs =
for(props <- TestUtils.createBrokerConfigs(numNodes))
yield new KafkaConfig(props)
val messages = new mutable.HashMap[Int, Seq[Message]]
val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
val topic = "topic"
val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
val shutdown = ZookeeperConsumerConnector.shutdownCommand
@ -83,10 +84,10 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
def sendMessages(messagesPerNode: Int): Int = {
var count = 0
for(conf <- configs) {
val producer: Producer[String, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs))
val ms = 0.until(messagesPerNode).map(x => new Message((conf.brokerId * 5 + x).toString.getBytes)).toArray
val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new DefaultEncoder(), new StringEncoder())
val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
messages += conf.brokerId -> ms
producer.send(new ProducerData[String, Message](topic, topic, ms))
producer.send(ms.map(m => KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
producer.close()
count += ms.size
}

View File

@ -21,10 +21,11 @@ import kafka.api.FetchRequestBuilder
import kafka.message.{Message, ByteBufferMessageSet}
import kafka.server.{KafkaRequestHandler, KafkaConfig}
import org.apache.log4j.{Level, Logger}
import org.junit.Assert._
import org.scalatest.junit.JUnit3Suite
import scala.collection._
import kafka.producer.ProducerData
import kafka.utils.TestUtils
import kafka.producer.KeyedMessage
import kafka.utils._
import kafka.common.{ErrorMapping, KafkaException, OffsetOutOfRangeException}
/**
@ -57,17 +58,17 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
def testProduceAndFetch() {
// send some messages
val topic = "test"
val sentMessages = List(new Message("hello".getBytes()), new Message("there".getBytes()))
val producerData = new ProducerData[String, Message](topic, topic, sentMessages)
val sentMessages = List("hello", "there")
val producerData = sentMessages.map(m => new KeyedMessage[String, String](topic, topic, m))
producer.send(producerData)
producer.send(producerData:_*)
var fetchedMessage: ByteBufferMessageSet = null
while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
fetchedMessage = fetched.messageSet(topic, 0)
}
TestUtils.checkEquals(sentMessages.iterator, fetchedMessage.map(m => m.message).iterator)
assertEquals(sentMessages, fetchedMessage.map(m => Utils.readString(m.message.payload)).toList)
// send an invalid offset
try {
@ -83,12 +84,12 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
// send some messages, with non-ordered topics
val topicOffsets = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, Seq[Message]]
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
for( (topic, offset) <- topicOffsets) {
val producedData = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
val producedData = List("a_" + topic, "b_" + topic)
messages += topic -> producedData
producer.send(new ProducerData[String, Message](topic, topic, producedData))
producer.send(producedData.map(m => new KeyedMessage[String, String](topic, topic, m)):_*)
builder.addFetch(topic, offset, 0, 10000)
}
@ -97,7 +98,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
val response = consumer.fetch(request)
for( (topic, offset) <- topicOffsets) {
val fetched = response.messageSet(topic, offset)
TestUtils.checkEquals(messages(topic).iterator, fetched.map(m => m.message).iterator)
assertEquals(messages(topic), fetched.map(m => Utils.readString(m.message.payload)))
}
}
@ -121,13 +122,13 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
def testMultiProduce() {
// send some messages
val topics = List("test1", "test2", "test3");
val messages = new mutable.HashMap[String, Seq[Message]]
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
var produceList: List[ProducerData[String, Message]] = Nil
var produceList: List[KeyedMessage[String, String]] = Nil
for(topic <- topics) {
val set = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
val set = List("a_" + topic, "b_" + topic)
messages += topic -> set
produceList ::= new ProducerData[String, Message](topic, topic, set)
produceList ++= set.map(new KeyedMessage[String, String](topic, topic, _))
builder.addFetch(topic, 0, 0, 10000)
}
producer.send(produceList: _*)
@ -137,20 +138,20 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
val response = consumer.fetch(request)
for(topic <- topics) {
val fetched = response.messageSet(topic, 0)
TestUtils.checkEquals(messages(topic).iterator, fetched.map(m => m.message).iterator)
assertEquals(messages(topic), fetched.map(m => Utils.readString(m.message.payload)))
}
}
def testMultiProduceResend() {
// send some messages
val topics = List("test1", "test2", "test3");
val messages = new mutable.HashMap[String, Seq[Message]]
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
var produceList: List[ProducerData[String, Message]] = Nil
var produceList: List[KeyedMessage[String, String]] = Nil
for(topic <- topics) {
val set = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
val set = List("a_" + topic, "b_" + topic)
messages += topic -> set
produceList ::= new ProducerData[String, Message](topic, topic, set)
produceList ++= set.map(new KeyedMessage[String, String](topic, topic, _))
builder.addFetch(topic, 0, 0, 10000)
}
producer.send(produceList: _*)
@ -161,9 +162,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
val response = consumer.fetch(request)
for(topic <- topics) {
val topicMessages = response.messageSet(topic, 0)
TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator,
messages(topic).iterator),
topicMessages.iterator.map(_.message))
assertEquals(messages(topic) ++ messages(topic), topicMessages.map(m => Utils.readString(m.message.payload)))
}
}
}

View File

@ -22,8 +22,9 @@ import junit.framework.Assert._
import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
import kafka.server.{KafkaRequestHandler, KafkaConfig}
import java.util.Properties
import kafka.producer.{ProducerData, Producer, ProducerConfig}
import kafka.serializer.StringDecoder
import kafka.utils.Utils
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer._
import kafka.message.Message
import kafka.utils.TestUtils
import org.apache.log4j.{Level, Logger}
@ -91,7 +92,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val config = new ProducerConfig(props)
val stringProducer1 = new Producer[String, String](config)
stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
stringProducer1.send(new KeyedMessage[String, String](topic, "test-message"))
val replica = servers.head.replicaManager.getReplica(topic, 0).get
assertTrue("HighWatermark should equal logEndOffset with just 1 replica",
@ -108,30 +109,26 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
assertTrue(messageSet.iterator.hasNext)
val fetchedMessageAndOffset = messageSet.head
val stringDecoder = new StringDecoder
val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message)
assertEquals("test-message", fetchedStringMessage)
assertEquals("test-message", Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
}
def testDefaultEncoderProducerAndFetchWithCompression() {
val topic = "test-topic"
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("serializer.class", classOf[StringEncoder].getName.toString)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("compression", "true")
val config = new ProducerConfig(props)
val stringProducer1 = new Producer[String, String](config)
stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
stringProducer1.send(new KeyedMessage[String, String](topic, "test-message"))
var fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val messageSet = fetched.messageSet(topic, 0)
assertTrue(messageSet.iterator.hasNext)
val fetchedMessageAndOffset = messageSet.head
val stringDecoder = new StringDecoder
val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message)
assertEquals("test-message", fetchedStringMessage)
assertEquals("test-message", Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
}
def testProduceAndMultiFetch() {
@ -140,22 +137,21 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
// send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, Seq[Message]]
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics) {
val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
val producerData = new ProducerData[String, Message](topic, topic, messageList)
val messageList = List("a_" + topic, "b_" + topic)
val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
messages += topic -> messageList
producer.send(producerData)
producer.send(producerData:_*)
builder.addFetch(topic, partition, 0, 10000)
}
}
// wait a bit for produced message to be available
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
for((topic, partition) <- topics) {
val fetched = response.messageSet(topic, partition)
TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator)
assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
}
}
@ -204,13 +200,13 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
// send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, Seq[Message]]
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
for( (topic, partition) <- topics) {
val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
val producerData = new ProducerData[String, Message](topic, topic, messageList)
val messageList = List("a_" + topic, "b_" + topic)
val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
messages += topic -> messageList
producer.send(producerData)
producer.send(producerData:_*)
builder.addFetch(topic, partition, 0, 10000)
}
@ -219,7 +215,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val fetched = response.messageSet(topic, partition)
TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator)
assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
}
}
@ -267,14 +263,14 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
// send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, Seq[Message]]
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
var produceList: List[ProducerData[String, Message]] = Nil
var produceList: List[KeyedMessage[String, String]] = Nil
for( (topic, partition) <- topics) {
val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
val producerData = new ProducerData[String, Message](topic, topic, messageList)
val messageList = List("a_" + topic, "b_" + topic)
val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
messages += topic -> messageList
producer.send(producerData)
producer.send(producerData:_*)
builder.addFetch(topic, partition, 0, 10000)
}
producer.send(produceList: _*)
@ -284,21 +280,21 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val fetched = response.messageSet(topic, partition)
TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator)
assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
}
}
def testMultiProduceWithCompression() {
// send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, Seq[Message]]
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
var produceList: List[ProducerData[String, Message]] = Nil
var produceList: List[KeyedMessage[String, String]] = Nil
for( (topic, partition) <- topics) {
val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
val producerData = new ProducerData[String, Message](topic, topic, messageList)
val messageList = List("a_" + topic, "b_" + topic)
val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
messages += topic -> messageList
producer.send(producerData)
producer.send(producerData:_*)
builder.addFetch(topic, partition, 0, 10000)
}
producer.send(produceList: _*)
@ -308,7 +304,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
val fetched = response.messageSet(topic, 0)
TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator)
assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
}
}
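These tests now configure the producer purely through serializer properties, send plain strings, and decode fetched payloads with Utils.readString. A hedged sketch of the produce-then-fetch round trip using only calls that appear in this patch; the producer and consumer are assumed to be set up as in the surrounding harness, and the object name is illustrative.

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer
import kafka.producer.{KeyedMessage, Producer}
import kafka.utils.Utils

object RoundTripSketch {
  // Send two strings keyed by the topic name, then fetch partition 0 and decode the payloads.
  // Real callers may need to retry the fetch until the messages become visible, as the tests do.
  def roundTrip(producer: Producer[String, String], consumer: SimpleConsumer, topic: String): List[String] = {
    val sent = List("a_" + topic, "b_" + topic)
    producer.send(sent.map(m => new KeyedMessage[String, String](topic, topic, m)): _*)
    val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
    response.messageSet(topic, 0).map(mo => Utils.readString(mo.message.payload)).toList
  }
}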

View File

@ -23,11 +23,12 @@ import java.util.Properties
import kafka.producer.{ProducerConfig, Producer}
import kafka.message.Message
import kafka.utils.TestUtils
import kafka.serializer._
trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
val port: Int
val host = "localhost"
var producer: Producer[String, Message] = null
var producer: Producer[String, String] = null
var consumer: SimpleConsumer = null
override def setUp() {
@ -41,6 +42,7 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes
props.put("producer.retry.backoff.ms", "1000")
props.put("producer.num.retries", "3")
props.put("producer.request.required.acks", "-1")
props.put("serializer.class", classOf[StringEncoder].getName.toString)
producer = new Producer(new ProducerConfig(props))
consumer = new SimpleConsumer(host,
port,

View File

@ -24,7 +24,10 @@ import org.scalatest.junit.JUnit3Suite
import scala.collection.JavaConversions._
import org.apache.log4j.{Level, Logger}
import kafka.message._
import kafka.javaapi.producer.{ProducerData, Producer}
import kafka.serializer._
import kafka.producer.KeyedMessage
import kafka.javaapi.producer.Producer
import kafka.utils.IntEncoder
import kafka.utils.TestUtils._
import kafka.utils.{Utils, Logging, TestUtils}
import kafka.consumer.{KafkaStream, ConsumerConfig}
@ -60,43 +63,46 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
assertEquals(sentMessages1, receivedMessages1)
assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
zkConsumerConnector1.shutdown
info("all consumer connectors stopped")
requestHandlerLogger.setLevel(Level.ERROR)
}
def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message]= {
var messages: List[Message] = Nil
val producer: kafka.producer.Producer[Int, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs))
val javaProducer: Producer[Int, Message] = new kafka.javaapi.producer.Producer(producer)
def sendMessages(conf: KafkaConfig,
messagesPerNode: Int,
header: String,
compressed: CompressionCodec): List[String] = {
var messages: List[String] = Nil
val producer: kafka.producer.Producer[Int, String] =
TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new StringEncoder(), new IntEncoder())
val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)
for (partition <- 0 until numParts) {
val ms = 0.until(messagesPerNode).map(x =>
new Message((header + conf.brokerId + "-" + partition + "-" + x).getBytes)).toArray
for (message <- ms)
messages ::= message
val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)
messages ++= ms
import scala.collection.JavaConversions._
javaProducer.send(new ProducerData[Int, Message](topic, partition, asList(ms)))
javaProducer.send(asList(ms.map(new KeyedMessage[Int, String](topic, partition, _))))
}
javaProducer.close
messages
}
def sendMessages(messagesPerNode: Int, header: String, compressed: CompressionCodec = NoCompressionCodec): List[Message]= {
var messages: List[Message] = Nil
for(conf <- configs) {
def sendMessages(messagesPerNode: Int,
header: String,
compressed: CompressionCodec = NoCompressionCodec): List[String] = {
var messages: List[String] = Nil
for(conf <- configs)
messages ++= sendMessages(conf, messagesPerNode, header, compressed)
}
messages.sortWith((s,t) => s.checksum < t.checksum)
messages
}
def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[Message]]])
: List[Message]= {
var messages: List[Message] = Nil
def getMessages(nMessagesPerThread: Int,
jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = {
var messages: List[String] = Nil
val topicMessageStreams = asMap(jTopicMessageStreams)
for ((topic, messageStreams) <- topicMessageStreams) {
for (messageStream <- messageStreams) {
@ -105,11 +111,11 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
assertTrue(iterator.hasNext)
val message = iterator.next.message
messages ::= message
debug("received message: " + Utils.readString(message.payload, "UTF-8"))
debug("received message: " + message)
}
}
}
messages.sortWith((s,t) => s.checksum < t.checksum)
messages
}
private def toJavaMap(scalaMap: Map[String, Int]): java.util.Map[String, java.lang.Integer] = {

View File

@ -169,10 +169,9 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
}
}
class AppenderStringEncoder extends Encoder[LoggingEvent] {
def toMessage(event: LoggingEvent):Message = {
val logMessage = event.getMessage
new Message(logMessage.asInstanceOf[String].getBytes)
class AppenderStringEncoder(encoding: String = "UTF-8") extends Encoder[LoggingEvent] {
def toBytes(event: LoggingEvent): Array[Byte] = {
event.getMessage.toString.getBytes(encoding)
}
}

View File

@ -25,14 +25,14 @@ import org.junit.Test
import kafka.api._
import kafka.cluster.Broker
import kafka.common._
import kafka.message.Message
import kafka.message._
import kafka.producer.async._
import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
import kafka.serializer._
import kafka.server.KafkaConfig
import kafka.utils.TestUtils._
import org.scalatest.junit.JUnit3Suite
import scala.collection.Map
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ArrayBuffer
import kafka.utils._
class AsyncProducerTest extends JUnit3Suite {
@ -52,7 +52,7 @@ class AsyncProducerTest extends JUnit3Suite {
// a mock event handler that blocks
val mockEventHandler = new EventHandler[String,String] {
def handle(events: Seq[ProducerData[String,String]]) {
def handle(events: Seq[KeyedMessage[String,String]]) {
Thread.sleep(500)
}
@ -116,7 +116,7 @@ class AsyncProducerTest extends JUnit3Suite {
EasyMock.expectLastCall
EasyMock.replay(mockHandler)
val queue = new LinkedBlockingQueue[ProducerData[String,String]](10)
val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10)
val producerSendThread =
new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5)
producerSendThread.start()
@ -141,7 +141,7 @@ class AsyncProducerTest extends JUnit3Suite {
EasyMock.replay(mockHandler)
val queueExpirationTime = 200
val queue = new LinkedBlockingQueue[ProducerData[String,String]](10)
val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10)
val producerSendThread =
new ProducerSendThread[String,String]("thread1", queue, mockHandler, queueExpirationTime, 5)
producerSendThread.start()
@ -156,12 +156,12 @@ class AsyncProducerTest extends JUnit3Suite {
@Test
def testPartitionAndCollateEvents() {
val producerDataList = new ListBuffer[ProducerData[Int,Message]]
producerDataList.append(new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes)))
producerDataList.append(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes)))
producerDataList.append(new ProducerData[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
producerDataList.append(new ProducerData[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
producerDataList.append(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
val producerDataList = new ArrayBuffer[KeyedMessage[Int,Message]]
producerDataList.append(new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)))
producerDataList.append(new KeyedMessage[Int,Message]("topic2", 1, new Message("msg2".getBytes)))
producerDataList.append(new KeyedMessage[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
producerDataList.append(new KeyedMessage[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
producerDataList.append(new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
val props = new Properties()
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
@ -185,20 +185,18 @@ class AsyncProducerTest extends JUnit3Suite {
val producerPool = new ProducerPool(config)
val handler = new DefaultEventHandler[Int,String](config,
partitioner = intPartitioner,
encoder = null.asInstanceOf[Encoder[String]],
producerPool = producerPool,
topicPartitionInfos)
partitioner = intPartitioner,
encoder = null.asInstanceOf[Encoder[String]],
keyEncoder = new IntEncoder(),
producerPool = producerPool,
topicPartitionInfos)
val topic1Broker1Data = new ListBuffer[ProducerData[Int,Message]]
topic1Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
new ProducerData[Int,Message]("topic1", 2, new Message("msg3".getBytes))))
val topic1Broker2Data = new ListBuffer[ProducerData[Int,Message]]
topic1Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic1", 3, new Message("msg4".getBytes))))
val topic2Broker1Data = new ListBuffer[ProducerData[Int,Message]]
topic2Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes))))
val topic2Broker2Data = new ListBuffer[ProducerData[Int,Message]]
topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes))))
val topic1Broker1Data =
ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
new KeyedMessage[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
val topic1Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
val topic2Broker1Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
val topic2Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", 1, new Message("msg2".getBytes)))
val expectedResult = Some(Map(
0 -> Map(
TopicAndPartition("topic1", 0) -> topic1Broker1Data,
@ -214,7 +212,7 @@ class AsyncProducerTest extends JUnit3Suite {
@Test
def testSerializeEvents() {
val produceData = TestUtils.getMsgStrings(5).map(m => new ProducerData[String,String]("topic1",m))
val produceData = TestUtils.getMsgStrings(5).map(m => new KeyedMessage[String,String]("topic1",m))
val props = new Properties()
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val config = new ProducerConfig(props)
@ -228,20 +226,20 @@ class AsyncProducerTest extends JUnit3Suite {
val handler = new DefaultEventHandler[String,String](config,
partitioner = null.asInstanceOf[Partitioner[String]],
encoder = new StringEncoder,
keyEncoder = new StringEncoder,
producerPool = producerPool,
topicPartitionInfos
)
val serializedData = handler.serialize(produceData)
val decoder = new StringDecoder
val deserializedData = serializedData.map(d => new ProducerData[String,String](d.getTopic, d.getData.map(m => decoder.toEvent(m))))
val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload)))
TestUtils.checkEquals(produceData.iterator, deserializedData.iterator)
}
@Test
def testInvalidPartition() {
val producerDataList = new ListBuffer[ProducerData[String,Message]]
producerDataList.append(new ProducerData[String,Message]("topic1", "key1", new Message("msg1".getBytes)))
val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
producerDataList.append(new KeyedMessage[String,Message]("topic1", "key1", new Message("msg1".getBytes)))
val props = new Properties()
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val config = new ProducerConfig(props)
@ -257,6 +255,7 @@ class AsyncProducerTest extends JUnit3Suite {
val handler = new DefaultEventHandler[String,String](config,
partitioner = new NegativePartitioner,
encoder = null.asInstanceOf[Encoder[String]],
keyEncoder = null.asInstanceOf[Encoder[String]],
producerPool = producerPool,
topicPartitionInfos)
try {
@ -282,11 +281,12 @@ class AsyncProducerTest extends JUnit3Suite {
val producerPool = new ProducerPool(config)
val producerDataList = new ListBuffer[ProducerData[String,String]]
producerDataList.append(new ProducerData[String,String]("topic1", "msg1"))
val producerDataList = new ArrayBuffer[KeyedMessage[String,String]]
producerDataList.append(new KeyedMessage[String,String]("topic1", "msg1"))
val handler = new DefaultEventHandler[String,String](config,
partitioner = null.asInstanceOf[Partitioner[String]],
encoder = new StringEncoder,
keyEncoder = new StringEncoder,
producerPool = producerPool,
topicPartitionInfos)
try {
@ -333,12 +333,13 @@ class AsyncProducerTest extends JUnit3Suite {
val handler = new DefaultEventHandler[String,String](config,
partitioner = null.asInstanceOf[Partitioner[String]],
encoder = null.asInstanceOf[Encoder[String]],
keyEncoder = null.asInstanceOf[Encoder[String]],
producerPool = producerPool,
topicPartitionInfos)
val producerDataList = new ListBuffer[ProducerData[String,Message]]
producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg1".getBytes)))
producerDataList.append(new ProducerData[String,Message]("topic2", new Message("msg2".getBytes)))
producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg3".getBytes)))
val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg1".getBytes)))
producerDataList.append(new KeyedMessage[String,Message]("topic2", new Message("msg2".getBytes)))
producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg3".getBytes)))
val partitionedDataOpt = handler.partitionAndCollate(producerDataList)
partitionedDataOpt match {
@ -375,13 +376,14 @@ class AsyncProducerTest extends JUnit3Suite {
val handler = new DefaultEventHandler[String,String]( config,
partitioner = null.asInstanceOf[Partitioner[String]],
encoder = new StringEncoder,
keyEncoder = new StringEncoder,
producerPool = producerPool,
topicPartitionInfos)
val producer = new Producer[String, String](config, handler)
try {
// send all 10 messages, should create 2 batches and 2 syncproducer calls
producer.send(msgs.map(m => new ProducerData[String,String](topic, List(m))): _*)
producer.send(msgs.map(m => new KeyedMessage[String,String](topic, m)): _*)
producer.close
} catch {
@ -392,7 +394,8 @@ class AsyncProducerTest extends JUnit3Suite {
@Test
def testFailedSendRetryLogic() {
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("serializer.class", classOf[StringEncoder].getName.toString)
props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val config = new ProducerConfig(props)
@ -407,11 +410,11 @@ class AsyncProducerTest extends JUnit3Suite {
// produce request for topic1 and partitions 0 and 1. Let the first request fail
// entirely. The second request will succeed for partition 1 but fail for partition 0.
// On the third try for partition 0, let it succeed.
val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), TestUtils.messagesToSet(msgs), 0)
val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0)
val response1 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)),
(TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
val request2 = TestUtils.produceRequest(topic1, 0, TestUtils.messagesToSet(msgs))
val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs))
val response2 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
@ -429,11 +432,11 @@ class AsyncProducerTest extends JUnit3Suite {
val handler = new DefaultEventHandler[Int,String](config,
partitioner = new FixedValuePartitioner(),
encoder = new StringEncoder,
encoder = new StringEncoder(),
keyEncoder = new NullEncoder[Int](),
producerPool = producerPool,
topicPartitionInfos)
val data = List(new ProducerData[Int,String](topic1, 0, msgs),
new ProducerData[Int,String](topic1, 1, msgs))
val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m))
handler.handle(data)
handler.close()
@ -445,12 +448,8 @@ class AsyncProducerTest extends JUnit3Suite {
def testJavaProducer() {
val topic = "topic1"
val msgs = TestUtils.getMsgStrings(5)
val scalaProducerData = msgs.map(m => new ProducerData[String, String](topic, List(m)))
val javaProducerData = scala.collection.JavaConversions.asList(msgs.map(m => {
val javaList = new LinkedList[String]()
javaList.add(m)
new kafka.javaapi.producer.ProducerData[String, String](topic, javaList)
}))
val scalaProducerData = msgs.map(m => new KeyedMessage[String, String](topic, m))
val javaProducerData = scala.collection.JavaConversions.asList(scalaProducerData)
val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]])
mockScalaProducer.send(scalaProducerData.head)
@ -480,10 +479,10 @@ class AsyncProducerTest extends JUnit3Suite {
}
}
def getProduceData(nEvents: Int): Seq[ProducerData[String,String]] = {
val producerDataList = new ListBuffer[ProducerData[String,String]]
def getProduceData(nEvents: Int): Seq[KeyedMessage[String,String]] = {
val producerDataList = new ArrayBuffer[KeyedMessage[String,String]]
for (i <- 0 until nEvents)
producerDataList.append(new ProducerData[String,String]("topic1", null, List("msg" + i)))
producerDataList.append(new KeyedMessage[String,String]("topic1", null, "msg" + i))
producerDataList
}
@ -495,4 +494,16 @@ class AsyncProducerTest extends JUnit3Suite {
val broker1 = new Broker(brokerId, brokerHost, brokerHost, brokerPort)
new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
}
def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => new Message(m.getBytes)): _*)
}
def messagesToSet(key: Array[Byte], messages: Seq[Array[Byte]]): ByteBufferMessageSet = {
new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => new Message(key = key, bytes = m)): _*)
}
}
class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
def partition(data: String, numPartitions: Int): Int = -1
}
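
For reference, a minimal Scala sketch of the two KeyedMessage forms these tests exercise: a value-only message (no key, so the partitioner effectively sees null) and a keyed message whose key is encoded via key.serializer.class and carried with the payload (as the ProducerTest assertions further down check). Topic and payload strings here are illustrative only.

    import kafka.producer.KeyedMessage

    object KeyedMessageForms {
      // value-only: equivalent to passing a null key
      val unkeyed = new KeyedMessage[String, String]("topic1", "msg1")
      // keyed: the key is encoded and stored alongside the payload
      val keyed = new KeyedMessage[String, String]("topic1", "key1", "msg1")
    }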

View File

@ -97,7 +97,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
val producerConfig1 = new ProducerConfig(props1)
val producer1 = new Producer[String, String](producerConfig1)
try{
producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
producer1.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
fail("Test should fail because the broker list provided are not valid")
} catch {
case e: KafkaException =>
@ -112,7 +112,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
val producerConfig2= new ProducerConfig(props2)
val producer2 = new Producer[String, String](producerConfig2)
try{
producer2.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
producer2.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
} catch {
case e => fail("Should succeed sending the message", e)
} finally {
@ -125,7 +125,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
val producerConfig3 = new ProducerConfig(props3)
val producer3 = new Producer[String, String](producerConfig3)
try{
producer3.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
producer3.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
} catch {
case e => fail("Should succeed sending the message", e)
} finally {
@ -159,8 +159,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
val producer1 = new Producer[String, String](producerConfig1)
val producer2 = new Producer[String, String](producerConfig2)
// Available partition ids should be 0.
producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test2")))
producer1.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
producer1.send(new KeyedMessage[String, String]("new-topic", "test", "test2"))
// get the leader
val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
@ -174,12 +174,12 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
response2.messageSet("new-topic", 0).iterator.toBuffer
}
assertEquals("Should have fetched 2 messages", 2, messageSet.size)
assertEquals(new Message("test1".getBytes), messageSet(0).message)
assertEquals(new Message("test2".getBytes), messageSet(1).message)
assertEquals(new Message(bytes = "test1".getBytes, key = "test".getBytes), messageSet(0).message)
assertEquals(new Message(bytes = "test2".getBytes, key = "test".getBytes), messageSet(1).message)
producer1.close()
try {
producer2.send(new ProducerData[String, String]("new-topic", "test", Array("test2")))
producer2.send(new KeyedMessage[String, String]("new-topic", "test", "test2"))
fail("Should have timed out for 3 acks.")
}
catch {
@ -215,7 +215,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
try {
// Available partition ids should be 0, 1, 2 and 3, all led by and hosted only
// on broker 0
producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
} catch {
case e => fail("Unexpected exception: " + e)
}
@ -226,7 +226,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
try {
// These sends should fail since there are no available brokers
producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
fail("Should fail since no leader exists for the partition.")
} catch {
case e => // success
@ -241,7 +241,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
val messageSet1 = response1.messageSet("new-topic", 0).iterator
assertTrue("Message set should have 1 message", messageSet1.hasNext)
assertEquals(new Message("test1".getBytes), messageSet1.next.message)
assertEquals(new Message(bytes = "test1".getBytes, key = "test".getBytes), messageSet1.next.message)
assertFalse("Message set should have another message", messageSet1.hasNext)
} catch {
case e: Exception => fail("Not expected", e)
@ -270,7 +270,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
// do a simple test to make sure plumbing is okay
try {
// this message should be assigned to partition 0 whose leader is on broker 0
producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
producer.send(new KeyedMessage[String, String]("new-topic", "test", "test"))
// cross check if brokers got the messages
val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
val messageSet1 = response1.messageSet("new-topic", 0).iterator
@ -288,7 +288,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
try {
// this message should be assigned to partition 0 whose leader is on broker 0, but
// broker 0 will not respond within timeoutMs millis.
producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
producer.send(new KeyedMessage[String, String]("new-topic", "test", "test"))
} catch {
case e: FailedToSendMessageException => /* success */
case e: Exception => fail("Not expected", e)
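
The rewritten assertions above make the key round trip explicit: a KeyedMessage produced with key "test" is expected to come back from the log as a Message carrying both the payload bytes and the key bytes. A minimal sketch of that expectation, using the same named-argument Message constructor as the test; broker and fetch plumbing are omitted.

    import kafka.message.Message
    import kafka.producer.KeyedMessage

    object KeyRoundTrip {
      // what the producer is handed
      val sent = new KeyedMessage[String, String]("new-topic", "test", "test1")
      // what the test expects to read back: key bytes stored with the payload
      val expected = new Message(bytes = "test1".getBytes, key = "test".getBytes)
    }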

View File

@ -20,10 +20,12 @@ import org.scalatest.junit.JUnit3Suite
import org.junit.Assert._
import kafka.admin.CreateTopicCommand
import kafka.utils.TestUtils._
import kafka.utils.IntEncoder
import kafka.utils.{Utils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import kafka.serializer._
import kafka.message.Message
import kafka.producer.{ProducerConfig, ProducerData, Producer}
import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
@ -42,23 +44,25 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
val configProps1 = configs.head
val configProps2 = configs.last
val message = new Message("hello".getBytes())
val message = "hello"
var producer: Producer[Int, Message] = null
var producer: Producer[Int, String] = null
var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDirs(0))
var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0))
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
producerProps.put("producer.request.required.acks", "-1")
def testHWCheckpointNoFailuresSingleLogSegment {
// start both servers
server1 = TestUtils.createServer(configProps1)
server2 = TestUtils.createServer(configProps2)
servers ++= List(server1, server2)
val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
producer = new Producer[Int, Message](new ProducerConfig(producerProps))
producer = new Producer[Int, String](new ProducerConfig(producerProps))
// create topic with 1 partition, 2 replicas, one on each broker
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
@ -92,10 +96,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
server2 = TestUtils.createServer(configProps2)
servers ++= List(server1, server2)
val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
producer = new Producer[Int, Message](new ProducerConfig(producerProps))
producer = new Producer[Int, String](new ProducerConfig(producerProps))
// create topic with 1 partition, 2 replicas, one on each broker
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
@ -152,14 +153,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
}
def testHWCheckpointNoFailuresMultipleLogSegments {
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
override val replicaMaxLagTimeMs = 5000L
override val replicaMaxLagBytes = 10L
override val flushInterval = 10
override val replicaMinBytes = 20
override val logFileSize = 30
})
// start both servers
server1 = TestUtils.createServer(configs.head)
server2 = TestUtils.createServer(configs.last)
@ -168,10 +161,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0))
hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0))
val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
producer = new Producer[Int, Message](new ProducerConfig(producerProps))
producer = new Producer[Int, String](new ProducerConfig(producerProps))
// create topic with 1 partition, 2 replicas, one on each broker
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
@ -197,14 +187,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
}
def testHWCheckpointWithFailuresMultipleLogSegments {
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
override val replicaMaxLagTimeMs = 5000L
override val replicaMaxLagBytes = 10L
override val flushInterval = 1000
override val replicaMinBytes = 20
override val logFileSize = 30
})
// start both servers
server1 = TestUtils.createServer(configs.head)
server2 = TestUtils.createServer(configs.last)
@ -213,10 +195,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0))
hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0))
val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
producer = new Producer[Int, Message](new ProducerConfig(producerProps))
producer = new Producer[Int, String](new ProducerConfig(producerProps))
// create topic with 1 partition, 2 replicas, one on each broker
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
@ -268,6 +247,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
private def sendMessages(n: Int = 1) {
for(i <- 0 until n)
producer.send(new ProducerData[Int, Message](topic, 0, message))
producer.send(new KeyedMessage[Int, String](topic, 0, message))
}
}
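
These recovery tests now share one producer setup: string values through the StringEncoder that getProducerConfig registers (see the TestUtils change later in this diff) and Int keys through the new IntEncoder. A minimal sketch of that wiring; the broker list, topic, and payload below are placeholders, while the property names match those used in the tests.

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    object RecoveryProducerSketch {
      val props = new Properties()
      props.put("broker.list", "localhost:9092")                       // placeholder broker list
      props.put("serializer.class", "kafka.serializer.StringEncoder")  // values are plain strings now
      props.put("key.serializer.class", "kafka.utils.IntEncoder")      // Int keys, per the new TestUtils encoder
      props.put("producer.request.required.acks", "-1")                // wait for all in-sync replicas

      val producer = new Producer[Int, String](new ProducerConfig(props))

      // mirror of sendMessages(n): every message carries the Int key 0
      def send(n: Int): Unit =
        for (_ <- 0 until n)
          producer.send(new KeyedMessage[Int, String]("test-topic", 0, "hello"))
    }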

View File

@ -20,7 +20,7 @@ package kafka.server
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
import kafka.producer.ProducerData
import kafka.producer.KeyedMessage
import kafka.serializer.StringEncoder
import kafka.admin.CreateTopicCommand
import kafka.utils.TestUtils
@ -55,9 +55,11 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
}
// send test messages to leader
val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs), new StringEncoder)
producer.send(new ProducerData[String, String](topic1, testMessageList1),
new ProducerData[String, String](topic2, testMessageList2))
val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
new StringEncoder(),
new StringEncoder())
val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))
producer.send(messages:_*)
producer.close()
def logsMatch(): Boolean = {

View File

@ -24,6 +24,7 @@ import kafka.message.{Message, ByteBufferMessageSet}
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.producer._
import kafka.utils.IntEncoder
import kafka.utils.TestUtils._
import kafka.admin.CreateTopicCommand
import kafka.api.FetchRequestBuilder
@ -36,19 +37,21 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
val host = "localhost"
val topic = "test"
val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes()))
val sent1 = List("hello", "there")
val sent2 = List("more", "messages")
@Test
def testCleanShutdown() {
var server = new KafkaServer(config)
server.startup()
var producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)
producerConfig.put("key.serializer.class", classOf[IntEncoder].getName.toString)
var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
// create topic
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
// send some messages
producer.send(new ProducerData[Int, Message](topic, 0, sent1))
producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
// do a clean shutdown and check that the clean shutdown file is written out
server.shutdown()
@ -62,7 +65,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
server = new KafkaServer(config)
server.startup()
producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
producer = new Producer[Int, String](new ProducerConfig(producerConfig))
val consumer = new SimpleConsumer(host,
port,
1000000,
@ -75,18 +78,18 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build())
fetchedMessage = fetched.messageSet(topic, 0)
}
TestUtils.checkEquals(sent1.iterator, fetchedMessage.map(m => m.message).iterator)
assertEquals(sent1, fetchedMessage.map(m => Utils.readString(m.message.payload)))
val newOffset = fetchedMessage.last.nextOffset
// send some more messages
producer.send(new ProducerData[Int, Message](topic, 0, sent2))
producer.send(sent2.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
fetchedMessage = null
while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
fetchedMessage = fetched.messageSet(topic, 0)
}
TestUtils.checkEquals(sent2.iterator, fetchedMessage.map(m => m.message).iterator)
assertEquals(sent2, fetchedMessage.map(m => Utils.readString(m.message.payload)))
consumer.close()
producer.close()
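
With the producer and the test data now string-typed, the assertions compare decoded payloads rather than Message objects. A minimal sketch of that decoding step, mirroring the Utils.readString(m.message.payload) calls above (assumed here to decode the payload buffer back into the string that was produced):

    import kafka.message.ByteBufferMessageSet
    import kafka.utils.Utils

    object PayloadDecoding {
      // turn a fetched message set back into the strings that were produced
      def decode(fetched: ByteBufferMessageSet): Seq[String] =
        fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)).toSeq
    }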

View File

@ -288,13 +288,16 @@ object TestUtils extends Logging {
/**
* Create a producer for the given host and port
*/
def createProducer[K, V](brokerList: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = {
def createProducer[K, V](brokerList: String,
encoder: Encoder[V] = new DefaultEncoder(),
keyEncoder: Encoder[K] = new DefaultEncoder()): Producer[K, V] = {
val props = new Properties()
props.put("broker.list", brokerList)
props.put("buffer.size", "65536")
props.put("connect.timeout.ms", "100000")
props.put("reconnect.interval", "10000")
props.put("serializer.class", encoder.getClass.getCanonicalName)
props.put("key.serializer.class", keyEncoder.getClass.getCanonicalName)
new Producer[K, V](new ProducerConfig(props))
}
@ -307,6 +310,8 @@ object TestUtils extends Logging {
props.put("buffer.size", bufferSize.toString)
props.put("connect.timeout.ms", connectTimeout.toString)
props.put("reconnect.interval", reconnectInterval.toString)
props.put("producer.request.timeout.ms", 30000.toString)
props.put("serializer.class", classOf[StringEncoder].getName.toString)
props
}
@ -353,11 +358,6 @@ object TestUtils extends Logging {
produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message)
}
def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
val encoder = new StringEncoder
new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)
}
def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
produceRequestWithAcks(List(topic), List(partition), message, SyncProducerConfig.DefaultRequiredAcks)
}
@ -490,30 +490,22 @@ object TestZKUtils {
val zookeeperConnect = "127.0.0.1:2182"
}
class StringSerializer extends Encoder[String] {
def toEvent(message: Message):String = message.toString
def toMessage(event: String):Message = new Message(event.getBytes)
def getTopic(event: String): String = event.concat("-topic")
class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
override def toBytes(n: Int) = n.toString.getBytes
}
class NegativePartitioner extends Partitioner[String] {
def partition(data: String, numPartitions: Int): Int = {
-1
}
}
class StaticPartitioner extends Partitioner[String] {
class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
def partition(data: String, numPartitions: Int): Int = {
(data.length % numPartitions)
}
}
class HashPartitioner extends Partitioner[String] {
class HashPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
def partition(data: String, numPartitions: Int): Int = {
(data.hashCode % numPartitions)
}
}
class FixedValuePartitioner extends Partitioner[Int] {
class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner[Int] {
def partition(data: Int, numPartitions: Int): Int = data
}
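
The partitioner and encoder test helpers now all take a VerifiableProperties constructor argument (defaulted to null here), matching how serializer.class, key.serializer.class, and partitioner classes are configured by class name elsewhere in this patch. A sketch of that convention with two hypothetical implementations; the class names are illustrations, not part of the change.

    import kafka.producer.Partitioner
    import kafka.serializer.Encoder
    import kafka.utils.VerifiableProperties

    // hypothetical partitioner following the new (VerifiableProperties) constructor convention
    class ModuloPartitioner(props: VerifiableProperties = null) extends Partitioner[Int] {
      def partition(data: Int, numPartitions: Int): Int =
        ((data % numPartitions) + numPartitions) % numPartitions  // keep the result non-negative
    }

    // hypothetical value encoder, shaped like the IntEncoder added above
    class LongEncoder(props: VerifiableProperties = null) extends Encoder[Long] {
      override def toBytes(n: Long): Array[Byte] = n.toString.getBytes
    }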

View File

@ -17,6 +17,8 @@
package kafka.utils
import java.util.Arrays
import java.nio.ByteBuffer
import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@ -52,4 +54,12 @@ class UtilsTest extends JUnitSuite {
assertEquals(1, its.next())
}
@Test
def testReadBytes() {
for(testCase <- List("", "a", "abcd")) {
val bytes = testCase.getBytes
assertTrue(Arrays.equals(bytes, Utils.readBytes(ByteBuffer.wrap(bytes))))
}
}
}
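
The new test pins down a simple contract for Utils.readBytes: wrapping a byte array in a ByteBuffer and reading it back should yield equal bytes. A minimal usage sketch of that contract, exercising nothing beyond what the test itself does:

    import java.nio.ByteBuffer
    import java.util.Arrays
    import kafka.utils.Utils

    object ReadBytesContract {
      val original: Array[Byte] = "abcd".getBytes
      val copy: Array[Byte] = Utils.readBytes(ByteBuffer.wrap(original))
      assert(Arrays.equals(original, copy))  // readBytes returns the wrapped bytes unchanged
    }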

View File

@ -56,10 +56,10 @@ public class Consumer extends Thread
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<Message> stream = consumerMap.get(topic).get(0);
ConsumerIterator<Message> it = stream.iterator();
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while(it.hasNext())
System.out.println(ExampleUtils.getMessage(it.next().message()));
System.out.println(new String(it.next().message()));
}
}
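
For comparison, a minimal Scala sketch of the same loop against the retyped consumer API, where streams yield raw byte arrays instead of Message objects. The ZooKeeper address, group id, and the exact 0.8-era config key names below are assumptions for illustration, not taken from this patch.

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig, KafkaStream}

    object ByteStreamConsumerSketch {
      val props = new Properties()
      props.put("zk.connect", "localhost:2181")   // assumed config key, placeholder address
      props.put("groupid", "example-group")       // assumed config key, placeholder group

      val connector = Consumer.create(new ConsumerConfig(props))
      val streams = connector.createMessageStreams(Map("test-topic" -> 1))
      val stream: KafkaStream[Array[Byte], Array[Byte]] = streams("test-topic").head

      // each message arrives as a byte array; decode it the same way the Java example does
      for (msgAndMeta <- stream)
        println(new String(msgAndMeta.message))
    }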

View File

@ -18,7 +18,7 @@ package kafka.examples;
import java.util.Properties;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class Producer extends Thread
@ -42,7 +42,7 @@ public class Producer extends Thread
while(true)
{
String messageStr = new String("Message_" + messageNo);
producer.send(new ProducerData<Integer, String>(topic, messageStr));
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
messageNo++;
}
}

View File

@ -19,6 +19,9 @@ package kafka.examples;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import kafka.javaapi.consumer.SimpleConsumer;
@ -29,9 +32,12 @@ import java.util.Map;
public class SimpleConsumerDemo {
private static void printMessages(ByteBufferMessageSet messageSet) {
for (MessageAndOffset messageAndOffset : messageSet) {
System.out.println(ExampleUtils.getMessage(messageAndOffset.message()));
private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
for(MessageAndOffset messageAndOffset: messageSet) {
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(new String(bytes, "UTF-8"));
}
}
@ -47,7 +53,7 @@ public class SimpleConsumerDemo {
}
}
public static void main(String[] args) {
public static void main(String[] args) throws Exception {
generateData();
SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL,

View File

@ -52,7 +52,7 @@ object ConsumerPerformance {
val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig)
val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.topic -> config.numThreads))
val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
var threadList = List[ConsumerPerfThread]()
for ((topic, streamList) <- topicMessageStreams)
for (i <- 0 until streamList.length)
@ -140,7 +140,7 @@ object ConsumerPerformance {
val hideHeader = options.has(hideHeaderOpt)
}
class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Message],
class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]],
config:ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
extends Thread(name) {
private val shutdownLatch = new CountDownLatch(1)
@ -160,7 +160,7 @@ object ConsumerPerformance {
try {
for (messageAndMetadata <- stream if messagesRead < config.numMessages) {
messagesRead += 1
bytesRead += messageAndMetadata.message.payloadSize
bytesRead += messageAndMetadata.message.length
if (messagesRead % config.reportingInterval == 0) {
if(config.showDetailedStats)

View File

@ -23,6 +23,7 @@ import kafka.producer._
import org.apache.log4j.Logger
import kafka.message.{CompressionCodec, Message}
import java.text.SimpleDateFormat
import kafka.serializer._
import java.util._
import collection.immutable.List
import kafka.utils.{VerifiableProperties, Logging}
@ -201,9 +202,12 @@ object ProducerPerformance extends Logging {
props.put("producer.request.timeout.ms", config.producerRequestTimeoutMs.toString)
props.put("producer.num.retries", config.producerNumRetries.toString)
props.put("producer.retry.backoff.ms", config.producerRetryBackoffMs.toString)
props.put("serializer.class", classOf[DefaultEncoder].getName.toString)
props.put("key.serializer.class", classOf[NullEncoder[Long]].getName.toString)
val producerConfig = new ProducerConfig(props)
val producer = new Producer[Message, Message](producerConfig)
val producer = new Producer[Long, Array[Byte]](producerConfig)
val seqIdNumDigit = 10 // no. of digits for max int value
val messagesPerThread = config.numMessages / config.numThreads
@ -215,7 +219,7 @@ object ProducerPerformance extends Logging {
private val threadIdLabel = "ThreadID"
private val topicLabel = "Topic"
private var leftPaddedSeqId : String = ""
private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Message = {
private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
// Each thread gets a unique range of sequential no. for its ids.
// Eg. 1000 msg in 10 threads => 100 msg per thread
// thread 0 IDs : 0 ~ 99
@ -233,19 +237,18 @@ object ProducerPerformance extends Logging {
val seqMsgString = String.format("%1$-"+msgSize+"s", msgHeader).replace(' ', 'x')
debug(seqMsgString)
return new Message(seqMsgString.getBytes())
return seqMsgString.getBytes()
}
private def generateProducerData(topic: String, messageId: Long): (ProducerData[Message, Message], Int) = {
private def generateProducerData(topic: String, messageId: Long): (KeyedMessage[Long, Array[Byte]], Int) = {
val msgSize = if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
val message = if(config.seqIdMode) {
val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
generateMessageWithSeqId(topic, seqId, msgSize)
} else {
new Array[Byte](msgSize)
}
else {
new Message(new Array[Byte](msgSize))
}
(new ProducerData[Message, Message](topic, null, message), message.payloadSize)
(new KeyedMessage[Long, Array[Byte]](topic, messageId, message), message.length)
}
override def run {