support changing host/port of a broker; patched by David Arthur; reviewed by Jun Rao; kafka-474

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1396116 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jun Rao 2012-10-09 17:08:42 +00:00
parent 452be9080e
commit d5df29b84f
1 changed files with 3 additions and 12 deletions

View File

@ -23,7 +23,7 @@ import kafka.cluster.Broker
abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging {
// map of (source brokerid, fetcher Id per source broker) => fetcher
private val fetcherThreadMap = new mutable.HashMap[(Int, Int), AbstractFetcherThread]
private val fetcherThreadMap = new mutable.HashMap[(Broker, Int), AbstractFetcherThread]
private val mapLock = new Object
this.logIdent = "[" + name + "], "
@ -37,7 +37,7 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) {
mapLock synchronized {
var fetcherThread: AbstractFetcherThread = null
val key = (sourceBroker.id, getFetcherId(topic, partitionId))
val key = (sourceBroker, getFetcherId(topic, partitionId))
fetcherThreadMap.get(key) match {
case Some(f) => fetcherThread = f
case None =>
@ -64,15 +64,6 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
}
}
def fetcherSourceBroker(topic: String, partitionId: Int): Option[Int] = {
mapLock synchronized {
for ( ((sourceBrokerId, _), fetcher) <- fetcherThreadMap)
if (fetcher.hasPartition(topic, partitionId))
return Some(sourceBrokerId)
}
None
}
def closeAllFetchers() {
mapLock synchronized {
for ( (_, fetcher) <- fetcherThreadMap) {
@ -81,4 +72,4 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
fetcherThreadMap.clear()
}
}
}
}