mirror of https://github.com/apache/kafka.git
				
				
				
			Code clean up in FetcherRunnable and ZookeeperConsumerConnector; KAFKA-120; patched by junrao; reviewed by nehanarkhede
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1160953 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
		
							parent
							
								
									bf30ae996f
								
							
						
					
					
						commit
						72544aa51e
					
				|  | @ -69,7 +69,7 @@ class FetcherRunnable(val name: String, | ||||||
|           try { |           try { | ||||||
|             var done = false |             var done = false | ||||||
|             if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) { |             if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) { | ||||||
|               logger.info("offset " + info.getFetchOffset + " out of range") |               logger.info("offset for " + info + " out of range") | ||||||
|               // see if we can fix this error |               // see if we can fix this error | ||||||
|               val resetOffset = resetConsumerOffsets(info.topic, info.partition) |               val resetOffset = resetConsumerOffsets(info.topic, info.partition) | ||||||
|               if(resetOffset >= 0) { |               if(resetOffset >= 0) { | ||||||
|  | @ -136,7 +136,8 @@ class FetcherRunnable(val name: String, | ||||||
|     val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) |     val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) | ||||||
| 
 | 
 | ||||||
|     // reset manually in zookeeper |     // reset manually in zookeeper | ||||||
|     logger.info("updating partition " + partition.name + " with " + (if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0)) |     logger.info("updating partition " + partition.name + " for topic " + topic + " with " + | ||||||
|  |             (if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0)) | ||||||
|     ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition.name, offsets(0).toString) |     ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition.name, offsets(0).toString) | ||||||
| 
 | 
 | ||||||
|     offsets(0) |     offsets(0) | ||||||
|  |  | ||||||
|  | @ -137,8 +137,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, | ||||||
|       } |       } | ||||||
|       catch { |       catch { | ||||||
|         case e => |         case e => | ||||||
|           logger.fatal(e) |           logger.fatal("error during consumer connector shutdown", e) | ||||||
|           logger.fatal(Utils.stackTrace(e)) |  | ||||||
|       } |       } | ||||||
|       logger.info("ZKConsumerConnector shut down completed") |       logger.info("ZKConsumerConnector shut down completed") | ||||||
|     } |     } | ||||||
|  | @ -240,7 +239,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, | ||||||
|         catch { |         catch { | ||||||
|           case t: Throwable => |           case t: Throwable => | ||||||
|           // log it and let it go |           // log it and let it go | ||||||
|             logger.warn("exception during commitOffsets: " + t + Utils.stackTrace(t)) |             logger.warn("exception during commitOffsets",  t) | ||||||
|         } |         } | ||||||
|         if(logger.isDebugEnabled) |         if(logger.isDebugEnabled) | ||||||
|           logger.debug("Committed offset " + newOffset + " for topic " + info) |           logger.debug("Committed offset " + newOffset + " for topic " + info) | ||||||
|  | @ -434,7 +433,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, | ||||||
|               // occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. |               // occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. | ||||||
|               // For example, a ZK node can disappear between the time we get all children and the time we try to get |               // For example, a ZK node can disappear between the time we get all children and the time we try to get | ||||||
|               // the value of a child. Just let this go since another rebalance will be triggered. |               // the value of a child. Just let this go since another rebalance will be triggered. | ||||||
|               logger.info("exception during rebalance " + e) |               logger.info("exception during rebalance ", e) | ||||||
|           } |           } | ||||||
|           logger.info("end rebalancing consumer " + consumerIdString + " try #" + i) |           logger.info("end rebalancing consumer " + consumerIdString + " try #" + i) | ||||||
|           if (done) |           if (done) | ||||||
|  | @ -450,12 +449,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     private def rebalance(): Boolean = { |     private def rebalance(): Boolean = { | ||||||
|       // testing code |  | ||||||
|       //if ("group1_consumer1" == consumerIdString) { |  | ||||||
|       //  logger.info("sleeping " + consumerIdString) |  | ||||||
|       //  Thread.sleep(20) |  | ||||||
|       //} |  | ||||||
| 
 |  | ||||||
|       val myTopicThreadIdsMap = getTopicCount(consumerIdString).getConsumerThreadIdsPerTopic |       val myTopicThreadIdsMap = getTopicCount(consumerIdString).getConsumerThreadIdsPerTopic | ||||||
|       val cluster = ZkUtils.getCluster(zkClient) |       val cluster = ZkUtils.getCluster(zkClient) | ||||||
|       val consumersPerTopicMap = getConsumersPerTopic(group) |       val consumersPerTopicMap = getConsumersPerTopic(group) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue