KAFKA-1260 Integration Test for New Producer Part II: Broker Failure Handling; reviewed by Jay Kreps, Neha Narkhede and Jun Rao

This commit is contained in:
Guozhang Wang 2014-02-27 10:50:15 -08:00 committed by Neha Narkhede
parent 57be6c81a7
commit 5e2a9a560d
14 changed files with 393 additions and 116 deletions

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
@ -217,10 +218,14 @@ public class KafkaProducer implements Producer {
FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
this.sender.wakeup();
return future;
} catch (Exception e) {
// For API exceptions return them in the future;
// for other exceptions throw directly
} catch (ApiException e) {
if (callback != null)
callback.onCompletion(null, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
throw new KafkaException(e);
}
}
@ -255,7 +260,6 @@ public class KafkaProducer implements Producer {
*/
@Override
public void close() {
this.accumulator.close();
this.sender.initiateClose();
try {
this.ioThread.join();

View File

@ -108,7 +108,7 @@ public class MockProducer implements Producer {
FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
long offset = nextOffset(topicPartition);
Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, offset), result, callback);
Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, 0, offset), result, callback);
this.sent.add(record);
if (autoComplete)
completion.complete(null);

View File

@ -26,12 +26,18 @@ public final class RecordMetadata {
private final long offset;
private final TopicPartition topicPartition;
public RecordMetadata(TopicPartition topicPartition, long offset) {
private RecordMetadata(TopicPartition topicPartition, long offset) {
super();
this.offset = offset;
this.topicPartition = topicPartition;
}
public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) {
// ignore the relativeOffset if the base offset is -1,
// since this indicates the offset is unknown
this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset);
}
/**
* The offset of the record in the topic/partition.
*/

View File

@ -60,7 +60,7 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
if (this.result.error() != null)
throw new ExecutionException(this.result.error());
else
return new RecordMetadata(result.topicPartition(), this.result.baseOffset() + this.relativeOffset);
return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
}
public long relativeOffset() {

View File

@ -74,10 +74,10 @@ public final class Metadata {
*/
public synchronized Cluster fetch(String topic, long maxWaitMs) {
List<PartitionInfo> partitions = null;
long begin = System.currentTimeMillis();
do {
partitions = cluster.partitionsFor(topic);
if (partitions == null) {
long begin = System.currentTimeMillis();
topics.add(topic);
forceUpdate = true;
try {
@ -85,7 +85,7 @@ public final class Metadata {
} catch (InterruptedException e) { /* this is fine, just try again */
}
long ellapsed = System.currentTimeMillis() - begin;
if (ellapsed > maxWaitMs)
if (ellapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
} else {
return cluster;

View File

@ -55,7 +55,7 @@ public final class RecordBatch {
this.records.append(0L, key, value, compression);
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
if (callback != null)
thunks.add(new Thunk(callback, this.recordCount));
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
@ -74,8 +74,7 @@ public final class RecordBatch {
try {
Thunk thunk = this.thunks.get(i);
if (exception == null)
thunk.callback.onCompletion(new RecordMetadata(topicPartition, this.produceFuture.baseOffset() + thunk.relativeOffset),
null);
thunk.callback.onCompletion(thunk.future.get(), null);
else
thunk.callback.onCompletion(null, exception);
} catch (Exception e) {
@ -85,15 +84,15 @@ public final class RecordBatch {
}
/**
* A callback and the associated RecordSend argument to pass to it.
* A callback and the associated FutureRecordMetadata argument to pass to it.
*/
final private static class Thunk {
final Callback callback;
final long relativeOffset;
final FutureRecordMetadata future;
public Thunk(Callback callback, long relativeOffset) {
public Thunk(Callback callback, FutureRecordMetadata future) {
this.callback = callback;
this.relativeOffset = relativeOffset;
this.future = future;
}
}
}

View File

@ -324,6 +324,7 @@ public class Sender implements Runnable {
private void handleDisconnects(List<Integer> disconnects, long now) {
// clear out the in-flight requests for the disconnected broker
for (int node : disconnects) {
nodeStates.disconnected(node);
for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
if (request.batches != null) {
for (RecordBatch batch : request.batches.values()) {
@ -335,7 +336,6 @@ public class Sender implements Runnable {
}
}
}
nodeStates.disconnected(request.request.destination());
}
}
// we got a disconnect so we should probably refresh our metadata and see if that broker is dead

View File

@ -299,6 +299,7 @@ public class Selector implements Selectable {
Transmissions trans = transmissions(key);
if (trans != null) {
this.disconnected.add(trans.id);
this.keys.remove(trans.id);
trans.clearReceive();
trans.clearSend();
}

View File

@ -41,17 +41,15 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
public enum Errors {
UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
NONE(0, null),
OFFSET_OUT_OF_RANGE(1,
new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
CORRUPT_MESSAGE(2,
new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
OFFSET_OUT_OF_RANGE(1, new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
CORRUPT_MESSAGE(2, new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
LEADER_NOT_AVAILABLE(5,
new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
// TODO: errorCode 4 for InvalidFetchSize
LEADER_NOT_AVAILABLE(5, new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
MESSAGE_TOO_LARGE(10,
new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
// TODO: errorCode 8, 9, 11
MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received."));

View File

@ -41,7 +41,7 @@ object ProducerResponse {
}
}
case class ProducerResponseStatus(error: Short, offset: Long)
case class ProducerResponseStatus(var error: Short, offset: Long)
case class ProducerResponse(override val correlationId: Int,
status: Map[TopicAndPartition, ProducerResponseStatus])

View File

@ -231,7 +231,8 @@ class KafkaApis(val requestChannel: RequestChannel,
// create a list of (topic, partition) pairs to use as keys for this delayed request
val producerRequestKeys = produceRequest.data.keys.map(
topicAndPartition => new RequestKey(topicAndPartition)).toSeq
val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.end + 1)).toMap
val statuses = localProduceResults.map(r =>
r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
val delayedProduce = new DelayedProduce(producerRequestKeys,
request,
statuses,
@ -255,7 +256,16 @@ class KafkaApis(val requestChannel: RequestChannel,
produceRequest.emptyData()
}
}
case class DelayedProduceResponseStatus(requiredOffset: Long,
status: ProducerResponseStatus) {
var acksPending = false
override def toString =
"acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
acksPending, status.error, status.offset, requiredOffset)
}
case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) {
def this(key: TopicAndPartition, throwable: Throwable) =
this(key, -1L, -1L, Some(throwable))
@ -762,41 +772,31 @@ class KafkaApis(val requestChannel: RequestChannel,
class DelayedProduce(keys: Seq[RequestKey],
request: RequestChannel.Request,
initialErrorsAndOffsets: Map[TopicAndPartition, ProducerResponseStatus],
val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
val produce: ProducerRequest,
delayMs: Long)
extends DelayedRequest(keys, request, delayMs) with Logging {
/**
* Map of (topic, partition) -> partition status
* The values in this map don't need to be synchronized since updates to the
* values are effectively synchronized by the ProducerRequestPurgatory's
* update method
*/
private [kafka] val partitionStatus = keys.map(requestKey => {
val producerResponseStatus = initialErrorsAndOffsets(TopicAndPartition(requestKey.topic, requestKey.partition))
// if there was an error in writing to the local replica's log, then don't
// wait for acks on this partition
val (acksPending, error, nextOffset) =
if (producerResponseStatus.error == ErrorMapping.NoError) {
// Timeout error state will be cleared when requiredAcks are received
(true, ErrorMapping.RequestTimedOutCode, producerResponseStatus.offset)
}
else (false, producerResponseStatus.error, producerResponseStatus.offset)
// first update the acks pending variable according to error code
partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
if (delayedStatus.status.error == ErrorMapping.NoError) {
// Timeout error state will be cleared when requiredAcks are received
delayedStatus.acksPending = true
delayedStatus.status.error = ErrorMapping.RequestTimedOutCode
} else {
delayedStatus.acksPending = false
}
trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
}
val initialStatus = PartitionStatus(acksPending, error, nextOffset)
trace("Initial partition status for %s = %s".format(requestKey.keyLabel, initialStatus))
(requestKey, initialStatus)
}).toMap
def respond() {
val finalErrorsAndOffsets = initialErrorsAndOffsets.map(
status => {
val pstat = partitionStatus(new RequestKey(status._1))
(status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset))
})
val response = ProducerResponse(produce.correlationId, finalErrorsAndOffsets)
val responseStatus = partitionStatus.map { case (topicAndPartition, delayedStatus) =>
topicAndPartition -> delayedStatus.status
}
val response = ProducerResponse(produce.correlationId, responseStatus)
requestChannel.sendResponse(new RequestChannel.Response(
request, new BoundedByteBufferSend(response)))
@ -816,8 +816,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def isSatisfied(followerFetchRequestKey: RequestKey) = {
val topic = followerFetchRequestKey.topic
val partitionId = followerFetchRequestKey.partition
val key = RequestKey(topic, partitionId)
val fetchPartitionStatus = partitionStatus(key)
val fetchPartitionStatus = partitionStatus(TopicAndPartition(topic, partitionId))
trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
.format(topic, partitionId, fetchPartitionStatus.acksPending))
if (fetchPartitionStatus.acksPending) {
@ -830,10 +829,10 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (errorCode != ErrorMapping.NoError) {
fetchPartitionStatus.acksPending = false
fetchPartitionStatus.error = errorCode
fetchPartitionStatus.status.error = errorCode
} else if (hasEnough) {
fetchPartitionStatus.acksPending = false
fetchPartitionStatus.error = ErrorMapping.NoError
fetchPartitionStatus.status.error = ErrorMapping.NoError
}
if (!fetchPartitionStatus.acksPending) {
val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition)
@ -846,20 +845,6 @@ class KafkaApis(val requestChannel: RequestChannel,
trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied))
satisfied
}
case class PartitionStatus(var acksPending: Boolean,
var error: Short,
requiredOffset: Long) {
def setThisBrokerNotLeader() {
error = ErrorMapping.NotLeaderForPartitionCode
acksPending = false
}
override def toString =
"acksPending:%b, error: %d, requiredOffset: %d".format(
acksPending, error, requiredOffset
)
}
}
/**
@ -877,8 +862,8 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle an expired delayed request
*/
protected def expire(delayedProduce: DelayedProduce) {
for (partitionStatus <- delayedProduce.partitionStatus if partitionStatus._2.acksPending)
delayedRequestMetrics.recordDelayedProducerKeyExpired(partitionStatus._1)
for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
delayedRequestMetrics.recordDelayedProducerKeyExpired(RequestKey(topicPartition.topic, topicPartition.partition))
delayedProduce.respond()
}

View File

@ -0,0 +1,284 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api.test
import org.scalatest.junit.JUnit3Suite
import org.junit.Test
import org.junit.Assert._
import java.util.Properties
import java.lang.Integer
import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{Utils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import kafka.consumer.SimpleConsumer
import org.apache.kafka.common.KafkaException
import org.apache.kafka.clients.producer._
class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness {
private val brokerId1 = 0
private val brokerId2 = 1
private val ports = TestUtils.choosePorts(2)
private val (port1, port2) = (ports(0), ports(1))
private var server1: KafkaServer = null
private var server2: KafkaServer = null
private var servers = List.empty[KafkaServer]
private var consumer1: SimpleConsumer = null
private var consumer2: SimpleConsumer = null
private var producer1: KafkaProducer = null
private var producer2: KafkaProducer = null
private var producer3: KafkaProducer = null
private var producer4: KafkaProducer = null
private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
props1.put("auto.create.topics.enable", "false")
props2.put("auto.create.topics.enable", "false")
private val config1 = new KafkaConfig(props1)
private val config2 = new KafkaConfig(props2)
private val brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))
private val bufferSize = 2 * config1.messageMaxBytes
private val topic1 = "topic-1"
private val topic2 = "topic-2"
// TODO: move this function to TestUtils after we have server dependant on clients
private def makeProducer(brokerList: String, acks: Int, metadataFetchTimeout: Long,
blockOnBufferFull: Boolean, bufferSize: Long) : KafkaProducer = {
val producerProps = new Properties()
producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList)
producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, acks.toString)
producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString)
return new KafkaProducer(producerProps)
}
override def setUp() {
super.setUp()
server1 = TestUtils.createServer(config1)
server2 = TestUtils.createServer(config2)
servers = List(server1,server2)
// TODO: we need to migrate to new consumers when 0.9 is final
consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "")
consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "")
producer1 = makeProducer(brokerList, 0, 3000, false, bufferSize); // produce with ack=0
producer2 = makeProducer(brokerList, 1, 3000, false, bufferSize); // produce with ack=1
producer3 = makeProducer(brokerList, -1, 3000, false, bufferSize); // produce with ack=-1
producer4 = makeProducer("localhost:8686,localhost:4242", 1, 3000, false, bufferSize); // produce with incorrect broker list
}
override def tearDown() {
server1.shutdown; Utils.rm(server1.config.logDirs)
server2.shutdown; Utils.rm(server2.config.logDirs)
consumer1.close
consumer2.close
if (producer1 != null) producer1.close
if (producer2 != null) producer2.close
if (producer3 != null) producer3.close
if (producer4 != null) producer4.close
super.tearDown()
}
/**
* With ack == 0 the future metadata will have no exceptions with offset -1
*/
@Test
def testTooLargeRecordWithAckZero() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
// send a too-large record
val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1))
assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L)
}
/**
* With ack == 1 the future metadata will throw ExecutionException caused by RecordTooLargeException
*/
@Test
def testTooLargeRecordWithAckOne() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
// send a too-large record
val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1))
intercept[ExecutionException] {
producer2.send(record).get
}
}
/**
* With non-exist-topic the future metadata should return ExecutionException caused by TimeoutException
*/
@Test
def testNonExistTopic() {
// send a record with non-exist topic
val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes)
intercept[ExecutionException] {
producer1.send(record).get
}
}
/**
* With incorrect broker-list the future metadata should return ExecutionException caused by TimeoutException
*
* TODO: other exceptions that can be thrown in ExecutionException:
* UnknownTopicOrPartitionException
* NotLeaderForPartitionException
* LeaderNotAvailableException
* CorruptRecordException
* TimeoutException
*/
@Test
def testWrongBrokerList() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
// send a record with incorrect broker list
val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
intercept[ExecutionException] {
producer4.send(record).get
}
}
/**
* 1. With ack=0, the future metadata should not be blocked.
* 2. With ack=1, the future metadata should block,
* and subsequent calls will eventually cause buffer full
*/
@Test
def testNoResponse() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
// first send a message to make sure the metadata is refreshed
val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
producer1.send(record).get
producer2.send(record).get
// stop IO threads and request handling, but leave networking operational
// any requests should be accepted and queue up, but not handled
server1.requestHandlerPool.shutdown()
server2.requestHandlerPool.shutdown()
producer1.send(record).get(5000, TimeUnit.MILLISECONDS)
intercept[TimeoutException] {
producer2.send(record).get(5000, TimeUnit.MILLISECONDS)
}
// TODO: expose producer configs after creating them
// send enough messages to get buffer full
val tooManyRecords = bufferSize / ("key".getBytes.length + "value".getBytes.length)
intercept[KafkaException] {
for (i <- 1 to tooManyRecords)
producer2.send(record)
}
// do not close produce2 since it will block
// TODO: can we do better?
producer2 = null
}
/**
* The send call with invalid partition id should throw KafkaException caused by IllegalArgumentException
*/
@Test
def testInvalidPartition() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
// create a record with incorrect partition id, send should fail
val record = new ProducerRecord(topic1, new Integer(1), "key".getBytes, "value".getBytes)
intercept[KafkaException] {
producer1.send(record)
}
intercept[KafkaException] {
producer2.send(record)
}
intercept[KafkaException] {
producer3.send(record)
}
}
/**
* The send call after producer closed should throw KafkaException cased by IllegalStateException
*/
@Test
def testSendAfterClosed() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
// first send a message to make sure the metadata is refreshed
producer1.send(record).get
producer2.send(record).get
producer3.send(record).get
intercept[KafkaException] {
producer1.close
producer1.send(record)
}
intercept[KafkaException] {
producer2.close
producer2.send(record)
}
intercept[KafkaException] {
producer3.close
producer3.send(record)
}
// re-close producer is fine
}
/**
* With replication, producer should able able to find new leader after it detects broker failure
*/
@Test
def testBrokerFailure() {
// create topic
val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
val leader = leaders(0)
assertTrue("Leader of partition 0 of the topic should exist", leader.isDefined)
val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
assertEquals("Returned metadata should have offset 0", producer3.send(record).get.offset, 0L)
// shutdown broker
val serverToShutdown = if(leader.get == server1.config.brokerId) server1 else server2
serverToShutdown.shutdown()
serverToShutdown.awaitShutdown()
// send the message again, it should still succeed due-to retry
assertEquals("Returned metadata should have offset 1", producer3.send(record).get.offset, 1L)
}
}

View File

@ -15,12 +15,11 @@
* limitations under the License.
*/
package kafka.test
package kafka.api.test
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{ZkUtils, Utils, TestUtils, Logging}
import kafka.utils.{Utils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import kafka.admin.AdminUtils
import kafka.consumer.SimpleConsumer
import kafka.api.FetchRequestBuilder
import kafka.message.Message
@ -33,7 +32,6 @@ import org.junit.Assert._
import java.util.Properties
import java.lang.{Integer, IllegalArgumentException}
import org.apache.log4j.Logger
class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
@ -110,29 +108,25 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
// send a normal record
val record0 = new ProducerRecord(topic, new Integer(0), "key".getBytes, "value".getBytes)
val response0 = producer.send(record0, callback)
assertEquals("Should have offset 0", 0L, response0.get.offset)
assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)
// send a record with null value should be ok
val record1 = new ProducerRecord(topic, new Integer(0), "key".getBytes, null)
val response1 = producer.send(record1, callback)
assertEquals("Should have offset 1", 1L, response1.get.offset)
assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)
// send a record with null key should be ok
val record2 = new ProducerRecord(topic, new Integer(0), null, "value".getBytes)
val response2 = producer.send(record2, callback)
assertEquals("Should have offset 2", 2L, response2.get.offset)
assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)
// send a record with null part id should be ok
val record3 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
val response3 = producer.send(record3, callback)
assertEquals("Should have offset 3", 3L, response3.get.offset)
assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset)
// send a record with null topic should fail
try {
val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes)
val response4 = producer.send(record4, callback)
response4.wait
producer.send(record4, callback)
fail("Should not allow sending a record without topic")
} catch {
case iae: IllegalArgumentException => // this is ok
case e: Throwable => fail("Only expecting IllegalArgumentException", e)
@ -143,8 +137,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
producer.send(record0)
// check that all messages have been acked via offset
val response5 = producer.send(record0, callback)
assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, response5.get.offset)
assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset)
} finally {
if (producer != null) {
@ -157,7 +150,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
/**
* testClose checks the closing behavior
*
* 1. After close() returns, all messages should be sent with correct returned offset metadata
* After close() returns, all messages should be sent with correct returned offset metadata
*/
@Test
def testClose() {
@ -195,7 +188,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
/**
* testSendToPartition checks the partitioning behavior
*
* 1. The specified partition-id should be respected
* The specified partition-id should be respected
*/
@Test
def testSendToPartition() {
@ -207,40 +200,40 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
try {
// create topic
val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
val partition = 1
// make sure leaders exist
val leader1 = leaders.get(1)
val leader1 = leaders(partition)
assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined)
val partition = 1
val responses =
for (i <- 0 until numRecords)
for (i <- 1 to numRecords)
yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes))
val futures = responses.toList
futures.map(_.wait)
futures.map(_.get)
for (future <- futures)
assertTrue("Request should have completed", future.isDone)
// make sure all of them end up in the same partition with increasing offset values
for ((future, offset) <- futures zip (0 until numRecords)) {
assertEquals(offset, future.get.offset)
assertEquals(offset.toLong, future.get.offset)
assertEquals(topic, future.get.topic)
assertEquals(1, future.get.partition)
assertEquals(partition, future.get.partition)
}
// make sure the fetched messages also respect the partitioning and ordering
val fetchResponse1 = if(leader1.get == server1.config.brokerId) {
consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
}else {
consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
} else {
consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
}
val messageSet1 = fetchResponse1.messageSet(topic, 1).iterator.toBuffer
val messageSet1 = fetchResponse1.messageSet(topic, partition).iterator.toBuffer
assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size)
// TODO: also check topic and partition after they are added in the return messageSet
for (i <- 0 to numRecords - 1) {
assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message)
assertEquals(i, messageSet1(i).offset)
assertEquals(i.toLong, messageSet1(i).offset)
}
} finally {
if (producer != null) {
@ -250,6 +243,11 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
}
}
/**
* testAutoCreateTopic
*
* The topic should be created upon sending the first message
*/
@Test
def testAutoCreateTopic() {
val props = new Properties()
@ -259,8 +257,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
try {
// Send a message to auto-create the topic
val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
val response = producer.send(record)
assertEquals("Should have offset 0", 0L, response.get.offset)
assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
// double check that the topic is created with leader elected
assertTrue("Topic should already be created with leader", TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 0).isDefined)

View File

@ -23,24 +23,27 @@ import java.nio._
import java.nio.channels._
import java.util.Random
import java.util.Properties
import junit.framework.AssertionFailedError
import junit.framework.Assert._
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import collection.mutable.Map
import collection.mutable.ListBuffer
import org.I0Itec.zkclient.ZkClient
import kafka.server._
import kafka.producer._
import kafka.message._
import org.I0Itec.zkclient.ZkClient
import kafka.cluster.Broker
import collection.mutable.ListBuffer
import kafka.consumer.ConsumerConfig
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import kafka.api._
import collection.mutable.Map
import kafka.cluster.Broker
import kafka.consumer.ConsumerConfig
import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
import kafka.common.TopicAndPartition
import junit.framework.Assert
import kafka.admin.AdminUtils
import kafka.producer.ProducerConfig
import junit.framework.AssertionFailedError
import junit.framework.Assert._
/**
* Utility functions to help with testing
@ -526,7 +529,7 @@ object TestUtils extends Logging {
}
def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = {
Assert.assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
TestUtils.waitUntilTrue(() =>
servers.foldLeft(true)(_ && _.apis.metadataCache.keySet.contains(TopicAndPartition(topic, partition))), timeout))
}