Fix retry logic for producers; patched by Prashanth Menon; reviewed by Jun Rao, Neha Narkhede; KAFKA-49

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1343255 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Prashanth Menon 2012-05-28 13:45:51 +00:00
parent 23b422bc44
commit 904708fae8
15 changed files with 228 additions and 109 deletions

View File

@ -42,12 +42,6 @@ case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, init
def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages) def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages)
def translatePartition(topic: String, randomSelector: String => Int): Int = {
if (partition == ProducerRequest.RandomPartition)
return randomSelector(topic)
else
return partition
}
} }
object TopicData { object TopicData {

View File

@ -23,7 +23,6 @@ import kafka.network._
import kafka.utils._ import kafka.utils._
object ProducerRequest { object ProducerRequest {
val RandomPartition = -1
val CurrentVersion: Short = 0 val CurrentVersion: Short = 0
def readFrom(buffer: ByteBuffer): ProducerRequest = { def readFrom(buffer: ByteBuffer): ProducerRequest = {
@ -84,7 +83,7 @@ case class ProducerRequest( versionId: Short,
} }
} }
def sizeInBytes(): Int = { def sizeInBytes: Int = {
var size = 0 var size = 0
//size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4; size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4;
@ -112,5 +111,4 @@ case class ProducerRequest( versionId: Short,
def topicPartitionCount = data.foldLeft(0)(_ + _.partitionData.length) def topicPartitionCount = data.foldLeft(0)(_ + _.partitionData.length)
def expectResponse = requiredAcks > 0
} }

View File

@ -34,6 +34,8 @@ object ErrorMapping {
val WrongPartitionCode = 3 val WrongPartitionCode = 3
val InvalidFetchSizeCode = 4 val InvalidFetchSizeCode = 4
val InvalidFetchRequestFormatCode = 5 val InvalidFetchRequestFormatCode = 5
val NoLeaderForPartitionCode = 6
val NotLeaderForPartitionCode = 7
private val exceptionToCode = private val exceptionToCode =
Map[Class[Throwable], Int]( Map[Class[Throwable], Int](
@ -41,7 +43,9 @@ object ErrorMapping {
classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode, classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> WrongPartitionCode, classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> WrongPartitionCode,
classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode, classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode,
classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode
).withDefaultValue(UnknownCode) ).withDefaultValue(UnknownCode)
/* invert the mapping */ /* invert the mapping */

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
/**
* Exception raised when broker receives a produce message for partition it does not lead
* @param message - A more detailed and descriptive error message
*/
class NotLeaderForPartitionException(message: String) extends Exception(message) {
def this() = this(null)
}

View File

@ -125,9 +125,10 @@ private[kafka] class LogManager(val config: KafkaConfig,
if (topic.length <= 0) if (topic.length <= 0)
throw new InvalidTopicException("topic name can't be empty") throw new InvalidTopicException("topic name can't be empty")
if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) { if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
warn("Wrong partition " + partition + " valid partitions (0," + val error = "Wrong partition %d, valid partitions (0, %d)."
(topicPartitionsMap.getOrElse(topic, numPartitions) - 1) + ")") .format(partition, (topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
throw new InvalidPartitionException("wrong partition " + partition) warn(error)
throw new InvalidPartitionException(error)
} }
logs.get(topic) logs.get(topic)
} }

View File

@ -19,13 +19,13 @@ package kafka.producer.async
import kafka.api.{ProducerRequest, TopicData, PartitionData} import kafka.api.{ProducerRequest, TopicData, PartitionData}
import kafka.cluster.Partition import kafka.cluster.Partition
import kafka.common._
import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet} import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
import kafka.producer._ import kafka.producer._
import kafka.serializer.Encoder import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging}
import scala.collection.Map import scala.collection.Map
import scala.collection.mutable.{ListBuffer, HashMap} import scala.collection.mutable.{ListBuffer, HashMap}
import kafka.utils.{Utils, Logging}
import kafka.common.{FailedToSendMessageException, NoLeaderForPartitionException, InvalidPartitionException, NoBrokersForPartitionException}
class DefaultEventHandler[K,V](config: ProducerConfig, // this api is for testing class DefaultEventHandler[K,V](config: ProducerConfig, // this api is for testing
private val partitioner: Partitioner[K], // use the other constructor private val partitioner: Partitioner[K], // use the other constructor
@ -71,8 +71,13 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
.format(partitionAndEvent._1, brokerid, partitionAndEvent._2))) .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap) val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
if((brokerid < 0) || (!send(brokerid, messageSetPerBroker))) val failedTopicPartitions = send(brokerid, messageSetPerBroker)
failedProduceRequests.appendAll(eventsPerBrokerMap.map(r => r._2).flatten) for( (topic, partition) <- failedTopicPartitions ) {
eventsPerBrokerMap.get((topic, partition)) match {
case Some(data) => failedProduceRequests.appendAll(data)
case None => // nothing
}
}
} }
} catch { } catch {
case t: Throwable => error("Failed to send messages") case t: Throwable => error("Failed to send messages")
@ -156,31 +161,37 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
* *
* @param brokerId the broker that will receive the request * @param brokerId the broker that will receive the request
* @param messagesPerTopic the messages as a map from (topic, partition) -> messages * @param messagesPerTopic the messages as a map from (topic, partition) -> messages
* @return the set (topic, partitions) messages which incurred an error sending or processing
*/ */
private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Boolean = { private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Seq[(String, Int)] = {
try { if(brokerId < 0) {
if(brokerId < 0) messagesPerTopic.keys.toSeq
throw new NoLeaderForPartitionException("No leader for some partition(s) on broker %d".format(brokerId)) } else if(messagesPerTopic.size > 0) {
if(messagesPerTopic.size > 0) {
val topics = new HashMap[String, ListBuffer[PartitionData]]() val topics = new HashMap[String, ListBuffer[PartitionData]]()
for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) { for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) {
topics.get(topicName) match { val partitionData = topics.getOrElseUpdate(topicName, new ListBuffer[PartitionData]())
case Some(x) => trace("found " + topicName) partitionData.append(new PartitionData(partitionId, messagesSet))
case None => topics += topicName -> new ListBuffer[PartitionData]() //create a new listbuffer for this topic
} }
topics(topicName).append(new PartitionData(partitionId, messagesSet)) val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)).toArray
} val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeout, topicData)
val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)) try {
val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeout, topicData.toArray)
val syncProducer = producerPool.getProducer(brokerId) val syncProducer = producerPool.getProducer(brokerId)
val response = syncProducer.send(producerRequest) val response = syncProducer.send(producerRequest)
// TODO: possibly send response to response callback handler trace("producer sent messages for topics %s to broker %d on %s:%d"
trace("kafka producer sent messages for topics %s to broker %d on %s:%d"
.format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port)) .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
var msgIdx = -1
val errors = new ListBuffer[(String, Int)]
for( topic <- topicData; partition <- topic.partitionData ) {
msgIdx += 1
if(msgIdx > response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError)
errors.append((topic.topic, partition.partition))
} }
true errors
} catch { } catch {
case t: Throwable => false case e => messagesPerTopic.keys.toSeq
}
} else {
List.empty
} }
} }

View File

@ -22,13 +22,13 @@ import java.lang.IllegalStateException
import java.util.concurrent.atomic._ import java.util.concurrent.atomic._
import kafka.admin.{CreateTopicCommand, AdminUtils} import kafka.admin.{CreateTopicCommand, AdminUtils}
import kafka.api._ import kafka.api._
import kafka.common._
import kafka.log._ import kafka.log._
import kafka.message._ import kafka.message._
import kafka.network._ import kafka.network._
import kafka.utils.{SystemTime, Logging}
import org.apache.log4j.Logger import org.apache.log4j.Logger
import scala.collection._ import scala.collection._
import kafka.utils.{SystemTime, Logging}
import kafka.common._
import scala.math._ import scala.math._
/** /**
@ -92,18 +92,17 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
for(topicData <- request.data) { for(topicData <- request.data) {
for(partitionData <- topicData.partitionData) { for(partitionData <- topicData.partitionData) {
msgIndex += 1 msgIndex += 1
val partition = partitionData.translatePartition(topicData.topic, logManager.chooseRandomPartition)
try { try {
// TODO: need to handle ack's here! Will probably move to another method. // TODO: need to handle ack's here! Will probably move to another method.
kafkaZookeeper.ensurePartitionOnThisBroker(topicData.topic, partition) kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
val log = logManager.getOrCreateLog(topicData.topic, partition) val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
log.append(partitionData.messages) log.append(partitionData.messages)
offsets(msgIndex) = log.nextAppendOffset offsets(msgIndex) = log.nextAppendOffset
errors(msgIndex) = ErrorMapping.NoError.toShort errors(msgIndex) = ErrorMapping.NoError.toShort
trace(partitionData.messages.sizeInBytes + " bytes written to logs.") trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
} catch { } catch {
case e => case e =>
error("Error processing ProducerRequest on " + topicData.topic + ":" + partition, e) error("Error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e)
e match { e match {
case _: IOException => case _: IOException =>
fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)

View File

@ -17,14 +17,14 @@
package kafka.server package kafka.server
import java.lang.{Thread, IllegalStateException}
import java.net.InetAddress
import kafka.admin.AdminUtils
import kafka.cluster.Replica
import kafka.common.{NoLeaderForPartitionException, NotLeaderForPartitionException, KafkaZookeeperClient}
import kafka.utils._ import kafka.utils._
import org.apache.zookeeper.Watcher.Event.KeeperState import org.apache.zookeeper.Watcher.Event.KeeperState
import java.net.InetAddress
import kafka.common.{InvalidPartitionException, KafkaZookeeperClient}
import kafka.cluster.Replica
import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient} import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
import kafka.admin.AdminUtils
import java.lang.{Thread, IllegalStateException}
/** /**
* Handles the server's interaction with zookeeper. The server needs to register the following paths: * Handles the server's interaction with zookeeper. The server needs to register the following paths:
@ -108,10 +108,15 @@ class KafkaZooKeeper(config: KafkaConfig,
} }
} }
def ensurePartitionOnThisBroker(topic: String, partition: Int) { def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
if(!ZkUtils.isPartitionOnBroker(zkClient, topic, partition, config.brokerId)) ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
throw new InvalidPartitionException("Broker %d does not host partition %d for topic %s". case Some(leader) =>
format(config.brokerId, partition, topic)) if(leader != config.brokerId)
throw new NotLeaderForPartitionException("Broker %d is not leader for partition %d for topic %s"
.format(config.brokerId, partition, topic))
case None =>
throw new NoLeaderForPartitionException("There is no leader for topic %s partition %d".format(topic, partition))
}
} }
def getZookeeperClient = zkClient def getZookeeperClient = zkClient

View File

@ -23,7 +23,7 @@ import collection.mutable
class ReplicaManager(config: KafkaConfig) extends Logging { class ReplicaManager(config: KafkaConfig) extends Logging {
private var replicas: mutable.Map[(String, Int), Replica] = new mutable.HashMap[(String, Int), Replica]() private val replicas = new mutable.HashMap[(String, Int), Replica]()
def addLocalReplica(topic: String, partitionId: Int, log: Log): Replica = { def addLocalReplica(topic: String, partitionId: Int, log: Log): Replica = {
val replica = replicas.get((topic, partitionId)) val replica = replicas.get((topic, partitionId))
@ -37,7 +37,7 @@ class ReplicaManager(config: KafkaConfig) extends Logging {
case None => case None =>
val partition = new Partition(topic, partitionId) val partition = new Partition(topic, partitionId)
val replica = new Replica(config.brokerId, partition, topic, Some(log), log.getHighwaterMark, log.maxSize, true) val replica = new Replica(config.brokerId, partition, topic, Some(log), log.getHighwaterMark, log.maxSize, true)
replicas += (topic, partitionId) -> replica replicas.put((topic, partitionId), replica)
info("Added local replica for topic %s partition %s on broker %d" info("Added local replica for topic %s partition %s on broker %d"
.format(replica.topic, replica.partition.partId, replica.brokerId)) .format(replica.topic, replica.partition.partId, replica.brokerId))
} }
@ -51,7 +51,7 @@ class ReplicaManager(config: KafkaConfig) extends Logging {
case None => case None =>
val partition = new Partition(topic, partitionId) val partition = new Partition(topic, partitionId)
val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false) val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false)
replicas += (topic, partitionId) -> replica replicas.put((topic, partitionId), replica)
info("Added remote replica for topic %s partition %s on broker %d" info("Added remote replica for topic %s partition %s on broker %d"
.format(replica.topic, replica.partition.partId, replica.brokerId)) .format(replica.topic, replica.partition.partId, replica.brokerId))
} }

View File

@ -90,6 +90,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
zkConsumerConnector0.shutdown zkConsumerConnector0.shutdown
// wait to make sure the topic and partition have a leader for the successful case
waitUntilLeaderIsElected(zkClient, topic, 0, 500)
waitUntilLeaderIsElected(zkClient, topic, 1, 500)
// send some messages to each broker // send some messages to each broker
val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
@ -101,9 +105,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1)) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
waitUntilLeaderIsElected(zkClient, topic, 0, 500)
waitUntilLeaderIsElected(zkClient, topic, 1, 500)
val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
assertEquals(sentMessages1.size, receivedMessages1.size) assertEquals(sentMessages1.size, receivedMessages1.size)
assertEquals(sentMessages1, receivedMessages1) assertEquals(sentMessages1, receivedMessages1)

View File

@ -17,25 +17,28 @@
package kafka.integration package kafka.integration
import java.io.File
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.util.Properties
import junit.framework.Assert._ import junit.framework.Assert._
import kafka.admin.CreateTopicCommand
import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder} import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder}
import kafka.common.{FetchRequestFormatException, OffsetOutOfRangeException, InvalidPartitionException} import kafka.common.{FetchRequestFormatException, OffsetOutOfRangeException, InvalidPartitionException}
import kafka.server.{KafkaRequestHandler, KafkaConfig} import kafka.message.Message
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import java.util.Properties
import kafka.producer.{ProducerData, Producer, ProducerConfig} import kafka.producer.{ProducerData, Producer, ProducerConfig}
import kafka.serializer.StringDecoder import kafka.serializer.StringDecoder
import kafka.message.Message import kafka.server.{KafkaRequestHandler, KafkaConfig}
import java.io.File
import kafka.utils.{TestZKUtils, TestUtils} import kafka.utils.{TestZKUtils, TestUtils}
import org.apache.log4j.{Level, Logger}
import org.I0Itec.zkclient.ZkClient
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import scala.collection._ import scala.collection._
/** /**
* End to end tests of the primitive apis against a local server * End to end tests of the primitive apis against a local server
*/ */
class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness { class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
val port = TestUtils.choosePort val port = TestUtils.choosePort
val props = TestUtils.createBrokerConfig(0, port) val props = TestUtils.createBrokerConfig(0, port)
@ -142,6 +145,8 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness {
} }
def testProduceAndMultiFetch() { def testProduceAndMultiFetch() {
createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
// send some messages // send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{ {
@ -207,6 +212,8 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness {
} }
def testProduceAndMultiFetchWithCompression() { def testProduceAndMultiFetchWithCompression() {
createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
// send some messages // send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{ {
@ -272,6 +279,8 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness {
} }
def testMultiProduce() { def testMultiProduce() {
createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
// send some messages // send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, Seq[Message]] val messages = new mutable.HashMap[String, Seq[Message]]
@ -328,4 +337,15 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness {
val logFile = new File(config.logDir, newTopic + "-0") val logFile = new File(config.logDir, newTopic + "-0")
assertTrue(!logFile.exists) assertTrue(!logFile.exists)
} }
/**
* For testing purposes, just create these topics each with one partition and one replica for
* which the provided broker should the leader for. Create and wait for broker to lead. Simple.
*/
def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
for( topic <- topics ) {
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, brokerId.toString)
TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
}
}
} }

View File

@ -24,17 +24,17 @@ import org.easymock.EasyMock
import org.junit.Test import org.junit.Test
import kafka.api._ import kafka.api._
import kafka.cluster.Broker import kafka.cluster.Broker
import kafka.common._
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import kafka.producer.async._ import kafka.producer.async._
import kafka.serializer.{StringEncoder, StringDecoder, Encoder} import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.{FixedValuePartitioner, NegativePartitioner, TestZKUtils, TestUtils}
import kafka.utils.TestUtils._ import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness import kafka.zk.ZooKeeperTestHarness
import collection.Map
import collection.mutable.ListBuffer
import org.scalatest.junit.JUnit3Suite import org.scalatest.junit.JUnit3Suite
import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils} import scala.collection.Map
import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException, InvalidConfigException, QueueFullException} import scala.collection.mutable.ListBuffer
class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(1) val props = createBrokerConfigs(1)
@ -200,7 +200,7 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val handler = new DefaultEventHandler[Int,String](config, val handler = new DefaultEventHandler[Int,String](config,
partitioner = intPartitioner, partitioner = intPartitioner,
encoder = null.asInstanceOf[Encoder[String]], encoder = null.asInstanceOf[Encoder[String]],
producerPool) producerPool = producerPool)
val topic1Broker1Data = new ListBuffer[ProducerData[Int,Message]] val topic1Broker1Data = new ListBuffer[ProducerData[Int,Message]]
@ -234,14 +234,14 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
props.put("zk.connect", zkConnect) props.put("zk.connect", zkConnect)
val config = new ProducerConfig(props) val config = new ProducerConfig(props)
// form expected partitions metadata // form expected partitions metadata
val topic1Metadata = getTopicMetadata("topic1", 0, "localhost", 9092) val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata)) val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
val producerPool = getMockProducerPool(config, syncProducer) val producerPool = getMockProducerPool(config, syncProducer)
val handler = new DefaultEventHandler[String,String](config, val handler = new DefaultEventHandler[String,String](config,
partitioner = null.asInstanceOf[Partitioner[String]], partitioner = null.asInstanceOf[Partitioner[String]],
encoder = new StringEncoder, encoder = new StringEncoder,
producerPool) producerPool = producerPool)
val serializedData = handler.serialize(produceData) val serializedData = handler.serialize(produceData)
val decoder = new StringDecoder val decoder = new StringDecoder
@ -258,7 +258,7 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val config = new ProducerConfig(props) val config = new ProducerConfig(props)
// form expected partitions metadata // form expected partitions metadata
val topic1Metadata = getTopicMetadata("topic1", 0, "localhost", 9092) val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata)) val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
@ -267,7 +267,7 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val handler = new DefaultEventHandler[String,String](config, val handler = new DefaultEventHandler[String,String](config,
partitioner = new NegativePartitioner, partitioner = new NegativePartitioner,
encoder = null.asInstanceOf[Encoder[String]], encoder = null.asInstanceOf[Encoder[String]],
producerPool) producerPool = producerPool)
try { try {
handler.partitionAndCollate(producerDataList) handler.partitionAndCollate(producerDataList)
fail("Should fail with InvalidPartitionException") fail("Should fail with InvalidPartitionException")
@ -297,7 +297,7 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val handler = new DefaultEventHandler[String,String](config, val handler = new DefaultEventHandler[String,String](config,
partitioner = null.asInstanceOf[Partitioner[String]], partitioner = null.asInstanceOf[Partitioner[String]],
encoder = new StringEncoder, encoder = new StringEncoder,
producerPool) producerPool = producerPool)
try { try {
handler.handle(producerDataList) handler.handle(producerDataList)
fail("Should fail with NoBrokersForPartitionException") fail("Should fail with NoBrokersForPartitionException")
@ -333,8 +333,8 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val config = new ProducerConfig(props) val config = new ProducerConfig(props)
// create topic metadata with 0 partitions // create topic metadata with 0 partitions
val topic1Metadata = getTopicMetadata("topic1", 0, "localhost", 9092) val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
val topic2Metadata = getTopicMetadata("topic2", 0, "localhost", 9092) val topic2Metadata = getTopicMetadata("topic2", 0, 0, "localhost", 9092)
val syncProducer = getSyncProducer(List("topic1", "topic2"), List(topic1Metadata, topic2Metadata)) val syncProducer = getSyncProducer(List("topic1", "topic2"), List(topic1Metadata, topic2Metadata))
@ -350,7 +350,7 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val handler = new DefaultEventHandler[String,String](config, val handler = new DefaultEventHandler[String,String](config,
partitioner = null.asInstanceOf[Partitioner[String]], partitioner = null.asInstanceOf[Partitioner[String]],
encoder = null.asInstanceOf[Encoder[String]], encoder = null.asInstanceOf[Encoder[String]],
producerPool) producerPool = producerPool)
val producerDataList = new ListBuffer[ProducerData[String,Message]] val producerDataList = new ListBuffer[ProducerData[String,Message]]
producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg1".getBytes))) producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg1".getBytes)))
producerDataList.append(new ProducerData[String,Message]("topic2", new Message("msg2".getBytes))) producerDataList.append(new ProducerData[String,Message]("topic2", new Message("msg2".getBytes)))
@ -375,16 +375,16 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val config = new ProducerConfig(props) val config = new ProducerConfig(props)
val topic = "topic1" val topic = "topic1"
val topic1Metadata = getTopicMetadata(topic, 0, "localhost", 9092) val topic1Metadata = getTopicMetadata(topic, 0, 0, "localhost", 9092)
val msgs = TestUtils.getMsgStrings(10) val msgs = TestUtils.getMsgStrings(10)
val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
mockSyncProducer.send(new TopicMetadataRequest(List(topic))) mockSyncProducer.send(new TopicMetadataRequest(List(topic)))
EasyMock.expectLastCall().andReturn(List(topic1Metadata)) EasyMock.expectLastCall().andReturn(List(topic1Metadata))
mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.take(5)))) mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.take(5))))
EasyMock.expectLastCall().andReturn(null) EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L)))
mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.takeRight(5)))) mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.takeRight(5))))
EasyMock.expectLastCall().andReturn(null) EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L)))
EasyMock.replay(mockSyncProducer) EasyMock.replay(mockSyncProducer)
val producerPool = EasyMock.createMock(classOf[ProducerPool]) val producerPool = EasyMock.createMock(classOf[ProducerPool])
@ -419,6 +419,70 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
EasyMock.verify(producerPool) EasyMock.verify(producerPool)
} }
@Test
def testFailedSendRetryLogic() {
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new ProducerConfig(props)
val topic1 = "topic1"
val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092)
val msgs = TestUtils.getMsgStrings(2)
// producer used to return topic metadata
val metadataSyncProducer = EasyMock.createMock(classOf[SyncProducer])
metadataSyncProducer.send(new TopicMetadataRequest(List(topic1)))
EasyMock.expectLastCall().andReturn(List(topic1Metadata)).times(3)
EasyMock.replay(metadataSyncProducer)
// produce request for topic1 and partitions 0 and 1. Let the first request fail
// entirely. The second request will succeed for partition 1 but fail for partition 0.
// On the third try for partition 0, let it succeed.
val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0)
val response1 =
new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs))
val response2 = new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L))
val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1)
EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response2)
EasyMock.replay(mockSyncProducer)
val producerPool = EasyMock.createMock(classOf[ProducerPool])
EasyMock.expect(producerPool.getZkClient).andReturn(zkClient)
EasyMock.expect(producerPool.addProducers(config))
EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
EasyMock.expect(producerPool.close())
EasyMock.replay(producerPool)
val handler = new DefaultEventHandler[Int,String](config,
partitioner = new FixedValuePartitioner(),
encoder = new StringEncoder,
producerPool = producerPool)
try {
val data = List(
new ProducerData[Int,String](topic1, 0, msgs),
new ProducerData[Int,String](topic1, 1, msgs)
)
handler.handle(data)
handler.close()
} catch {
case e: Exception => fail("Not expected", e)
}
EasyMock.verify(metadataSyncProducer)
EasyMock.verify(mockSyncProducer)
EasyMock.verify(producerPool)
}
@Test @Test
def testJavaProducer() { def testJavaProducer() {
val topic = "topic1" val topic = "topic1"
@ -488,10 +552,13 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
producerPool producerPool
} }
private def getTopicMetadata(topic: String, brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = { private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort)
}
private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
val broker1 = new Broker(brokerId, brokerHost, brokerHost, brokerPort) val broker1 = new Broker(brokerId, brokerHost, brokerHost, brokerPort)
val partition1Metadata = new PartitionMetadata(brokerId, Some(broker1), List(broker1)) new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
new TopicMetadata(topic, List(partition1Metadata))
} }
class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) { class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) {

View File

@ -17,6 +17,7 @@
package kafka.producer package kafka.producer
import java.net.SocketTimeoutException
import java.util.Properties import java.util.Properties
import junit.framework.Assert import junit.framework.Assert
import kafka.admin.CreateTopicCommand import kafka.admin.CreateTopicCommand
@ -27,7 +28,6 @@ import kafka.server.KafkaConfig
import kafka.utils.{TestZKUtils, SystemTime, TestUtils} import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
import org.junit.Test import org.junit.Test
import org.scalatest.junit.JUnit3Suite import org.scalatest.junit.JUnit3Suite
import java.net.SocketTimeoutException
class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
private var messageBytes = new Array[Byte](2); private var messageBytes = new Array[Byte](2);
@ -92,7 +92,6 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@Test @Test
def testProduceCorrectlyReceivesResponse() { def testProduceCorrectlyReceivesResponse() {
// TODO: this will need to change with kafka-44
val server = servers.head val server = servers.head
val props = new Properties() val props = new Properties()
props.put("host", "localhost") props.put("host", "localhost")
@ -106,21 +105,25 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
// #1 - test that we get an error when partition does not belong to broker in response // #1 - test that we get an error when partition does not belong to broker in response
val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages) val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1)
val response = producer.send(request) val response = producer.send(request)
Assert.assertNotNull(response)
Assert.assertEquals(request.correlationId, response.correlationId) Assert.assertEquals(request.correlationId, response.correlationId)
Assert.assertEquals(response.errors.length, response.offsets.length) Assert.assertEquals(response.errors.length, response.offsets.length)
Assert.assertEquals(3, response.errors.length) Assert.assertEquals(3, response.errors.length)
response.errors.foreach(Assert.assertEquals(ErrorMapping.WrongPartitionCode.toShort, _)) response.errors.foreach(Assert.assertEquals(ErrorMapping.NoLeaderForPartitionCode.toShort, _))
response.offsets.foreach(Assert.assertEquals(-1L, _)) response.offsets.foreach(Assert.assertEquals(-1L, _))
// #2 - test that we get correct offsets when partition is owner by broker // #2 - test that we get correct offsets when partition is owned by broker
CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1) CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
TestUtils.waitUntilLeaderIsElected(zkClient, "topic1", 0, 500)
CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1) CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
TestUtils.waitUntilLeaderIsElected(zkClient, "topic3", 0, 500)
Thread.sleep(500) Thread.sleep(500)
val response2 = producer.send(request) val response2 = producer.send(request)
Assert.assertNotNull(response2)
Assert.assertEquals(request.correlationId, response2.correlationId) Assert.assertEquals(request.correlationId, response2.correlationId)
Assert.assertEquals(response2.errors.length, response2.offsets.length) Assert.assertEquals(response2.errors.length, response2.offsets.length)
Assert.assertEquals(3, response2.errors.length) Assert.assertEquals(3, response2.errors.length)
@ -132,7 +135,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
Assert.assertEquals(messages.sizeInBytes, response2.offsets(2)) Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
// the middle message should have been rejected because broker doesn't lead partition // the middle message should have been rejected because broker doesn't lead partition
Assert.assertEquals(ErrorMapping.WrongPartitionCode.toShort, response2.errors(1)) Assert.assertEquals(ErrorMapping.NoLeaderForPartitionCode.toShort, response2.errors(1))
Assert.assertEquals(-1, response2.offsets(1)) Assert.assertEquals(-1, response2.offsets(1))
} }

