correlationId is not set in FetchRequest in AbstractFetcherThread; patched by Jun Rao; reviewed by Neha Narkhede and Swapnil Ghike; kafka-738

This commit is contained in:
Jun Rao 2013-01-29 17:00:49 -08:00
parent 1fb3e8c037
commit b3a4fe9ced
2 changed files with 10 additions and 7 deletions

View File

@ -201,5 +201,9 @@ class FetchRequestBuilder() {
this
}
def build() = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
def build() = {
val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
requestMap.clear()
fetchRequest
}
}

View File

@ -46,6 +46,11 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
val fetcherStats = new FetcherStats(metricId)
val fetcherLagStats = new FetcherLagStats(metricId)
val fetchRequestBuilder = new FetchRequestBuilder().
clientId(clientId).
replicaId(fetcherBrokerId).
maxWait(maxWait).
minBytes(minBytes)
/* callbacks to be defined in subclass */
@ -65,12 +70,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
override def doWork() {
val fetchRequestBuilder = new FetchRequestBuilder().
clientId(clientId).
replicaId(fetcherBrokerId).
maxWait(maxWait).
minBytes(minBytes)
partitionMapLock.lock()
try {
while (partitionMap.isEmpty)