KAFKA-537 Expose clientId in ConsumerConfig and fix correlation id; patched by Yang Ye; reviewed by Neha Narkhede

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1398893 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Neha Narkhede 2012-10-16 17:30:46 +00:00
parent d1a22b2e3b
commit 98f6407089
10 changed files with 63 additions and 55 deletions
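In short: FetchRequestBuilder no longer accepts an explicit correlation id. It now owns an AtomicInteger and stamps a fresh id on every build(), so callers hold on to one builder instead of threading a reqId counter through their fetch loops, and each fetcher identifies itself with a clientId. A minimal sketch of the before/after, with hypothetical topic and client names:

    // Before this patch: every caller tracked its own correlation id.
    val oldRequest = new FetchRequestBuilder()
      .correlationId(reqId)                  // builder method removed by this patch
      .clientId("my-client")                 // hypothetical client id
      .addFetch("my-topic", 0, 0L, 100000)
      .build()

    // After: reuse one builder; build() calls correlationId.getAndIncrement.
    val builder = new FetchRequestBuilder().clientId("my-client")
    val req0 = builder.addFetch("my-topic", 0, 0L, 100000).build()      // correlationId = 0
    val req1 = builder.addFetch("my-topic", 0, 100000L, 100000).build() // correlationId = 1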

View File

@@ -60,7 +60,6 @@ public class KafkaETLContext {
     protected long _offset = Long.MAX_VALUE; /*current offset*/
     protected long _count; /*current count*/
-    protected int requestId = 0; /* the id of the next fetch request */
     protected FetchResponse _response = null; /*fetch response*/
     protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/
     protected Iterator<ByteBufferMessageSet> _respIterator = null;
@@ -74,6 +73,7 @@ public class KafkaETLContext {
     protected MultipleOutputs _mos;
     protected OutputCollector<KafkaETLKey, BytesWritable> _offsetOut = null;
+    protected FetchRequestBuilder builder = new FetchRequestBuilder();

     public long getTotalBytes() {
         return (_offsetRange[1] > _offsetRange[0])? _offsetRange[1] - _offsetRange[0] : 0;
@@ -150,8 +150,7 @@ public class KafkaETLContext {
     public boolean fetchMore () throws IOException {
         if (!hasMore()) return false;

-        FetchRequest fetchRequest = new FetchRequestBuilder()
-                .correlationId(requestId)
+        FetchRequest fetchRequest = builder
                 .clientId(_request.clientId())
                 .addFetch(_request.getTopic(), _request.getPartition(), _offset, _bufferSize)
                 .build();

View File

@@ -23,6 +23,7 @@ import kafka.api.ApiUtils._
 import scala.collection.immutable.Map
 import kafka.common.TopicAndPartition
 import kafka.consumer.ConsumerConfig
+import java.util.concurrent.atomic.AtomicInteger

 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
@@ -30,8 +31,10 @@ case class PartitionFetchInfo(offset: Long, fetchSize: Int)

 object FetchRequest {
   val CurrentVersion = 1.shortValue()
-  val DefaultCorrelationId = -1
-  val DefaultClientId = ""
+  val DefaultMaxWait = 0
+  val DefaultMinBytes = 0
+  val ReplicaFetcherClientId = "replica fetcher"
+  val DefaultCorrelationId = 0

   def readFrom(buffer: ByteBuffer): FetchRequest = {
     val versionId = buffer.getShort
@@ -57,10 +60,10 @@ object FetchRequest {

 case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                         correlationId: Int = FetchRequest.DefaultCorrelationId,
-                        clientId: String = FetchRequest.DefaultClientId,
+                        clientId: String = ConsumerConfig.DefaultClientId,
                         replicaId: Int = Request.OrdinaryConsumerId,
-                        maxWait: Int = ConsumerConfig.MaxFetchWaitMs,
-                        minBytes: Int = ConsumerConfig.MinFetchBytes,
+                        maxWait: Int = FetchRequest.DefaultMaxWait,
+                        minBytes: Int = FetchRequest.DefaultMinBytes,
                         requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
         extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
@@ -123,12 +126,12 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,

 @nonthreadsafe
 class FetchRequestBuilder() {
-  private var correlationId = FetchRequest.DefaultCorrelationId
+  private val correlationId = new AtomicInteger(0)
   private val versionId = FetchRequest.CurrentVersion
-  private var clientId = FetchRequest.DefaultClientId
+  private var clientId = ConsumerConfig.DefaultClientId
   private var replicaId = Request.OrdinaryConsumerId
-  private var maxWait = ConsumerConfig.MaxFetchWaitMs
-  private var minBytes = ConsumerConfig.MinFetchBytes
+  private var maxWait = FetchRequest.DefaultMaxWait
+  private var minBytes = FetchRequest.DefaultMinBytes
   private val requestMap = new collection.mutable.HashMap[TopicAndPartition, PartitionFetchInfo]

   def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
@@ -136,11 +139,6 @@ class FetchRequestBuilder() {
     this
   }

-  def correlationId(correlationId: Int): FetchRequestBuilder = {
-    this.correlationId = correlationId
-    this
-  }
-
   def clientId(clientId: String): FetchRequestBuilder = {
     this.clientId = clientId
     this
@@ -161,5 +159,5 @@ class FetchRequestBuilder() {
     this
   }

-  def build() = FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
+  def build() = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
 }
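Note the interplay with @nonthreadsafe above: the builder as a whole is still not thread-safe, but the counter is an AtomicInteger, so ids advance without repeating; getAndIncrement returns the current value and then bumps it. A trivial sketch:

    import java.util.concurrent.atomic.AtomicInteger

    val correlationId = new AtomicInteger(0)
    correlationId.getAndIncrement  // returns 0, counter is now 1
    correlationId.getAndIncrement  // returns 1, counter is now 2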

View File

@@ -42,6 +42,7 @@ object ConsumerConfig {
   val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
   val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
+  val DefaultClientId = ""
 }

 class ConsumerConfig private (props: VerifiableProperties) extends ZKConfig(props) {
@@ -106,5 +107,10 @@ class ConsumerConfig private (props: VerifiableProperties) extends ZKConfig(prop
    * overhead of decompression.
    * */
   val enableShallowIterator = props.getBoolean("shallowiterator.enable", false)
+
+  /**
+   * Client id is specified by the kafka consumer client, used to distinguish different clients
+   */
+  val clientId = props.getString("clientid", groupId)
 }
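A minimal sketch of the new setting, assuming the Properties-based ConsumerConfig constructor and 0.8-era property names; "clientid" is optional and falls back to the consumer's group id:

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    val props = new Properties()
    props.put("zk.connect", "localhost:2181")  // assumed 0.8 property name
    props.put("groupid", "my-group")           // assumed 0.8 property name
    props.put("clientid", "my-client")         // new in this patch; defaults to groupid
    val config = new ConsumerConfig(props)
    // config.clientId == "my-client"; without the "clientid" line it == "my-group"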

View File

@@ -28,10 +28,15 @@ class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
                             sourceBroker: Broker,
                             val consumerFetcherManager: ConsumerFetcherManager)
-        extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = config.socketTimeoutMs,
-          socketBufferSize = config.socketBufferSize, fetchSize = config.fetchSize,
-          fetcherBrokerId = Request.OrdinaryConsumerId, maxWait = config.maxFetchWaitMs,
-          minBytes = config.minFetchBytes) {
+        extends AbstractFetcherThread(name = name,
+                                      clientId = config.clientId,
+                                      sourceBroker = sourceBroker,
+                                      socketTimeout = config.socketTimeoutMs,
+                                      socketBufferSize = config.socketBufferSize,
+                                      fetchSize = config.fetchSize,
+                                      fetcherBrokerId = Request.OrdinaryConsumerId,
+                                      maxWait = config.maxFetchWaitMs,
+                                      minBytes = config.minFetchBytes) {

   // process fetched data
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {

View File

@@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit

 /**
  * Abstract class for fetching data from multiple partitions from the same broker.
  */
-abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
+abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
   extends ShutdownableThread(name) {
@@ -42,7 +42,13 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
   private val fetchMapLock = new Object
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
   val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
+  val fetchRequestBuilder = new FetchRequestBuilder().
+          clientId(clientId).
+          replicaId(fetcherBrokerId).
+          maxWait(maxWait).
+          minBytes(minBytes)

   /* callbacks to be defined in subclass */

   // process fetched data
@@ -61,21 +67,15 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
   }

   override def doWork() {
-    val builder = new FetchRequestBuilder().
-            clientId(name).
-            replicaId(fetcherBrokerId).
-            maxWait(maxWait).
-            minBytes(minBytes)
-
     fetchMapLock synchronized {
       fetchMap.foreach {
         case((topicAndPartition, offset)) =>
-          builder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
+          fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                            offset, fetchSize)
       }
     }

-    val fetchRequest = builder.build()
+    val fetchRequest = fetchRequestBuilder.build()
     val partitionsWithError = new mutable.HashSet[TopicAndPartition]
     var response: FetchResponse = null
     try {
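The design choice in this file: the builder moves from a local allocated on every doWork() pass to a thread-level val, so successive fetch requests from one fetcher carry a stable clientId and increasing correlation ids. A hypothetical condensation of the pattern, not the actual class:

    import kafka.api.{FetchRequest, FetchRequestBuilder}
    import kafka.common.TopicAndPartition

    abstract class FetcherSketch(clientId: String, fetcherBrokerId: Int,
                                 maxWait: Int, minBytes: Int) {
      private val fetchRequestBuilder = new FetchRequestBuilder()
        .clientId(clientId)
        .replicaId(fetcherBrokerId)
        .maxWait(maxWait)
        .minBytes(minBytes)

      def nextRequest(offsets: Map[TopicAndPartition, Long], fetchSize: Int): FetchRequest = {
        offsets.foreach { case (tp, offset) =>
          fetchRequestBuilder.addFetch(tp.topic, tp.partition, offset, fetchSize)
        }
        fetchRequestBuilder.build()  // the correlation id advances on every call
      }
    }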

View File

@@ -17,17 +17,25 @@
 package kafka.server

-import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
 import kafka.common.TopicAndPartition
+import kafka.api.{FetchRequest, PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}

-class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager)
-  extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
-    socketBufferSize = brokerConfig.replicaSocketBufferSize, fetchSize = brokerConfig.replicaFetchSize,
-    fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs,
-    minBytes = brokerConfig.replicaMinBytes) {
+class ReplicaFetcherThread(name: String,
+                           sourceBroker: Broker,
+                           brokerConfig: KafkaConfig,
+                           replicaMgr: ReplicaManager)
+  extends AbstractFetcherThread(name = name,
+                                clientId = FetchRequest.ReplicaFetcherClientId + "- %s:%d".format(sourceBroker.host, sourceBroker.port),
+                                sourceBroker = sourceBroker,
+                                socketTimeout = brokerConfig.replicaSocketTimeoutMs,
+                                socketBufferSize = brokerConfig.replicaSocketBufferSize,
+                                fetchSize = brokerConfig.replicaFetchSize,
+                                fetcherBrokerId = brokerConfig.brokerId,
+                                maxWait = brokerConfig.replicaMaxWaitTimeMs,
+                                minBytes = brokerConfig.replicaMinBytes) {

   // process fetched data
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {

View File

@@ -114,6 +114,12 @@ object SimpleConsumerShell extends Logging {
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))

+    val fetchRequestBuilder = new FetchRequestBuilder()
+            .clientId(clientId)
+            .replicaId(Request.DebuggingConsumerId)
+            .maxWait(maxWaitMs)
+            .minBytes(ConsumerConfig.MinFetchBytes)
+
     // getting topic metadata
     info("Getting topic metadata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
@@ -168,17 +174,11 @@ object SimpleConsumerShell extends Logging {
     val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024)
     val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
       def run() {
-        var reqId = 0
         var offset = startingOffset
         try {
           while(true) {
-            val fetchRequest = new FetchRequestBuilder()
-                    .correlationId(reqId)
-                    .clientId(clientId)
-                    .replicaId(Request.DebuggingConsumerId)
+            val fetchRequest = fetchRequestBuilder
                     .addFetch(topic, partitionId, offset, fetchSize)
-                    .maxWait(maxWaitMs)
-                    .minBytes(ConsumerConfig.MinFetchBytes)
                     .build()
             val fetchResponse = simpleConsumer.fetch(fetchRequest)
             val messageSet = fetchResponse.messageSet(topic, partitionId)
@@ -206,11 +206,10 @@ object SimpleConsumerShell extends Logging {
               }
               consumed += 1
             }
-            reqId += 1
           }
         } catch {
           case e: Throwable =>
-            error("Error consuming topic, partition, replica (%s, %d, %d) with request id [%d] and offset [%d]".format(topic, partitionId, replicaId, reqId, offset), e)
+            error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic, partitionId, replicaId, offset), e)
         }
       }
     }, false)

View File

@@ -62,7 +62,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   def testFetchRequestCanProperlySerialize() {
     val request = new FetchRequestBuilder()
-      .correlationId(100)
       .clientId("test-client")
       .maxWait(10001)
       .minBytes(4444)
@@ -99,12 +98,11 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
                replica.logEndOffset > 0 && replica.logEndOffset == replica.highWatermark)

     val request = new FetchRequestBuilder()
-      .correlationId(100)
       .clientId("test-client")
       .addFetch(topic, 0, 0, 10000)
       .build()

     val fetched = consumer.fetch(request)
-    assertEquals("Returned correlationId doesn't match that in request.", 100, fetched.correlationId)
+    assertEquals("Returned correlationId doesn't match that in request.", 0, fetched.correlationId)
     val messageSet = fetched.messageSet(topic, 0)
     assertTrue(messageSet.iterator.hasNext)

View File

@@ -57,7 +57,6 @@ public class SimpleConsumerDemo {

     System.out.println("Testing single fetch");
     FetchRequest req = new FetchRequestBuilder()
-        .correlationId(0)
         .clientId(KafkaProperties.clientId)
         .addFetch(KafkaProperties.topic2, 0, 0L, 100)
         .build();
@@ -70,7 +69,6 @@ public class SimpleConsumerDemo {
       put(KafkaProperties.topic3, new ArrayList<Integer>(){{ add(0); }});
     }};
     req = new FetchRequestBuilder()
-        .correlationId(0)
         .clientId(KafkaProperties.clientId)
         .addFetch(KafkaProperties.topic2, 0, 0L, 100)
         .addFetch(KafkaProperties.topic3, 0, 0L, 100)

View File

@@ -59,11 +59,9 @@ object SimpleConsumerPerformance {
     var lastReportTime: Long = startMs
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
-    var reqId = 0

     while(!done) {
       // TODO: add in the maxWait and minBytes for performance
       val request = new FetchRequestBuilder()
-        .correlationId(reqId)
         .clientId(config.clientId)
         .addFetch(config.topic, config.partition, offset, config.fetchSize)
         .build()
@@ -101,7 +99,6 @@ object SimpleConsumerPerformance {
         lastMessagesRead = totalMessagesRead
         consumedInterval = 0
       }
-      reqId += 1
     }

     val reportTime = System.currentTimeMillis
     val elapsed = (reportTime - startMs) / 1000.0