From b3a4fe9cedca778a95b7f22054cb8f8ef6cf38c7 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Tue, 29 Jan 2013 17:00:49 -0800 Subject: [PATCH] correlationId is not set in FetchRequest in AbstractFetcherThread; patched by Jun Rao; reviewed by Neha Narkhede and Swapnil Ghike; kafka-738 --- core/src/main/scala/kafka/api/FetchRequest.scala | 6 +++++- .../scala/kafka/server/AbstractFetcherThread.scala | 11 +++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index ac749311e1e..19c961eb8e5 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -201,5 +201,9 @@ class FetchRequestBuilder() { this } - def build() = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap) + def build() = { + val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap) + requestMap.clear() + fetchRequest + } } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 0b286f014bf..1ccf5786c9c 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -46,6 +46,11 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke private val metricId = new ClientIdAndBroker(clientId, brokerInfo) val fetcherStats = new FetcherStats(metricId) val fetcherLagStats = new FetcherLagStats(metricId) + val fetchRequestBuilder = new FetchRequestBuilder(). + clientId(clientId). + replicaId(fetcherBrokerId). + maxWait(maxWait). + minBytes(minBytes) /* callbacks to be defined in subclass */ @@ -65,12 +70,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } override def doWork() { - val fetchRequestBuilder = new FetchRequestBuilder(). - clientId(clientId). - replicaId(fetcherBrokerId). - maxWait(maxWait). - minBytes(minBytes) - partitionMapLock.lock() try { while (partitionMap.isEmpty)