diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 86ae2aacb71..3aa6eab099b 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -192,7 +192,7 @@ class Partition(val topic: String,
       case Some(leaderReplica) =>
         val replica = getReplica(replicaId).get
         val leaderHW = leaderReplica.highWatermark
-        if (replica.logEndOffset >= leaderHW) {
+        if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) {
           // expand ISR
           val newInSyncReplicas = inSyncReplicas + replica
           info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
@@ -237,8 +237,10 @@ class Partition(val topic: String,
     val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min
     val oldHighWatermark = leaderReplica.highWatermark
-    if(newHighWatermark > oldHighWatermark)
+    if(newHighWatermark > oldHighWatermark) {
       leaderReplica.highWatermark = newHighWatermark
+      debug("Highwatermark for topic %s partition %d updated to %d".format(topic, partitionId, newHighWatermark))
+    }
     else
       debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s"
         .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
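
The stricter guard above keeps a follower that is already in the ISR from being re-added, so the "Expanding ISR" path fires only on a genuine membership change instead of on every fetch at or beyond the leader HW. A minimal sketch of the intended semantics, using an invented stand-in type (FakeReplica is not a real Kafka class):

    // Illustrative stand-in for kafka.cluster.Replica; name and shape are invented.
    case class FakeReplica(brokerId: Int, logEndOffset: Long)

    object IsrExpansionSketch extends App {
      var inSyncReplicas = Set(FakeReplica(1, 10L))
      val leaderHW = 10L

      // Same shape as the patched check: join the ISR only if not already a
      // member AND caught up to the leader's high watermark.
      def maybeExpandIsr(replica: FakeReplica): Unit =
        if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW)
          inSyncReplicas += replica

      val follower = FakeReplica(2, 12L)
      maybeExpandIsr(follower)                // expands the ISR once
      maybeExpandIsr(follower)                // no-op: already in the ISR
      println(inSyncReplicas.map(_.brokerId)) // Set(1, 2)
    }
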
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index e1d7648a416..6d0e57e3e86 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -100,7 +100,7 @@ class Replica(val brokerId: Int,
     val replicaString = new StringBuilder
     replicaString.append("ReplicaId: " + brokerId)
     replicaString.append("; Topic: " + topic)
-    replicaString.append("; Partition: " + partition.toString)
+    replicaString.append("; Partition: " + partition.partitionId)
     replicaString.append("; isLocal: " + isLocal)
     if(isLocal) replicaString.append("; Highwatermark: " + highWatermark)
     replicaString.toString()
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index f709780be1b..90cf1873bc2 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -163,7 +163,7 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
       val broker = m._1
       val leaderAndIsr = m._2
       val leaderAndIsrRequest = new LeaderAndIsrRequest(leaderAndIsr)
-      debug(("The leaderAndIsr request sent to broker %d is %s").format(broker, leaderAndIsrRequest))
+      info("Sending to broker %d leaderAndIsr request of %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
     brokerRequestMap.clear()
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 566beee38a4..d43af7f8b40 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -116,7 +116,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     updateLeaderAndIsrCache()
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
     val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader =>
-      deadBrokers.contains(partitionAndLeader._2)).map(_._1).toSeq
+      deadBrokers.contains(partitionAndLeader._2)).keySet.toSeq
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
diff --git a/core/src/main/scala/kafka/message/FileMessageSet.scala b/core/src/main/scala/kafka/message/FileMessageSet.scala
index 49f34a0c795..2b484958e42 100644
--- a/core/src/main/scala/kafka/message/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/message/FileMessageSet.scala
@@ -187,10 +187,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
       if(next >= 0)
         validUpTo = next
     } while(next >= 0)
-    channel.truncate(validUpTo)
-    setSize.set(validUpTo)
-    /* This should not be necessary, but fixes bug 6191269 on some OSs. */
-    channel.position(validUpTo)
+    truncateTo(validUpTo)
     needRecover.set(false)
     len - validUpTo
   }
@@ -201,6 +198,8 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
                                     " size of this log segment is only %d bytes".format(sizeInBytes()))
     channel.truncate(targetSize)
     setSize.set(targetSize)
+    /* This should not be necessary, but fixes bug 6191269 on some OSs. */
+    channel.position(targetSize)
   }
 
   /**
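
With recover() now delegating to truncateTo, the channel.position workaround for bug 6191269 applies to every truncation rather than only the recovery path, and any future caller of truncateTo inherits it. A rough stand-alone sketch of the consolidated behavior against a plain FileChannel (TruncatableFile is illustrative, not the real FileMessageSet):

    import java.io.RandomAccessFile
    import java.util.concurrent.atomic.AtomicLong

    // Sketch only: a free-standing truncateTo with the same shape as the patched method.
    class TruncatableFile(path: String) {
      private val channel = new RandomAccessFile(path, "rw").getChannel
      private val size = new AtomicLong(channel.size())

      def truncateTo(targetSize: Long): Unit = {
        require(targetSize <= size.get, "cannot truncate beyond the current size")
        channel.truncate(targetSize)
        size.set(targetSize)
        // Should not be necessary, but mirrors the bug-6191269 workaround: on some
        // OSs the channel position is otherwise left past the truncation point.
        channel.position(targetSize)
      }
    }
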
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index da3488e055f..34caf6d865d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -18,15 +18,15 @@
 package kafka.network
 
 import java.util.concurrent._
-import kafka.utils.SystemTime
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.nio.ByteBuffer
 import kafka.api._
 import kafka.common.TopicAndPartition
+import kafka.utils.{Logging, SystemTime}
 
-object RequestChannel {
+object RequestChannel extends Logging {
   val AllDone = new Request(1, 2, getShutdownReceive(), 0)
 
   def getShutdownReceive() = {
@@ -45,6 +45,7 @@ object RequestChannel {
     val requestId = buffer.getShort()
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer.rewind()
+    trace("Received request: %s".format(requestObj))
 
     def updateRequestMetrics() {
       val endTimeNs = SystemTime.nanoseconds
@@ -70,6 +71,7 @@ object RequestChannel {
         m.totalTimeHist.update(totalTime)
       }
     }
+    trace("Completed request: %s".format(requestObj))
   }
 
   case class Response(processor: Int, request: Request, responseSend: Send) {
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index de3e30bbfd6..6cd357c1fdd 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -37,8 +37,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   private val MaxConnectBackoffMs = 60000
   private var sentOnConnection = 0
-  /** make time-based reconnect starting at a random time **/
-  private var lastConnectionTime = System.currentTimeMillis - SyncProducer.randomGenerator.nextDouble() * config.reconnectInterval
+  private var lastConnectionTime = -1L
 
   private val lock = new Object()
   @volatile private var shutdown: Boolean = false
@@ -90,7 +89,6 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
      if(sentOnConnection >= config.reconnectInterval || (config.reconnectTimeInterval >= 0 && System.currentTimeMillis - lastConnectionTime >=
        config.reconnectTimeInterval)) {
        reconnect()
        sentOnConnection = 0
-       lastConnectionTime = System.currentTimeMillis
      }
      response
    }
@@ -146,6 +144,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
     while(!blockingChannel.isConnected && !shutdown) {
       try {
         blockingChannel.connect()
+        lastConnectionTime = System.currentTimeMillis
         info("Connected to " + config.host + ":" + config.port + " for producing")
       } catch {
         case e: Exception => {
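
With lastConnectionTime now stamped inside connect() (and initialized to -1 instead of a randomized past time), the time-based reconnect interval measures the age of the connection that was actually established. An invented, condensed model of the resulting bookkeeping (ReconnectPolicy is illustrative; the real logic lives in SyncProducer.doSend and SyncProducer.connect):

    // Hypothetical condensed model of the patched reconnect bookkeeping.
    class ReconnectPolicy(reconnectInterval: Int, reconnectTimeInterval: Long) {
      private var sentOnConnection = 0
      private var lastConnectionTime = -1L  // set only when a connection is established

      def onConnected(): Unit =
        lastConnectionTime = System.currentTimeMillis  // stamped in connect(), as in the patch

      def afterSend(): Boolean = {  // returns true when a reconnect is due
        sentOnConnection += 1
        val due = sentOnConnection >= reconnectInterval ||
          (reconnectTimeInterval >= 0 &&
            System.currentTimeMillis - lastConnectionTime >= reconnectTimeInterval)
        if (due) sentOnConnection = 0  // reconnect() would run here and re-stamp via onConnected()
        due
      }
    }
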
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 225f83ba717..fbab2dbccb1 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -112,7 +112,7 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
             warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
               .format(currentOffset.get, topic, partitionId, newOffset))
           case _ =>
-            error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
+            error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
              ErrorMapping.exceptionFor(partitionData.error))
            partitionsWithError += topicAndPartition
            fetchMap.remove(topicAndPartition)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 149add7b951..4fe8248d4a9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -265,7 +265,7 @@ class KafkaApis(val requestChannel: RequestChannel,
              .format(partition, fetchRequest.clientId))
            0
          case e =>
-           error("Error determining available fetch bytes for topic %s partition %s on broker %s for client %s"
+           warn("Error determining available fetch bytes for topic %s partition %s on broker %s for client %s"
             .format(topic, partition, brokerId, fetchRequest.clientId), e)
           0
       }
@@ -562,6 +562,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // unblocked if there are no partitions with pending acks
       val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
+      trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied))
       satisfied
     }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 39a6441036a..69db208e6ba 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -23,7 +23,7 @@ class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val r
         extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ",
           brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
-    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d-".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
+    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
   }
 
   def shutdown() {
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 85ae0c587df..986700b0a53 100644
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -45,7 +45,10 @@ object DumpLogSegments {
         println("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
         offset = messageAndOffset.offset
       }
-      println("tail of the log is at offset: " + (startOffset + offset))
+      val endOffset = startOffset + offset
+      println("Tail of the log is at offset: " + endOffset)
+      if (messageSet.sizeInBytes != endOffset)
+        println("Log corrupted from " + endOffset + " to " + messageSet.sizeInBytes + "!!!")
     }
   }
 }
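
The dump tool now compares the end offset of the last complete message against the segment's byte size and flags any trailing bytes as corruption. A small sketch of just that check (reportTail and its parameters are illustrative; in the tool, endOffset = startOffset + offset and the size comes from the FileMessageSet being dumped):

    // Illustrative helper mirroring the new trailing-bytes check.
    def reportTail(endOffset: Long, sizeInBytes: Long): Unit = {
      println("Tail of the log is at offset: " + endOffset)
      if (sizeInBytes != endOffset)
        println("Log corrupted from " + endOffset + " to " + sizeInBytes + "!!!")
    }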