mirror of https://github.com/apache/kafka.git
Bug in mirroring code causes mirroring to halt; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-225
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1213990 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bbba103d76
commit
31f6f02aa2
|
@ -0,0 +1,26 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.common
|
||||
|
||||
/**
|
||||
* Thrown when a request is made for broker but no brokers with that topic
|
||||
* exist.
|
||||
*/
|
||||
class ConsumerRebalanceFailedException(message: String) extends RuntimeException(message) {
|
||||
def this() = this(null)
|
||||
}
|
|
@ -29,7 +29,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
|
|||
import kafka.api.OffsetRequest
|
||||
import java.util.UUID
|
||||
import kafka.serializer.Decoder
|
||||
import kafka.common.InvalidConfigException
|
||||
import kafka.common.{ConsumerRebalanceFailedException, InvalidConfigException}
|
||||
|
||||
/**
|
||||
* This class handles the consumers interaction with zookeeper
|
||||
|
@ -446,7 +446,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
|
|||
}
|
||||
}
|
||||
|
||||
throw new RuntimeException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries +" retries")
|
||||
throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries +" retries")
|
||||
}
|
||||
|
||||
private def rebalance(): Boolean = {
|
||||
|
|
|
@ -21,9 +21,11 @@ import scala.collection.JavaConversions._
|
|||
import kafka.utils.{Utils, ZkUtils, ZKStringSerializer, Logging}
|
||||
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
|
||||
import org.apache.zookeeper.Watcher.Event.KeeperState
|
||||
import kafka.server.KafkaServerStartable
|
||||
import kafka.common.ConsumerRebalanceFailedException
|
||||
|
||||
class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
|
||||
val eventHandler: TopicEventHandler[String]) extends Logging {
|
||||
val eventHandler: TopicEventHandler[String], kafkaServerStartable: KafkaServerStartable) extends Logging {
|
||||
|
||||
val lock = new Object()
|
||||
|
||||
|
@ -33,7 +35,7 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
|
|||
startWatchingTopicEvents()
|
||||
|
||||
private def startWatchingTopicEvents() {
|
||||
val topicEventListener = new ZkTopicEventListener
|
||||
val topicEventListener = new ZkTopicEventListener(kafkaServerStartable)
|
||||
ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
|
||||
|
||||
zkClient.subscribeStateChanges(
|
||||
|
@ -50,24 +52,17 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
|
|||
|
||||
def shutdown() {
|
||||
lock.synchronized {
|
||||
try {
|
||||
if (zkClient != null) {
|
||||
stopWatchingTopicEvents()
|
||||
zkClient.close()
|
||||
zkClient = null
|
||||
}
|
||||
else
|
||||
warn("Cannot shutdown already shutdown topic event watcher.")
|
||||
}
|
||||
catch {
|
||||
case e =>
|
||||
fatal(e)
|
||||
fatal(Utils.stackTrace(e))
|
||||
if (zkClient != null) {
|
||||
stopWatchingTopicEvents()
|
||||
zkClient.close()
|
||||
zkClient = null
|
||||
}
|
||||
else
|
||||
warn("Cannot shutdown already shutdown topic event watcher.")
|
||||
}
|
||||
}
|
||||
|
||||
class ZkTopicEventListener() extends IZkChildListener {
|
||||
class ZkTopicEventListener(val kafkaServerStartable: KafkaServerStartable) extends IZkChildListener {
|
||||
|
||||
@throws(classOf[Exception])
|
||||
def handleChildChange(parent: String, children: java.util.List[String]) {
|
||||
|
@ -81,9 +76,11 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
|
|||
}
|
||||
}
|
||||
catch {
|
||||
case e: ConsumerRebalanceFailedException =>
|
||||
fatal("can't rebalance in embedded consumer; proceed to shutdown", e)
|
||||
kafkaServerStartable.shutdown()
|
||||
case e =>
|
||||
fatal(e)
|
||||
fatal(Utils.stackTrace(e))
|
||||
error("error in handling child changes in embedded consumer", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,7 +39,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig,
|
|||
server = new KafkaServer(serverConfig)
|
||||
if (consumerConfig != null)
|
||||
embeddedConsumer =
|
||||
new EmbeddedConsumer(consumerConfig, producerConfig, server)
|
||||
new EmbeddedConsumer(consumerConfig, producerConfig, this)
|
||||
}
|
||||
|
||||
def startup() {
|
||||
|
@ -75,7 +75,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig,
|
|||
|
||||
class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
|
||||
private val producerConfig: ProducerConfig,
|
||||
private val kafkaServer: KafkaServer) extends TopicEventHandler[String] with Logging {
|
||||
private val kafkaServerStartable: KafkaServerStartable) extends TopicEventHandler[String] with Logging {
|
||||
|
||||
private val whiteListTopics =
|
||||
consumerConfig.mirrorTopicsWhitelist.split(",").toList.map(_.trim)
|
||||
|
@ -160,7 +160,8 @@ class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
|
|||
}
|
||||
|
||||
def startup() {
|
||||
topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this)
|
||||
info("staring up embedded consumer")
|
||||
topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this, kafkaServerStartable)
|
||||
/*
|
||||
* consumer threads are (re-)started upon topic events (which includes an
|
||||
* initial startup event which lists the current topics)
|
||||
|
|
Loading…
Reference in New Issue