diff --git a/core/src/main/scala/kafka/consumer/FetcherRunnable.scala b/core/src/main/scala/kafka/consumer/FetcherRunnable.scala index a16d3f1c7a4..bfe6214324e 100644 --- a/core/src/main/scala/kafka/consumer/FetcherRunnable.scala +++ b/core/src/main/scala/kafka/consumer/FetcherRunnable.scala @@ -69,7 +69,7 @@ class FetcherRunnable(val name: String, try { var done = false if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) { - logger.info("offset " + info.getFetchOffset + " out of range") + logger.info("offset for " + info + " out of range") // see if we can fix this error val resetOffset = resetConsumerOffsets(info.topic, info.partition) if(resetOffset >= 0) { @@ -136,7 +136,8 @@ class FetcherRunnable(val name: String, val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) // reset manually in zookeeper - logger.info("updating partition " + partition.name + " with " + (if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0)) + logger.info("updating partition " + partition.name + " for topic " + topic + " with " + + (if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0)) ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition.name, offsets(0).toString) offsets(0) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 6e4f8acaa66..429d4993492 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -137,8 +137,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } catch { case e => - logger.fatal(e) - logger.fatal(Utils.stackTrace(e)) + logger.fatal("error during consumer connector shutdown", e) } logger.info("ZKConsumerConnector shut down completed") } @@ -240,7 +239,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, catch { case t: Throwable => // log it and let it go - logger.warn("exception during commitOffsets: " + t + Utils.stackTrace(t)) + logger.warn("exception during commitOffsets", t) } if(logger.isDebugEnabled) logger.debug("Committed offset " + newOffset + " for topic " + info) @@ -434,7 +433,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. // For example, a ZK node can disappear between the time we get all children and the time we try to get // the value of a child. Just let this go since another rebalance will be triggered. - logger.info("exception during rebalance " + e) + logger.info("exception during rebalance ", e) } logger.info("end rebalancing consumer " + consumerIdString + " try #" + i) if (done) @@ -450,12 +449,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } private def rebalance(): Boolean = { - // testing code - //if ("group1_consumer1" == consumerIdString) { - // logger.info("sleeping " + consumerIdString) - // Thread.sleep(20) - //} - val myTopicThreadIdsMap = getTopicCount(consumerIdString).getConsumerThreadIdsPerTopic val cluster = ZkUtils.getCluster(zkClient) val consumersPerTopicMap = getConsumersPerTopic(group)