IndexOutOfBoundsException thrown by kafka.consumer.ConsumerFetcherThread; patched by Jun Rao; reviewed by Jay Kreps and Neha Narkhede; kafka-528

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1391854 13f79535-47bb-0310-9956-ffa450edef68
Jun Rao 2012-09-29 18:17:39 +00:00
parent 77b511f696
commit 9623ae4de3
11 changed files with 24 additions and 18 deletions


@@ -192,7 +192,7 @@ class Partition(val topic: String,
       case Some(leaderReplica) =>
         val replica = getReplica(replicaId).get
         val leaderHW = leaderReplica.highWatermark
-        if (replica.logEndOffset >= leaderHW) {
+        if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) {
           // expand ISR
           val newInSyncReplicas = inSyncReplicas + replica
           info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
@@ -237,8 +237,10 @@ class Partition(val topic: String,
     val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min
     val oldHighWatermark = leaderReplica.highWatermark
-    if(newHighWatermark > oldHighWatermark)
+    if(newHighWatermark > oldHighWatermark) {
       leaderReplica.highWatermark = newHighWatermark
+      debug("Highwatermark for topic %s partition %d updated to %d".format(topic, partitionId, newHighWatermark))
+    }
     else
       debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s"
         .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
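
An illustrative sketch (not part of the patch; broker ids and values are hypothetical) of what the new membership guard in the first hunk avoids: without it, a follower already in the ISR would take the "expand ISR" branch on every check, even though the set cannot change.

// Hypothetical broker ids; caughtUp stands in for replica.logEndOffset >= leaderHW.
object IsrGuardSketch extends App {
  val inSyncReplicas = Set(1, 2)
  val replicaId = 2     // a follower that is already in the ISR
  val caughtUp = true

  // The added contains-check keeps an in-sync replica from re-entering
  // the expand-ISR branch (and re-logging) on every evaluation.
  if (!inSyncReplicas.contains(replicaId) && caughtUp)
    println("Expanding ISR to " + (inSyncReplicas + replicaId).mkString(","))
  else
    println("ISR unchanged: " + inSyncReplicas.mkString(","))
}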


@@ -100,7 +100,7 @@ class Replica(val brokerId: Int,
     val replicaString = new StringBuilder
     replicaString.append("ReplicaId: " + brokerId)
     replicaString.append("; Topic: " + topic)
-    replicaString.append("; Partition: " + partition.toString)
+    replicaString.append("; Partition: " + partition.partitionId)
     replicaString.append("; isLocal: " + isLocal)
     if(isLocal) replicaString.append("; Highwatermark: " + highWatermark)
     replicaString.toString()


@@ -163,7 +163,7 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
       val broker = m._1
       val leaderAndIsr = m._2
       val leaderAndIsrRequest = new LeaderAndIsrRequest(leaderAndIsr)
-      debug(("The leaderAndIsr request sent to broker %d is %s").format(broker, leaderAndIsrRequest))
+      info("Sending to broker %d leaderAndIsr request of %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
     brokerRequestMap.clear()


@@ -116,7 +116,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     updateLeaderAndIsrCache()
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
     val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader =>
-      deadBrokers.contains(partitionAndLeader._2)).map(_._1).toSeq
+      deadBrokers.contains(partitionAndLeader._2)).keySet.toSeq
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
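
An illustrative sketch (hypothetical data; the real keys are topic-partition pairs) of the Scala idiom in this hunk: filtering a Map yields a Map, so .keySet collects the surviving keys directly, which is what .map(_._1) did less directly.

// Hypothetical leaders map: partition name -> leader broker id.
object KeySetSketch extends App {
  val allLeaders = Map("topicA-0" -> 1, "topicA-1" -> 2, "topicB-0" -> 2)
  val deadBrokers = Set(2)

  // filter on a Map returns a Map, so .keySet yields the keys directly;
  // .map(_._1) walked each pair to extract the same keys.
  val partitionsWithoutLeader =
    allLeaders.filter { case (_, leader) => deadBrokers.contains(leader) }.keySet.toSeq
  println(partitionsWithoutLeader) // contains topicA-1 and topicB-0
}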


@@ -187,10 +187,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
       if(next >= 0)
         validUpTo = next
     } while(next >= 0)
-    channel.truncate(validUpTo)
-    setSize.set(validUpTo)
-    /* This should not be necessary, but fixes bug 6191269 on some OSs. */
-    channel.position(validUpTo)
+    truncateTo(validUpTo)
     needRecover.set(false)
     len - validUpTo
   }
@@ -201,6 +198,8 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
         " size of this log segment is only %d bytes".format(sizeInBytes()))
     channel.truncate(targetSize)
     setSize.set(targetSize)
+    /* This should not be necessary, but fixes bug 6191269 on some OSs. */
+    channel.position(targetSize)
   }

   /**
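
A minimal sketch, assuming only a FileChannel and an AtomicLong size counter (not the actual FileMessageSet internals), of the consolidation these hunks perform: truncation, size bookkeeping, and the position workaround live in one truncateTo helper that the recovery path can call.

import java.io.RandomAccessFile
import java.util.concurrent.atomic.AtomicLong

// Standalone model; the file name and fields are hypothetical.
object TruncateToSketch extends App {
  val file = new RandomAccessFile("sketch.log", "rw")
  val channel = file.getChannel
  val setSize = new AtomicLong(channel.size())

  // One helper owns all three steps, so recovery can simply call truncateTo(validUpTo).
  def truncateTo(targetSize: Long): Unit = {
    channel.truncate(targetSize)
    setSize.set(targetSize)
    /* This should not be necessary, but fixes bug 6191269 on some OSs. */
    channel.position(targetSize)
  }

  truncateTo(0L)
  println("size after truncate: " + setSize.get)
  file.close()
}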


@@ -18,15 +18,15 @@
 package kafka.network

 import java.util.concurrent._
-import kafka.utils.SystemTime
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.nio.ByteBuffer
 import kafka.api._
 import kafka.common.TopicAndPartition
+import kafka.utils.{Logging, SystemTime}

-object RequestChannel {
+object RequestChannel extends Logging {
   val AllDone = new Request(1, 2, getShutdownReceive(), 0)

   def getShutdownReceive() = {
@@ -45,6 +45,7 @@ object RequestChannel {
     val requestId = buffer.getShort()
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer.rewind()
+    trace("Received request: %s".format(requestObj))

     def updateRequestMetrics() {
       val endTimeNs = SystemTime.nanoseconds
@@ -70,6 +71,7 @@ object RequestChannel {
         m.totalTimeHist.update(totalTime)
       }
     }
+    trace("Completed request: %s".format(requestObj))
   }

 case class Response(processor: Int, request: Request, responseSend: Send) {


@@ -37,8 +37,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   private val MaxConnectBackoffMs = 60000
   private var sentOnConnection = 0
-  /** make time-based reconnect starting at a random time **/
-  private var lastConnectionTime = System.currentTimeMillis - SyncProducer.randomGenerator.nextDouble() * config.reconnectInterval
+  private var lastConnectionTime = -1L

   private val lock = new Object()
   @volatile private var shutdown: Boolean = false
@@ -90,7 +89,6 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
     if(sentOnConnection >= config.reconnectInterval || (config.reconnectTimeInterval >= 0 && System.currentTimeMillis - lastConnectionTime >= config.reconnectTimeInterval)) {
       reconnect()
       sentOnConnection = 0
-      lastConnectionTime = System.currentTimeMillis
     }
     response
   }
@@ -146,6 +144,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
     while(!blockingChannel.isConnected && !shutdown) {
       try {
         blockingChannel.connect()
+        lastConnectionTime = System.currentTimeMillis
         info("Connected to " + config.host + ":" + config.port + " for producing")
       } catch {
         case e: Exception => {
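
An illustrative timing sketch (interval value hypothetical) of the bookkeeping change across these hunks: stamping lastConnectionTime inside the connect loop starts the time-based reconnect clock when the connection is actually made, rather than when a later send happens to pass through the reconnect check.

// Models only the lastConnectionTime bookkeeping, nothing else.
object ReconnectClockSketch extends App {
  val reconnectTimeIntervalMs = 10 * 60 * 1000L // stand-in for config.reconnectTimeInterval
  var lastConnectionTime = -1L                  // same sentinel as the new field initializer

  def connect(): Unit = {
    // the patch moves this stamp into the connect loop
    lastConnectionTime = System.currentTimeMillis
  }

  def dueForReconnect(now: Long): Boolean =
    reconnectTimeIntervalMs >= 0 && now - lastConnectionTime >= reconnectTimeIntervalMs

  println(dueForReconnect(System.currentTimeMillis)) // true: never connected (clock at -1)
  connect()
  println(dueForReconnect(System.currentTimeMillis)) // false: clock restarted at connect time
}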


@@ -112,7 +112,7 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
           warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
             .format(currentOffset.get, topic, partitionId, newOffset))
         case _ =>
-          error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
+          error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
             ErrorMapping.exceptionFor(partitionData.error))
           partitionsWithError += topicAndPartition
           fetchMap.remove(topicAndPartition)
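
An illustrative sketch (host and id values hypothetical) of the format-argument mismatch fixed here: a %d conversion throws at runtime when handed a string such as sourceBroker.host, while the integer sourceBroker.id formats cleanly.

// Demonstrates the exception class only; not the fetcher thread itself.
object FormatMismatchSketch extends App {
  val host = "broker1.example.com" // a String, as sourceBroker.host would be
  val id = 5                       // an Int, as sourceBroker.id would be
  try {
    // %d cannot format a String: throws IllegalFormatConversionException
    println("error for %s %d to broker %d".format("topic", 0, host))
  } catch {
    case e: java.util.IllegalFormatConversionException =>
      println("format mismatch: " + e.getMessage)
  }
  // The integer id satisfies %d
  println("error for %s %d to broker %d".format("topic", 0, id))
}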


@@ -265,7 +265,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             .format(partition, fetchRequest.clientId))
           0
         case e =>
-          error("Error determining available fetch bytes for topic %s partition %s on broker %s for client %s"
+          warn("Error determining available fetch bytes for topic %s partition %s on broker %s for client %s"
             .format(topic, partition, brokerId, fetchRequest.clientId), e)
           0
       }
@@ -562,6 +562,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // unblocked if there are no partitions with pending acks
       val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
+      trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied))
       satisfied
     }


@@ -23,7 +23,7 @@ class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val r
   extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) {

   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
-    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d-".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
+    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
   }

   def shutdown() {


@@ -45,7 +45,10 @@ object DumpLogSegments {
       println("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
       offset = messageAndOffset.offset
     }
-    println("tail of the log is at offset: " + (startOffset + offset))
+    val endOffset = startOffset + offset
+    println("Tail of the log is at offset: " + endOffset)
+    if (messageSet.sizeInBytes != endOffset)
+      println("Log corrupted from " + endOffset + " to " + messageSet.sizeInBytes + "!!!")
     }
   }
 }
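
An illustrative sketch with hypothetical numbers of the corruption check added here: if the bytes validated by iterating the segment stop short of the file size reported by messageSet.sizeInBytes, the trailing range is flagged as corrupt.

// All numbers hypothetical; models the check, not the real message iterator.
object LogTailCheckSketch extends App {
  val startOffset = 0L
  val lastMessageOffset = 1024L  // byte offset reached while printing messages
  val sizeInBytes = 1536L        // what messageSet.sizeInBytes would report

  val endOffset = startOffset + lastMessageOffset
  println("Tail of the log is at offset: " + endOffset)
  if (sizeInBytes != endOffset)
    println("Log corrupted from " + endOffset + " to " + sizeInBytes + "!!!")
}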