View File

@ -27,8 +27,8 @@ import kafka.utils.TestUtils
class RequestPurgatoryTest { class RequestPurgatoryTest {
val producerRequest1 = TestUtils.produceRequest("test", new ByteBufferMessageSet(new Message("hello1".getBytes))) val producerRequest1 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello1".getBytes)))
val producerRequest2 = TestUtils.produceRequest("test", new ByteBufferMessageSet(new Message("hello2".getBytes))) val producerRequest2 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello2".getBytes)))
var purgatory: MockRequestPurgatory = null var purgatory: MockRequestPurgatory = null
@Before @Before

View File

@ -343,29 +343,20 @@ object TestUtils extends Logging {
/** /**
* Create a wired format request based on simple basic information * Create a wired format request based on simple basic information
*/ */
def produceRequest(topic: String, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,ProducerRequest.RandomPartition,message)
}
def produceRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = { def produceRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message) produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message)
} }
def produceRequestWithAcks(topics: Seq[String], partitions: Seq[Int], message: ByteBufferMessageSet): kafka.api.ProducerRequest = { def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
val correlationId = SyncProducerConfig.DefaultCorrelationId produceRequestWithAcks(List(topic), List(partition), message, SyncProducerConfig.DefaultRequiredAcks)
val clientId = SyncProducerConfig.DefaultClientId
val requiredAcks: Short = 1.toShort
val ackTimeout = SyncProducerConfig.DefaultAckTimeoutMs
val data = topics.map(new TopicData(_, partitions.map(new PartitionData(_, message)).toArray))
new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data.toArray)
} }
def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = { def produceRequestWithAcks(topics: Seq[String], partitions: Seq[Int], message: ByteBufferMessageSet, acks: Int): kafka.api.ProducerRequest = {
val correlationId = SyncProducerConfig.DefaultCorrelationId
val clientId = SyncProducerConfig.DefaultClientId val clientId = SyncProducerConfig.DefaultClientId
val requiredAcks: Short = SyncProducerConfig.DefaultRequiredAcks
val ackTimeout = SyncProducerConfig.DefaultAckTimeoutMs val ackTimeout = SyncProducerConfig.DefaultAckTimeoutMs
var partitionData = Array[PartitionData]( new PartitionData(partition, message) ) val data = topics.map(new TopicData(_, partitions.map(new PartitionData(_, message)).toArray))
var data = Array[TopicData]( new TopicData(topic, partitionData) ) new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeout, data.toArray)
new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
} }
def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = { def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {