From 5e2a9a560d847bd0cf364d86bd6784f70d99c71a Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 27 Feb 2014 10:50:15 -0800 Subject: [PATCH] KAFKA-1260 Integration Test for New Producer Part II: Broker Failure Handling; reviewed by Jay Kreps, Neha Narkhede and Jun Rao --- .../kafka/clients/producer/KafkaProducer.java | 8 +- .../kafka/clients/producer/MockProducer.java | 2 +- .../clients/producer/RecordMetadata.java | 8 +- .../internals/FutureRecordMetadata.java | 2 +- .../clients/producer/internals/Metadata.java | 4 +- .../producer/internals/RecordBatch.java | 13 +- .../clients/producer/internals/Sender.java | 2 +- .../apache/kafka/common/network/Selector.java | 1 + .../apache/kafka/common/protocol/Errors.java | 14 +- .../scala/kafka/api/ProducerResponse.scala | 2 +- .../main/scala/kafka/server/KafkaApis.scala | 85 +++--- .../api/ProducerFailureHandlingTest.scala | 284 ++++++++++++++++++ .../kafka/api/ProducerSendTest.scala | 59 ++-- .../scala/unit/kafka/utils/TestUtils.scala | 25 +- 14 files changed, 393 insertions(+), 116 deletions(-) create mode 100644 core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index e4bc9727958..757f7a7d20d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; @@ -217,10 +218,14 @@ public class KafkaProducer implements Producer { FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback); this.sender.wakeup(); return future; - } catch (Exception e) { + // For API exceptions return them in the future; + // for other exceptions throw directly + } catch (ApiException e) { if (callback != null) callback.onCompletion(null, e); return new FutureFailure(e); + } catch (InterruptedException e) { + throw new KafkaException(e); } } @@ -255,7 +260,6 @@ public class KafkaProducer implements Producer { */ @Override public void close() { - this.accumulator.close(); this.sender.initiateClose(); try { this.ioThread.join(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index f43da80580f..6a0f3b27f75 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -108,7 +108,7 @@ public class MockProducer implements Producer { FutureRecordMetadata future = new FutureRecordMetadata(result, 0); TopicPartition topicPartition = new TopicPartition(record.topic(), partition); long offset = nextOffset(topicPartition); - Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, offset), result, callback); + Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, 0, offset), result, callback); this.sent.add(record); if (autoComplete) completion.complete(null); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java index 8c776980ef1..8015f0da397 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java @@ -26,12 +26,18 @@ public final class RecordMetadata { private final long offset; private final TopicPartition topicPartition; - public RecordMetadata(TopicPartition topicPartition, long offset) { + private RecordMetadata(TopicPartition topicPartition, long offset) { super(); this.offset = offset; this.topicPartition = topicPartition; } + public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) { + // ignore the relativeOffset if the base offset is -1, + // since this indicates the offset is unknown + this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset); + } + /** * The offset of the record in the topic/partition. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java index 22d4c79bc06..aec31c38165 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java @@ -60,7 +60,7 @@ public final class FutureRecordMetadata implements Future { if (this.result.error() != null) throw new ExecutionException(this.result.error()); else - return new RecordMetadata(result.topicPartition(), this.result.baseOffset() + this.relativeOffset); + return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset); } public long relativeOffset() { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 62613a3e29a..ce231685edb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -74,10 +74,10 @@ public final class Metadata { */ public synchronized Cluster fetch(String topic, long maxWaitMs) { List partitions = null; + long begin = System.currentTimeMillis(); do { partitions = cluster.partitionsFor(topic); if (partitions == null) { - long begin = System.currentTimeMillis(); topics.add(topic); forceUpdate = true; try { @@ -85,7 +85,7 @@ public final class Metadata { } catch (InterruptedException e) { /* this is fine, just try again */ } long ellapsed = System.currentTimeMillis() - begin; - if (ellapsed > maxWaitMs) + if (ellapsed >= maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); } else { return cluster; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index eb16f6d236e..ef8e658da25 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -55,7 +55,7 @@ public final class RecordBatch { this.records.append(0L, key, value, compression); FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); if (callback != null) - thunks.add(new Thunk(callback, this.recordCount)); + thunks.add(new Thunk(callback, future)); this.recordCount++; return future; } @@ -74,8 +74,7 @@ public final class RecordBatch { try { Thunk thunk = this.thunks.get(i); if (exception == null) - thunk.callback.onCompletion(new RecordMetadata(topicPartition, this.produceFuture.baseOffset() + thunk.relativeOffset), - null); + thunk.callback.onCompletion(thunk.future.get(), null); else thunk.callback.onCompletion(null, exception); } catch (Exception e) { @@ -85,15 +84,15 @@ public final class RecordBatch { } /** - * A callback and the associated RecordSend argument to pass to it. + * A callback and the associated FutureRecordMetadata argument to pass to it. */ final private static class Thunk { final Callback callback; - final long relativeOffset; + final FutureRecordMetadata future; - public Thunk(Callback callback, long relativeOffset) { + public Thunk(Callback callback, FutureRecordMetadata future) { this.callback = callback; - this.relativeOffset = relativeOffset; + this.future = future; } } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index e373265f19f..541c5e1b309 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -324,6 +324,7 @@ public class Sender implements Runnable { private void handleDisconnects(List disconnects, long now) { // clear out the in-flight requests for the disconnected broker for (int node : disconnects) { + nodeStates.disconnected(node); for (InFlightRequest request : this.inFlightRequests.clearAll(node)) { if (request.batches != null) { for (RecordBatch batch : request.batches.values()) { @@ -335,7 +336,6 @@ public class Sender implements Runnable { } } } - nodeStates.disconnected(request.request.destination()); } } // we got a disconnect so we should probably refresh our metadata and see if that broker is dead diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index f1e474cd530..678bfccb571 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -299,6 +299,7 @@ public class Selector implements Selectable { Transmissions trans = transmissions(key); if (trans != null) { this.disconnected.add(trans.id); + this.keys.remove(trans.id); trans.clearReceive(); trans.clearSend(); } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index f88992a0caf..3374bd98be8 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -41,17 +41,15 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; public enum Errors { UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")), NONE(0, null), - OFFSET_OUT_OF_RANGE(1, - new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")), - CORRUPT_MESSAGE(2, - new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")), + OFFSET_OUT_OF_RANGE(1, new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")), + CORRUPT_MESSAGE(2, new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")), UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")), - LEADER_NOT_AVAILABLE(5, - new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")), + // TODO: errorCode 4 for InvalidFetchSize + LEADER_NOT_AVAILABLE(5, new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")), NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")), REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")), - MESSAGE_TOO_LARGE(10, - new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), + // TODO: errorCode 8, 9, 11 + MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")), NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")); diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index 06261b91363..5a1d8015379 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -41,7 +41,7 @@ object ProducerResponse { } } -case class ProducerResponseStatus(error: Short, offset: Long) +case class ProducerResponseStatus(var error: Short, offset: Long) case class ProducerResponse(override val correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus]) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ae2df2014a0..215ac36ece4 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -231,7 +231,8 @@ class KafkaApis(val requestChannel: RequestChannel, // create a list of (topic, partition) pairs to use as keys for this delayed request val producerRequestKeys = produceRequest.data.keys.map( topicAndPartition => new RequestKey(topicAndPartition)).toSeq - val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.end + 1)).toMap + val statuses = localProduceResults.map(r => + r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap val delayedProduce = new DelayedProduce(producerRequestKeys, request, statuses, @@ -255,7 +256,16 @@ class KafkaApis(val requestChannel: RequestChannel, produceRequest.emptyData() } } - + + case class DelayedProduceResponseStatus(requiredOffset: Long, + status: ProducerResponseStatus) { + var acksPending = false + + override def toString = + "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format( + acksPending, status.error, status.offset, requiredOffset) + } + case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) { def this(key: TopicAndPartition, throwable: Throwable) = this(key, -1L, -1L, Some(throwable)) @@ -762,41 +772,31 @@ class KafkaApis(val requestChannel: RequestChannel, class DelayedProduce(keys: Seq[RequestKey], request: RequestChannel.Request, - initialErrorsAndOffsets: Map[TopicAndPartition, ProducerResponseStatus], + val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus], val produce: ProducerRequest, delayMs: Long) extends DelayedRequest(keys, request, delayMs) with Logging { - /** - * Map of (topic, partition) -> partition status - * The values in this map don't need to be synchronized since updates to the - * values are effectively synchronized by the ProducerRequestPurgatory's - * update method - */ - private [kafka] val partitionStatus = keys.map(requestKey => { - val producerResponseStatus = initialErrorsAndOffsets(TopicAndPartition(requestKey.topic, requestKey.partition)) - // if there was an error in writing to the local replica's log, then don't - // wait for acks on this partition - val (acksPending, error, nextOffset) = - if (producerResponseStatus.error == ErrorMapping.NoError) { - // Timeout error state will be cleared when requiredAcks are received - (true, ErrorMapping.RequestTimedOutCode, producerResponseStatus.offset) - } - else (false, producerResponseStatus.error, producerResponseStatus.offset) + // first update the acks pending variable according to error code + partitionStatus foreach { case (topicAndPartition, delayedStatus) => + if (delayedStatus.status.error == ErrorMapping.NoError) { + // Timeout error state will be cleared when requiredAcks are received + delayedStatus.acksPending = true + delayedStatus.status.error = ErrorMapping.RequestTimedOutCode + } else { + delayedStatus.acksPending = false + } + + trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus)) + } - val initialStatus = PartitionStatus(acksPending, error, nextOffset) - trace("Initial partition status for %s = %s".format(requestKey.keyLabel, initialStatus)) - (requestKey, initialStatus) - }).toMap def respond() { - val finalErrorsAndOffsets = initialErrorsAndOffsets.map( - status => { - val pstat = partitionStatus(new RequestKey(status._1)) - (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset)) - }) - - val response = ProducerResponse(produce.correlationId, finalErrorsAndOffsets) + val responseStatus = partitionStatus.map { case (topicAndPartition, delayedStatus) => + topicAndPartition -> delayedStatus.status + } + + val response = ProducerResponse(produce.correlationId, responseStatus) requestChannel.sendResponse(new RequestChannel.Response( request, new BoundedByteBufferSend(response))) @@ -816,8 +816,7 @@ class KafkaApis(val requestChannel: RequestChannel, def isSatisfied(followerFetchRequestKey: RequestKey) = { val topic = followerFetchRequestKey.topic val partitionId = followerFetchRequestKey.partition - val key = RequestKey(topic, partitionId) - val fetchPartitionStatus = partitionStatus(key) + val fetchPartitionStatus = partitionStatus(TopicAndPartition(topic, partitionId)) trace("Checking producer request satisfaction for %s-%d, acksPending = %b" .format(topic, partitionId, fetchPartitionStatus.acksPending)) if (fetchPartitionStatus.acksPending) { @@ -830,10 +829,10 @@ class KafkaApis(val requestChannel: RequestChannel, } if (errorCode != ErrorMapping.NoError) { fetchPartitionStatus.acksPending = false - fetchPartitionStatus.error = errorCode + fetchPartitionStatus.status.error = errorCode } else if (hasEnough) { fetchPartitionStatus.acksPending = false - fetchPartitionStatus.error = ErrorMapping.NoError + fetchPartitionStatus.status.error = ErrorMapping.NoError } if (!fetchPartitionStatus.acksPending) { val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition) @@ -846,20 +845,6 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied)) satisfied } - - case class PartitionStatus(var acksPending: Boolean, - var error: Short, - requiredOffset: Long) { - def setThisBrokerNotLeader() { - error = ErrorMapping.NotLeaderForPartitionCode - acksPending = false - } - - override def toString = - "acksPending:%b, error: %d, requiredOffset: %d".format( - acksPending, error, requiredOffset - ) - } } /** @@ -877,8 +862,8 @@ class KafkaApis(val requestChannel: RequestChannel, * Handle an expired delayed request */ protected def expire(delayedProduce: DelayedProduce) { - for (partitionStatus <- delayedProduce.partitionStatus if partitionStatus._2.acksPending) - delayedRequestMetrics.recordDelayedProducerKeyExpired(partitionStatus._1) + for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending) + delayedRequestMetrics.recordDelayedProducerKeyExpired(RequestKey(topicPartition.topic, topicPartition.partition)) delayedProduce.respond() } diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala new file mode 100644 index 00000000000..b8eb72605d9 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api.test + +import org.scalatest.junit.JUnit3Suite +import org.junit.Test +import org.junit.Assert._ + +import java.util.Properties +import java.lang.Integer +import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException} + +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.{Utils, TestUtils} +import kafka.zk.ZooKeeperTestHarness +import kafka.consumer.SimpleConsumer + +import org.apache.kafka.common.KafkaException +import org.apache.kafka.clients.producer._ + +class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness { + private val brokerId1 = 0 + private val brokerId2 = 1 + private val ports = TestUtils.choosePorts(2) + private val (port1, port2) = (ports(0), ports(1)) + private var server1: KafkaServer = null + private var server2: KafkaServer = null + private var servers = List.empty[KafkaServer] + + private var consumer1: SimpleConsumer = null + private var consumer2: SimpleConsumer = null + + private var producer1: KafkaProducer = null + private var producer2: KafkaProducer = null + private var producer3: KafkaProducer = null + private var producer4: KafkaProducer = null + + private val props1 = TestUtils.createBrokerConfig(brokerId1, port1) + private val props2 = TestUtils.createBrokerConfig(brokerId2, port2) + props1.put("auto.create.topics.enable", "false") + props2.put("auto.create.topics.enable", "false") + private val config1 = new KafkaConfig(props1) + private val config2 = new KafkaConfig(props2) + private val brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)) + + private val bufferSize = 2 * config1.messageMaxBytes + + private val topic1 = "topic-1" + private val topic2 = "topic-2" + + // TODO: move this function to TestUtils after we have server dependant on clients + private def makeProducer(brokerList: String, acks: Int, metadataFetchTimeout: Long, + blockOnBufferFull: Boolean, bufferSize: Long) : KafkaProducer = { + val producerProps = new Properties() + producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList) + producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, acks.toString) + producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString) + producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString) + producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString) + return new KafkaProducer(producerProps) + } + + override def setUp() { + super.setUp() + server1 = TestUtils.createServer(config1) + server2 = TestUtils.createServer(config2) + servers = List(server1,server2) + + // TODO: we need to migrate to new consumers when 0.9 is final + consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "") + consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "") + + producer1 = makeProducer(brokerList, 0, 3000, false, bufferSize); // produce with ack=0 + producer2 = makeProducer(brokerList, 1, 3000, false, bufferSize); // produce with ack=1 + producer3 = makeProducer(brokerList, -1, 3000, false, bufferSize); // produce with ack=-1 + producer4 = makeProducer("localhost:8686,localhost:4242", 1, 3000, false, bufferSize); // produce with incorrect broker list + } + + override def tearDown() { + server1.shutdown; Utils.rm(server1.config.logDirs) + server2.shutdown; Utils.rm(server2.config.logDirs) + + consumer1.close + consumer2.close + + if (producer1 != null) producer1.close + if (producer2 != null) producer2.close + if (producer3 != null) producer3.close + if (producer4 != null) producer4.close + + super.tearDown() + } + + /** + * With ack == 0 the future metadata will have no exceptions with offset -1 + */ + @Test + def testTooLargeRecordWithAckZero() { + // create topic + TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + + // send a too-large record + val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1)) + assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L) + } + + /** + * With ack == 1 the future metadata will throw ExecutionException caused by RecordTooLargeException + */ + @Test + def testTooLargeRecordWithAckOne() { + // create topic + TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + + // send a too-large record + val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1)) + intercept[ExecutionException] { + producer2.send(record).get + } + } + + /** + * With non-exist-topic the future metadata should return ExecutionException caused by TimeoutException + */ + @Test + def testNonExistTopic() { + // send a record with non-exist topic + val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes) + intercept[ExecutionException] { + producer1.send(record).get + } + } + + /** + * With incorrect broker-list the future metadata should return ExecutionException caused by TimeoutException + * + * TODO: other exceptions that can be thrown in ExecutionException: + * UnknownTopicOrPartitionException + * NotLeaderForPartitionException + * LeaderNotAvailableException + * CorruptRecordException + * TimeoutException + */ + @Test + def testWrongBrokerList() { + // create topic + TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + + // send a record with incorrect broker list + val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) + intercept[ExecutionException] { + producer4.send(record).get + } + } + + /** + * 1. With ack=0, the future metadata should not be blocked. + * 2. With ack=1, the future metadata should block, + * and subsequent calls will eventually cause buffer full + */ + @Test + def testNoResponse() { + // create topic + TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + + // first send a message to make sure the metadata is refreshed + val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) + producer1.send(record).get + producer2.send(record).get + + // stop IO threads and request handling, but leave networking operational + // any requests should be accepted and queue up, but not handled + server1.requestHandlerPool.shutdown() + server2.requestHandlerPool.shutdown() + + producer1.send(record).get(5000, TimeUnit.MILLISECONDS) + + intercept[TimeoutException] { + producer2.send(record).get(5000, TimeUnit.MILLISECONDS) + } + + // TODO: expose producer configs after creating them + // send enough messages to get buffer full + val tooManyRecords = bufferSize / ("key".getBytes.length + "value".getBytes.length) + + intercept[KafkaException] { + for (i <- 1 to tooManyRecords) + producer2.send(record) + } + + // do not close produce2 since it will block + // TODO: can we do better? + producer2 = null + } + + /** + * The send call with invalid partition id should throw KafkaException caused by IllegalArgumentException + */ + @Test + def testInvalidPartition() { + // create topic + TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + + // create a record with incorrect partition id, send should fail + val record = new ProducerRecord(topic1, new Integer(1), "key".getBytes, "value".getBytes) + intercept[KafkaException] { + producer1.send(record) + } + intercept[KafkaException] { + producer2.send(record) + } + intercept[KafkaException] { + producer3.send(record) + } + } + + /** + * The send call after producer closed should throw KafkaException cased by IllegalStateException + */ + @Test + def testSendAfterClosed() { + // create topic + TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + + val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) + + // first send a message to make sure the metadata is refreshed + producer1.send(record).get + producer2.send(record).get + producer3.send(record).get + + intercept[KafkaException] { + producer1.close + producer1.send(record) + } + intercept[KafkaException] { + producer2.close + producer2.send(record) + } + intercept[KafkaException] { + producer3.close + producer3.send(record) + } + + // re-close producer is fine + } + + /** + * With replication, producer should able able to find new leader after it detects broker failure + */ + @Test + def testBrokerFailure() { + // create topic + val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + val leader = leaders(0) + assertTrue("Leader of partition 0 of the topic should exist", leader.isDefined) + + val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) + assertEquals("Returned metadata should have offset 0", producer3.send(record).get.offset, 0L) + + // shutdown broker + val serverToShutdown = if(leader.get == server1.config.brokerId) server1 else server2 + serverToShutdown.shutdown() + serverToShutdown.awaitShutdown() + + // send the message again, it should still succeed due-to retry + assertEquals("Returned metadata should have offset 1", producer3.send(record).get.offset, 1L) + } +} \ No newline at end of file diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 34baa8c6c7a..66ea76b9b6c 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -15,12 +15,11 @@ * limitations under the License. */ -package kafka.test +package kafka.api.test import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.{ZkUtils, Utils, TestUtils, Logging} +import kafka.utils.{Utils, TestUtils} import kafka.zk.ZooKeeperTestHarness -import kafka.admin.AdminUtils import kafka.consumer.SimpleConsumer import kafka.api.FetchRequestBuilder import kafka.message.Message @@ -33,7 +32,6 @@ import org.junit.Assert._ import java.util.Properties import java.lang.{Integer, IllegalArgumentException} -import org.apache.log4j.Logger class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -110,29 +108,25 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { // send a normal record val record0 = new ProducerRecord(topic, new Integer(0), "key".getBytes, "value".getBytes) - val response0 = producer.send(record0, callback) - assertEquals("Should have offset 0", 0L, response0.get.offset) + assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset) // send a record with null value should be ok val record1 = new ProducerRecord(topic, new Integer(0), "key".getBytes, null) - val response1 = producer.send(record1, callback) - assertEquals("Should have offset 1", 1L, response1.get.offset) + assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset) // send a record with null key should be ok val record2 = new ProducerRecord(topic, new Integer(0), null, "value".getBytes) - val response2 = producer.send(record2, callback) - assertEquals("Should have offset 2", 2L, response2.get.offset) + assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset) // send a record with null part id should be ok val record3 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) - val response3 = producer.send(record3, callback) - assertEquals("Should have offset 3", 3L, response3.get.offset) + assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset) // send a record with null topic should fail try { val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes) - val response4 = producer.send(record4, callback) - response4.wait + producer.send(record4, callback) + fail("Should not allow sending a record without topic") } catch { case iae: IllegalArgumentException => // this is ok case e: Throwable => fail("Only expecting IllegalArgumentException", e) @@ -143,8 +137,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { producer.send(record0) // check that all messages have been acked via offset - val response5 = producer.send(record0, callback) - assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, response5.get.offset) + assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset) } finally { if (producer != null) { @@ -157,7 +150,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { /** * testClose checks the closing behavior * - * 1. After close() returns, all messages should be sent with correct returned offset metadata + * After close() returns, all messages should be sent with correct returned offset metadata */ @Test def testClose() { @@ -195,7 +188,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { /** * testSendToPartition checks the partitioning behavior * - * 1. The specified partition-id should be respected + * The specified partition-id should be respected */ @Test def testSendToPartition() { @@ -207,40 +200,40 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { try { // create topic val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers) + val partition = 1 // make sure leaders exist - val leader1 = leaders.get(1) + val leader1 = leaders(partition) assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined) - val partition = 1 val responses = - for (i <- 0 until numRecords) + for (i <- 1 to numRecords) yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes)) val futures = responses.toList - futures.map(_.wait) + futures.map(_.get) for (future <- futures) assertTrue("Request should have completed", future.isDone) // make sure all of them end up in the same partition with increasing offset values for ((future, offset) <- futures zip (0 until numRecords)) { - assertEquals(offset, future.get.offset) + assertEquals(offset.toLong, future.get.offset) assertEquals(topic, future.get.topic) - assertEquals(1, future.get.partition) + assertEquals(partition, future.get.partition) } // make sure the fetched messages also respect the partitioning and ordering val fetchResponse1 = if(leader1.get == server1.config.brokerId) { - consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) - }else { - consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build()) + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) } - val messageSet1 = fetchResponse1.messageSet(topic, 1).iterator.toBuffer + val messageSet1 = fetchResponse1.messageSet(topic, partition).iterator.toBuffer assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size) // TODO: also check topic and partition after they are added in the return messageSet for (i <- 0 to numRecords - 1) { assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message) - assertEquals(i, messageSet1(i).offset) + assertEquals(i.toLong, messageSet1(i).offset) } } finally { if (producer != null) { @@ -250,6 +243,11 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { } } + /** + * testAutoCreateTopic + * + * The topic should be created upon sending the first message + */ @Test def testAutoCreateTopic() { val props = new Properties() @@ -259,8 +257,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { try { // Send a message to auto-create the topic val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) - val response = producer.send(record) - assertEquals("Should have offset 0", 0L, response.get.offset) + assertEquals("Should have offset 0", 0L, producer.send(record).get.offset) // double check that the topic is created with leader elected assertTrue("Topic should already be created with leader", TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 0).isDefined) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 1c7a4506519..772d2140ed9 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -23,24 +23,27 @@ import java.nio._ import java.nio.channels._ import java.util.Random import java.util.Properties -import junit.framework.AssertionFailedError -import junit.framework.Assert._ +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.TimeUnit + +import collection.mutable.Map +import collection.mutable.ListBuffer + +import org.I0Itec.zkclient.ZkClient + import kafka.server._ import kafka.producer._ import kafka.message._ -import org.I0Itec.zkclient.ZkClient -import kafka.cluster.Broker -import collection.mutable.ListBuffer -import kafka.consumer.ConsumerConfig -import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.TimeUnit import kafka.api._ -import collection.mutable.Map +import kafka.cluster.Broker +import kafka.consumer.ConsumerConfig import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.common.TopicAndPartition -import junit.framework.Assert import kafka.admin.AdminUtils +import kafka.producer.ProducerConfig +import junit.framework.AssertionFailedError +import junit.framework.Assert._ /** * Utility functions to help with testing @@ -526,7 +529,7 @@ object TestUtils extends Logging { } def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = { - Assert.assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition), + assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition), TestUtils.waitUntilTrue(() => servers.foldLeft(true)(_ && _.apis.metadataCache.keySet.contains(TopicAndPartition(topic, partition))), timeout)) }