From 98f64070891a34e3a8b3f0483a7f5397b8a6933b Mon Sep 17 00:00:00 2001
From: Neha Narkhede
Date: Tue, 16 Oct 2012 17:30:46 +0000
Subject: [PATCH] KAFKA-537 Expose clientId in ConsumerConfig and fix
 correlation id; patched by Yang Ye; reviewed by Neha Narkhede

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1398893 13f79535-47bb-0310-9956-ffa450edef68
---
 .../main/java/kafka/etl/KafkaETLContext.java  |  5 ++--
 .../main/scala/kafka/api/FetchRequest.scala   | 28 +++++++++----------
 .../scala/kafka/consumer/ConsumerConfig.scala |  6 ++++
 .../consumer/ConsumerFetcherThread.scala      | 13 ++++++---
 .../kafka/server/AbstractFetcherThread.scala  | 20 ++++++-------
 .../kafka/server/ReplicaFetcherThread.scala   | 20 +++++++++----
 .../kafka/tools/SimpleConsumerShell.scala     | 17 ++++++-----
 .../kafka/integration/PrimitiveApiTest.scala  |  4 +--
 .../kafka/examples/SimpleConsumerDemo.java    |  2 --
 .../perf/SimpleConsumerPerformance.scala      |  3 ---
 10 files changed, 63 insertions(+), 55 deletions(-)

diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
index 949816924ec..b0e75bc18e7 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
@@ -60,7 +60,6 @@ public class KafkaETLContext {
     protected long _offset = Long.MAX_VALUE;   /*current offset*/
     protected long _count;   /*current count*/
 
-    protected int requestId = 0;   /* the id of the next fetch request */
     protected FetchResponse _response = null;  /*fetch response*/
     protected Iterator _messageIt = null;      /*message iterator*/
     protected Iterator _respIterator = null;
@@ -74,6 +73,7 @@ public class KafkaETLContext {
     protected MultipleOutputs _mos;
     protected OutputCollector _offsetOut = null;
 
+    protected FetchRequestBuilder builder = new FetchRequestBuilder();
 
     public long getTotalBytes() {
         return (_offsetRange[1] > _offsetRange[0])? _offsetRange[1] - _offsetRange[0] : 0;
@@ -150,8 +150,7 @@ public class KafkaETLContext {
     public boolean fetchMore () throws IOException {
         if (!hasMore()) return false;
 
-        FetchRequest fetchRequest = new FetchRequestBuilder()
-                .correlationId(requestId)
+        FetchRequest fetchRequest = builder
                 .clientId(_request.clientId())
                 .addFetch(_request.getTopic(), _request.getPartition(), _offset, _bufferSize)
                 .build();
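
Note on the change above: the ETL context previously kept its own `requestId` counter and stamped it on every request; now a single `FetchRequestBuilder` lives for the life of the context and numbers requests itself. A minimal sketch of the pattern, using a hypothetical `StandInBuilder` in place of the real `kafka.api.FetchRequestBuilder`:

    import java.util.concurrent.atomic.AtomicInteger

    // Stand-in for the "one builder per consumer context" pattern: the
    // builder owns the counter, so callers stop tracking request ids.
    class StandInBuilder {
      private val nextId = new AtomicInteger(0)
      def build(): Int = nextId.getAndIncrement   // stamp, then advance
    }

    class EtlContext {
      private val builder = new StandInBuilder    // lives as long as the context
      def fetchMore(): Int = builder.build()
    }

    val ctx = new EtlContext
    assert(ctx.fetchMore() == 0)   // first fetch carries correlation id 0
    assert(ctx.fetchMore() == 1)   // ids advance with no manual counter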
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 88f620e329e..9892fb35627 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -23,6 +23,7 @@ import kafka.api.ApiUtils._
 import scala.collection.immutable.Map
 import kafka.common.TopicAndPartition
 import kafka.consumer.ConsumerConfig
+import java.util.concurrent.atomic.AtomicInteger
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
 
@@ -30,8 +31,10 @@ case class PartitionFetchInfo(offset: Long, fetchSize: Int)
 object FetchRequest {
   val CurrentVersion = 1.shortValue()
-  val DefaultCorrelationId = -1
-  val DefaultClientId = ""
+  val DefaultMaxWait = 0
+  val DefaultMinBytes = 0
+  val ReplicaFetcherClientId = "replica fetcher"
+  val DefaultCorrelationId = 0
 
   def readFrom(buffer: ByteBuffer): FetchRequest = {
     val versionId = buffer.getShort
@@ -57,10 +60,10 @@ object FetchRequest {
 
 case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                         correlationId: Int = FetchRequest.DefaultCorrelationId,
-                        clientId: String = FetchRequest.DefaultClientId,
+                        clientId: String = ConsumerConfig.DefaultClientId,
                         replicaId: Int = Request.OrdinaryConsumerId,
-                        maxWait: Int = ConsumerConfig.MaxFetchWaitMs,
-                        minBytes: Int = ConsumerConfig.MinFetchBytes,
+                        maxWait: Int = FetchRequest.DefaultMaxWait,
+                        minBytes: Int = FetchRequest.DefaultMinBytes,
                         requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
         extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
@@ -123,12 +126,12 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
 
 @nonthreadsafe
 class FetchRequestBuilder() {
-  private var correlationId = FetchRequest.DefaultCorrelationId
+  private val correlationId = new AtomicInteger(0)
   private val versionId = FetchRequest.CurrentVersion
-  private var clientId = FetchRequest.DefaultClientId
+  private var clientId = ConsumerConfig.DefaultClientId
   private var replicaId = Request.OrdinaryConsumerId
-  private var maxWait = ConsumerConfig.MaxFetchWaitMs
-  private var minBytes = ConsumerConfig.MinFetchBytes
+  private var maxWait = FetchRequest.DefaultMaxWait
+  private var minBytes = FetchRequest.DefaultMinBytes
   private val requestMap = new collection.mutable.HashMap[TopicAndPartition, PartitionFetchInfo]
 
   def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
@@ -136,11 +139,6 @@ class FetchRequestBuilder() {
     this
   }
 
-  def correlationId(correlationId: Int): FetchRequestBuilder = {
-    this.correlationId = correlationId
-    this
-  }
-
   def clientId(clientId: String): FetchRequestBuilder = {
     this.clientId = clientId
     this
@@ -161,5 +159,5 @@ class FetchRequestBuilder() {
     this
   }
 
-  def build() = FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
+  def build() = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
 }
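
This is the heart of the fix: `correlationId` changes from a caller-supplied `var` (with a `.correlationId(...)` setter) to a per-builder `AtomicInteger` that `build()` consumes via `getAndIncrement`. A stand-in sketch of just that counter semantics, not the real builder:

    import java.util.concurrent.atomic.AtomicInteger

    // Each build() returns the current id and advances the counter.
    class StandInBuilder {
      private val correlationId = new AtomicInteger(0)
      def build(): Int = correlationId.getAndIncrement
    }

    val builder = new StandInBuilder
    assert(builder.build() == 0)   // first request from a fresh builder
    assert(builder.build() == 1)   // reuse yields strictly increasing ids

The class stays annotated `@nonthreadsafe`; the `AtomicInteger` only makes the id hand-off atomic, not the rest of the builder's mutable state.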
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 3770e47f876..6f13ba6d4ee 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -42,6 +42,7 @@ object ConsumerConfig {
   val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
   val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
+  val DefaultClientId = ""
 }
 
 class ConsumerConfig private (props: VerifiableProperties) extends ZKConfig(props) {
@@ -106,5 +107,10 @@ class ConsumerConfig private (props: VerifiableProperties) extends ZKConfig(prop
    *  overhead of decompression.
    * */
  val enableShallowIterator = props.getBoolean("shallowiterator.enable", false)
+
+  /**
+   * Client id is specified by the kafka consumer client, used to distinguish different clients
+   */
+  val clientId = props.getString("clientid", groupId)
 }
 
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index ad84aafe05b..d7e5a25a182 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -28,10 +28,15 @@ class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
                             sourceBroker: Broker,
                             val consumerFetcherManager: ConsumerFetcherManager)
-        extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = config.socketTimeoutMs,
-          socketBufferSize = config.socketBufferSize, fetchSize = config.fetchSize,
-          fetcherBrokerId = Request.OrdinaryConsumerId, maxWait = config.maxFetchWaitMs,
-          minBytes = config.minFetchBytes) {
+        extends AbstractFetcherThread(name = name,
+                                      clientId = config.clientId,
+                                      sourceBroker = sourceBroker,
+                                      socketTimeout = config.socketTimeoutMs,
+                                      socketBufferSize = config.socketBufferSize,
+                                      fetchSize = config.fetchSize,
+                                      fetcherBrokerId = Request.OrdinaryConsumerId,
+                                      maxWait = config.maxFetchWaitMs,
+                                      minBytes = config.minFetchBytes) {
 
   // process fetched data
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
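
`ConsumerConfig.clientId` now falls back to the consumer's group id when no `clientid` property is set. A sketch of that defaulting with plain `java.util.Properties` standing in for Kafka's `VerifiableProperties` (the `groupid` key is assumed from the 0.8-era config and is not shown in this hunk):

    import java.util.Properties

    // "clientid" falls back to the configured group id when absent.
    val props = new Properties()
    props.put("groupid", "log-aggregator")

    val groupId  = props.getProperty("groupid")
    val clientId = props.getProperty("clientid", groupId)
    assert(clientId == "log-aggregator")   // unset clientid -> group id

    props.put("clientid", "aggregator-host1")
    assert(props.getProperty("clientid", groupId) == "aggregator-host1")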
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 3184ce92cd5..42681e23ea2 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit
 
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
-abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
+abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
   extends ShutdownableThread(name) {
@@ -42,7 +42,13 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
   private val fetchMapLock = new Object
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
   val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
-
+
+  val fetchRequestBuilder = new FetchRequestBuilder().
+          clientId(clientId).
+          replicaId(fetcherBrokerId).
+          maxWait(maxWait).
+          minBytes(minBytes)
+
   /* callbacks to be defined in subclass */
 
   // process fetched data
@@ -61,21 +67,15 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
   }
 
   override def doWork() {
-    val builder = new FetchRequestBuilder().
-            clientId(name).
-            replicaId(fetcherBrokerId).
-            maxWait(maxWait).
-            minBytes(minBytes)
-
     fetchMapLock synchronized {
       fetchMap.foreach {
         case((topicAndPartition, offset)) =>
-          builder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
+          fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                            offset, fetchSize)
       }
     }
 
-    val fetchRequest = builder.build()
+    val fetchRequest = fetchRequestBuilder.build()
     val partitionsWithError = new mutable.HashSet[TopicAndPartition]
     var response: FetchResponse = null
     try {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 4c66d64baeb..00f6cac8c54 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -17,17 +17,25 @@
 
 package kafka.server
 
-import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
 import kafka.common.TopicAndPartition
+import kafka.api.{FetchRequest, PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
 
-class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager)
-  extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
-          socketBufferSize = brokerConfig.replicaSocketBufferSize, fetchSize = brokerConfig.replicaFetchSize,
-          fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs,
-          minBytes = brokerConfig.replicaMinBytes) {
+class ReplicaFetcherThread(name:String,
+                           sourceBroker: Broker,
+                           brokerConfig: KafkaConfig,
+                           replicaMgr: ReplicaManager)
+  extends AbstractFetcherThread(name = name,
+                                clientId = FetchRequest.ReplicaFetcherClientId + "- %s:%d".format(sourceBroker.host, sourceBroker.port),
+                                sourceBroker = sourceBroker,
+                                socketTimeout = brokerConfig.replicaSocketTimeoutMs,
+                                socketBufferSize = brokerConfig.replicaSocketBufferSize,
+                                fetchSize = brokerConfig.replicaFetchSize,
+                                fetcherBrokerId = brokerConfig.brokerId,
+                                maxWait = brokerConfig.replicaMaxWaitTimeMs,
+                                minBytes = brokerConfig.replicaMinBytes) {
 
   // process fetched data
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
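
One consequence of hoisting the builder out of `doWork()` is that its internal request map now survives across iterations: `addFetch` keys entries by topic-partition, so partitions still present in `fetchMap` simply overwrite their previous offsets, but nothing shown here ever clears the map, so a partition dropped from `fetchMap` would keep its last entry in subsequent requests. A stand-in sketch of that retention:

    import scala.collection.mutable

    case class TopicAndPartition(topic: String, partition: Int)

    // Stand-in for the builder's request map: build() snapshots it but never clears it.
    class StandInBuilder {
      private val requestMap = new mutable.HashMap[TopicAndPartition, Long]
      def addFetch(topic: String, partition: Int, offset: Long): this.type = {
        requestMap.put(TopicAndPartition(topic, partition), offset)  // same key overwrites
        this
      }
      def build(): Map[TopicAndPartition, Long] = requestMap.toMap   // no clear()
    }

    val b = new StandInBuilder
    b.addFetch("events", 0, 0L)
    assert(b.build() == Map(TopicAndPartition("events", 0) -> 0L))
    b.addFetch("events", 1, 5L)        // partition 0 was not re-added ...
    assert(b.build().size == 2)        // ... yet it still rides along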
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index f696050e8d8..8ac81984194 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -114,6 +114,12 @@ object SimpleConsumerShell extends Logging {
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
 
+    val fetchRequestBuilder = new FetchRequestBuilder()
+      .clientId(clientId)
+      .replicaId(Request.DebuggingConsumerId)
+      .maxWait(maxWaitMs)
+      .minBytes(ConsumerConfig.MinFetchBytes)
+
     // getting topic metadata
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
@@ -168,17 +174,11 @@ object SimpleConsumerShell extends Logging {
     val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024)
     val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
       def run() {
-        var reqId = 0
         var offset = startingOffset
         try {
           while(true) {
-            val fetchRequest = new FetchRequestBuilder()
-                    .correlationId(reqId)
-                    .clientId(clientId)
-                    .replicaId(Request.DebuggingConsumerId)
+            val fetchRequest = fetchRequestBuilder
                     .addFetch(topic, partitionId, offset, fetchSize)
-                    .maxWait(maxWaitMs)
-                    .minBytes(ConsumerConfig.MinFetchBytes)
                     .build()
             val fetchResponse = simpleConsumer.fetch(fetchRequest)
             val messageSet = fetchResponse.messageSet(topic, partitionId)
@@ -206,11 +206,10 @@ object SimpleConsumerShell extends Logging {
               }
               consumed += 1
             }
-            reqId += 1
           }
         } catch {
           case e: Throwable =>
-            error("Error consuming topic, partition, replica (%s, %d, %d) with request id [%d] and offset [%d]".format(topic, partitionId, replicaId, reqId, offset), e)
+            error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic, partitionId, replicaId, offset), e)
         }
       }
     }, false)
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 2ce5d37df54..7c41310b712 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -62,7 +62,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
 
   def testFetchRequestCanProperlySerialize() {
     val request = new FetchRequestBuilder()
-      .correlationId(100)
       .clientId("test-client")
       .maxWait(10001)
       .minBytes(4444)
@@ -99,12 +98,11 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
             replica.logEndOffset > 0 && replica.logEndOffset == replica.highWatermark)
 
     val request = new FetchRequestBuilder()
-      .correlationId(100)
      .clientId("test-client")
       .addFetch(topic, 0, 0, 10000)
       .build()
     val fetched = consumer.fetch(request)
-    assertEquals("Returned correlationId doesn't match that in request.", 100, fetched.correlationId)
+    assertEquals("Returned correlationId doesn't match that in request.", 0, fetched.correlationId)
 
     val messageSet = fetched.messageSet(topic, 0)
     assertTrue(messageSet.iterator.hasNext)
diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
index db92d9dbe93..f9951205545 100644
--- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
+++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
@@ -57,7 +57,6 @@ public class SimpleConsumerDemo {
 
     System.out.println("Testing single fetch");
     FetchRequest req = new FetchRequestBuilder()
-            .correlationId(0)
             .clientId(KafkaProperties.clientId)
             .addFetch(KafkaProperties.topic2, 0, 0L, 100)
             .build();
@@ -70,7 +69,6 @@ public class SimpleConsumerDemo {
       put(KafkaProperties.topic3, new ArrayList(){{ add(0); }});
     }};
     req = new FetchRequestBuilder()
-            .correlationId(0)
             .clientId(KafkaProperties.clientId)
             .addFetch(KafkaProperties.topic2, 0, 0L, 100)
             .addFetch(KafkaProperties.topic3, 0, 0L, 100)
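
The test change falls straight out of the new semantics: with the explicit `.correlationId(100)` call gone, a fresh builder stamps 0 on its first request, and the broker echoes that id back in the fetch response, so the expected value drops from 100 to 0. Restated with the stand-in builder:

    import java.util.concurrent.atomic.AtomicInteger

    // A fresh builder's first request always carries correlation id 0,
    // which is what the server should echo back.
    class StandInBuilder {
      private val id = new AtomicInteger(0)
      def build(): Int = id.getAndIncrement
    }

    assert(new StandInBuilder().build() == 0)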
diff --git a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
index 40d1ca8e14e..b5d6d7ba5f8 100644
--- a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
@@ -59,11 +59,9 @@ object SimpleConsumerPerformance {
     var lastReportTime: Long = startMs
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
-    var reqId = 0
     while(!done) {
       // TODO: add in the maxWait and minBytes for performance
       val request = new FetchRequestBuilder()
-        .correlationId(reqId)
         .clientId(config.clientId)
         .addFetch(config.topic, config.partition, offset, config.fetchSize)
         .build()
@@ -101,7 +99,6 @@
         lastMessagesRead = totalMessagesRead
         consumedInterval = 0
       }
-      reqId += 1
     }
     val reportTime = System.currentTimeMillis
     val elapsed = (reportTime - startMs) / 1000.0
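
Worth noting: unlike SimpleConsumerShell, the perf tool above still constructs a new builder on every loop iteration, so after this patch each of its requests carries correlation id 0; only a builder reused across requests produces increasing ids. A sketch of the difference:

    import java.util.concurrent.atomic.AtomicInteger

    class StandInBuilder {
      private val id = new AtomicInteger(0)
      def build(): Int = id.getAndIncrement
    }

    // A fresh builder per iteration stamps every request with 0 ...
    assert((1 to 3).map(_ => new StandInBuilder().build()) == Seq(0, 0, 0))

    // ... while one hoisted builder yields increasing correlation ids.
    val hoisted = new StandInBuilder
    assert((1 to 3).map(_ => hoisted.build()) == Seq(0, 1, 2))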