mirror of https://github.com/apache/kafka.git
KAFKA-320 testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException; patched by nehanarkhede; reviewed by junrao and prashanth menon
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1310661 13f79535-47bb-0310-9956-ffa450edef68
parent a260bfacae
commit 79a3b31f26
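What ties the hunks below together: ownership of the test ZkClient moves out of EmbeddedZookeeper and out of the individual suites into ZooKeeperTestHarness, which now opens a fresh client in setUp() and closes it in tearDown(); every call site switches from zookeeper.client to the inherited zkClient. For orientation, a suite built on the patched harness would look roughly like the sketch below; MyZkTest and testCreateTopic are hypothetical names, while ZooKeeperTestHarness, zkClient, and CreateTopicCommand.createTopic all appear in the diff itself.

    import org.scalatest.junit.JUnit3Suite
    import kafka.zk.ZooKeeperTestHarness
    import kafka.admin.CreateTopicCommand

    // Hypothetical suite: the harness owns the ZkClient lifecycle, so every
    // test method sees a fresh ZooKeeper session and no ephemeral nodes left
    // over from a previous run.
    class MyZkTest extends JUnit3Suite with ZooKeeperTestHarness {
      def testCreateTopic() {
        // use the inherited zkClient rather than reaching into zookeeper.client
        CreateTopicCommand.createTopic(zkClient, "my-topic", 1, 1, "0")
      }
    }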
@@ -88,6 +88,7 @@ class ProducerPool(val config: ProducerConfig, val zkClient: ZkClient) extends L
       val iter = syncProducers.values.iterator
       while(iter.hasNext)
         iter.next.close
+      zkClient.close()
     }
   }
 }
@@ -17,13 +17,13 @@
 
 package kafka.server
 
-import java.util.concurrent._
-import java.util.concurrent.atomic._
 import java.io.File
 import kafka.network.{SocketServerStats, SocketServer}
 import kafka.log.LogManager
 import kafka.utils._
 import kafka.cluster.Replica
+import java.util.concurrent._
+import atomic.AtomicBoolean
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -32,8 +32,8 @@ import kafka.cluster.Replica
 class KafkaServer(val config: KafkaConfig) extends Logging {
 
   val CleanShutdownFile = ".kafka_cleanshutdown"
-  private val isShuttingDown = new AtomicBoolean(false)
-  private val shutdownLatch = new CountDownLatch(1)
+  private var isShuttingDown = new AtomicBoolean(false)
+  private var shutdownLatch = new CountDownLatch(1)
   private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
@@ -47,6 +47,8 @@ class KafkaServer(val config: KafkaConfig) extends Logging {
   */
  def startup() {
    info("Starting Kafka server...")
+    isShuttingDown = new AtomicBoolean(false)
+    shutdownLatch = new CountDownLatch(1)
    var needRecovery = true
    val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
    if (cleanShutDownFile.exists) {
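The two hunks above turn isShuttingDown and shutdownLatch from vals into vars and re-create both at the top of startup(). Presumably this is what makes a KafkaServer instance restartable: a CountDownLatch(1) that has already been counted down stays at zero, so reusing it would let a later awaitShutdown fall through immediately. A minimal sketch of the lifecycle this enables, assuming a KafkaConfig named config is in scope and that shutdown() is what counts the latch down:

    val server = new KafkaServer(config)
    server.startup()   // installs a fresh isShuttingDown flag and shutdownLatch
    server.shutdown()  // counts the latch down to zero
    server.startup()   // builds a new latch instead of reusing the spent one
    server.shutdown()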
@@ -29,7 +29,6 @@ import scala.collection.mutable
 import kafka.message.{NoCompressionCodec, CompressionCodec}
 import org.I0Itec.zkclient.ZkClient
 import java.util.{Random, Properties}
-import kafka.network.{BoundedByteBufferReceive, Receive, BoundedByteBufferSend, Request}
 
 /**
  * Helper functions!
@@ -4,9 +4,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
 #
 # http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,5 +21,5 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 log4j.logger.kafka=ERROR
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=ERROR
-log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
+log4j.logger.org.apache.zookeeper=WARN
@@ -134,18 +134,18 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
       List("1", "2", "3"),
       List("1", "3", "4")
     )
-    TestUtils.createBrokersInZk(zookeeper.client, List(0, 1, 2, 3, 4))
+    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
 
     val topic = "test"
     // create the topic
-    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
-    val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zookeeper.client).head
+    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
       .get.partitionsMetadata.map(p => p.replicas)
     val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
     expectedReplicaAssignment.toList.zip(actualReplicaList).foreach(l => assertEquals(l._1, l._2))
 
     try {
-      AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
+      AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
       fail("shouldn't be able to create a topic already exists")
     }
     catch {
@@ -161,10 +161,10 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
       List("1", "2", "3")
     )
     val topic = "auto-topic"
-    TestUtils.createBrokersInZk(zookeeper.client, List(0, 1, 2, 3))
-    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
+    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
+    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
 
-    val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zookeeper.client).head
+    val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
     newTopicMetadata match {
       case Some(metadata) => assertEquals(topic, metadata.topic)
       assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)
@@ -52,12 +52,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   val consumer2 = "consumer2"
   val consumer3 = "consumer3"
   val nMessages = 2
-  var zkClient: ZkClient = null
 
   override def setUp() {
     super.setUp()
     dirs = new ZKGroupTopicDirs(group, topic)
-    zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer)
   }
 
+  override def tearDown() {
+    super.tearDown()
+  }
+
   def testBasic() {
@@ -98,8 +100,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1.size, receivedMessages1.size)
@@ -124,8 +126,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -149,8 +151,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -178,8 +180,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
@@ -209,8 +211,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -234,8 +236,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -438,11 +440,11 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
   def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
     import scala.collection.JavaConversions
-    val children = zookeeper.client.getChildren(path)
+    val children = zkClient.getChildren(path)
     Collections.sort(children)
     val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
     childrenAsSeq.map(partition =>
-      (partition, zookeeper.client.readData(path + "/" + partition).asInstanceOf[String]))
+      (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
   }
 
 }
@@ -28,8 +28,8 @@ import kafka.server._
 import org.scalatest.junit.JUnit3Suite
 import kafka.integration.KafkaServerTestHarness
 import kafka.producer.{ProducerData, Producer}
-import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
+import kafka.utils.TestUtils
 
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
@@ -67,7 +67,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
   def testFetcher() {
     val perNode = 2
     var count = sendMessages(perNode)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
     fetch(count)
     Thread.sleep(100)
     assertQueueEmpty()
@@ -22,26 +22,22 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.CreateTopicCommand
 import java.nio.ByteBuffer
 import kafka.log.LogManager
-import kafka.utils.TestUtils
 import junit.framework.Assert._
-import org.I0Itec.zkclient.ZkClient
-import TestUtils._
 import org.easymock.EasyMock
 import kafka.network.BoundedByteBufferReceive
 import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
 import kafka.cluster.Broker
 import kafka.server.{KafkaZooKeeper, KafkaApis, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.utils.TestUtils._
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
   val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
-  var zkClient: ZkClient = null
   var brokers: Seq[Broker] = null
 
   override def setUp() {
     super.setUp()
-    zkClient = zookeeper.client
-    // create brokers in zookeeper
     brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
   }
 
@@ -26,8 +26,8 @@ import kafka.consumer.{ConsumerConfig, KafkaMessageStream}
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.javaapi.producer.{ProducerData, Producer}
-import kafka.utils.{Utils, Logging, TestUtils}
 import kafka.utils.TestUtils._
+import kafka.utils.{Utils, Logging, TestUtils}
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
@@ -53,8 +53,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1")
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
@@ -22,10 +22,10 @@ import junit.framework.Assert._
 import org.junit.Test
 import kafka.common.OffsetOutOfRangeException
 import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{TestZKUtils, Utils, MockTime, TestUtils}
 import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
 import kafka.server.KafkaConfig
+import kafka.utils._
 
 class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -48,10 +48,10 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
     logManager.startup
     logDir = logManager.logDir
 
-    TestUtils.createBrokersInZk(zookeeper.client, List(config.brokerId))
+    TestUtils.createBrokersInZk(zkClient, List(config.brokerId))
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, name, 3, 1, "0,0,0")
+    CreateTopicCommand.createTopic(zkClient, name, 3, 1, "0,0,0")
   }
 
   override def tearDown() {
@@ -26,7 +26,6 @@ import collection.mutable.WrappedArray
 import kafka.consumer.SimpleConsumer
 import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
-import org.apache.log4j._
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
@@ -44,8 +43,6 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
   val brokerPort: Int = 9099
   var simpleConsumer: SimpleConsumer = null
 
-  private val logger = Logger.getLogger(classOf[LogOffsetTest])
-
   @Before
   override def setUp() {
     super.setUp()
@@ -99,7 +96,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "1")
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)
@@ -137,7 +134,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "1")
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
 
     var offsetChanged = false
     for(i <- 1 to 14) {
@@ -158,7 +155,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, topic, 3, 1, "1,1,1")
+    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)
@@ -185,7 +182,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, topic, 3, 1, "1,1,1")
+    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)
@@ -30,23 +30,19 @@ import kafka.producer.async._
 import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils._
-import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import collection.Map
 import collection.mutable.ListBuffer
-import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
 
 class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
   val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
-  var zkClient: ZkClient = null
   var brokers: Seq[Broker] = null
 
   override def setUp() {
     super.setUp()
-    zkClient = zookeeper.client
-    // create brokers in zookeeper
     brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
   }
 
@@ -27,10 +27,9 @@ import kafka.message.Message
 import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.log4j.{Level, Logger}
-import org.I0Itec.zkclient.ZkClient
 import org.junit.Assert._
 import org.junit.Test
-import kafka.utils.{SystemTime, TestZKUtils, Utils, TestUtils}
+import kafka.utils._
 
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   private val brokerId1 = 0
@@ -42,13 +41,10 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
   private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
-  private var zkClient: ZkClient = null
 
   override def setUp() {
     super.setUp()
     // set up 2 brokers with 4 partitions each
-    zkClient = zookeeper.client
-
     val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
     val config1 = new KafkaConfig(props1) {
       override val numPartitions = 4
@@ -166,7 +162,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // restart server 1
     server1.startup()
-    Thread.sleep(500)
+    Thread.sleep(100)
 
     try {
       // cross check if broker 1 got the messages
@@ -116,10 +116,10 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     response.offsets.foreach(Assert.assertEquals(-1L, _))
 
     // #2 - test that we get correct offsets when partition is owner by broker
-    val zkClient = zookeeper.client
     CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
     CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
 
+    Thread.sleep(500)
     val response2 = producer.send(request)
     Assert.assertEquals(request.correlationId, response2.correlationId)
     Assert.assertEquals(response2.errors.length, response2.offsets.length)
@@ -20,10 +20,9 @@ package kafka.server
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.CreateTopicCommand
-import org.I0Itec.zkclient.ZkClient
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
-import kafka.utils.{ZKStringSerializer, Utils, TestUtils}
+import kafka.utils.{Utils, TestUtils}
 
 class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -37,13 +36,10 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
 
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
-  var zkClient: ZkClient = null
 
   override def setUp() {
     super.setUp()
 
-    zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer)
-
     // start both servers
     val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
     val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
@@ -48,7 +48,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     server.startup()
 
     // create topic
-    CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "0")
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
 
     val producer = new Producer[Int, Message](getProducerConfig(64*1024, 100000, 10000))
 
@@ -60,6 +60,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
       server.shutdown()
       val cleanShutDownFile = new File(new File(config.logDir), server.CleanShutdownFile)
       assertTrue(cleanShutDownFile.exists)
+      producer.close()
     }
 
 
@@ -73,7 +74,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     val server = new KafkaServer(config)
     server.startup()
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 1000)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
 
     var fetchedMessage: ByteBufferMessageSet = null
     while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
@@ -97,6 +98,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     server.shutdown()
     Utils.rm(server.config.logDir)
+    producer.close()
   }
 
 }
@@ -20,9 +20,8 @@ package kafka.zk
 import org.apache.zookeeper.server.ZooKeeperServer
 import org.apache.zookeeper.server.NIOServerCnxn
 import kafka.utils.TestUtils
-import org.I0Itec.zkclient.ZkClient
 import java.net.InetSocketAddress
-import kafka.utils.{Utils, ZKStringSerializer}
+import kafka.utils.Utils
 
 class EmbeddedZookeeper(val connectString: String) {
   val snapshotDir = TestUtils.tempDir()
@@ -32,8 +31,6 @@ class EmbeddedZookeeper(val connectString: String) {
   val port = connectString.split(":")(1).toInt
   val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
   factory.startup(zookeeper)
-  val client = new ZkClient(connectString)
-  client.setZkSerializer(ZKStringSerializer)
 
   def shutdown() {
     factory.shutdown()
@@ -18,19 +18,24 @@
 package kafka.zk
 
 import org.scalatest.junit.JUnit3Suite
-import kafka.utils.TestZKUtils
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZKStringSerializer, TestZKUtils}
 
 trait ZooKeeperTestHarness extends JUnit3Suite {
   val zkConnect: String = TestZKUtils.zookeeperConnect
   var zookeeper: EmbeddedZookeeper = null
+  var zkClient: ZkClient = null
 
   override def setUp() {
     zookeeper = new EmbeddedZookeeper(zkConnect)
+    zkClient = new ZkClient(zookeeper.connectString)
+    zkClient.setZkSerializer(ZKStringSerializer)
     super.setUp
   }
 
   override def tearDown() {
     super.tearDown
+    zkClient.close()
     zookeeper.shutdown()
   }
 
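Note the ordering in the new tearDown() above: zkClient.close() runs after super.tearDown but before zookeeper.shutdown(), so the client's session ends cleanly and its ephemeral znodes (broker registrations, for example) are gone before the next test's setUp() brings up a new embedded server. Resetting the client per test, rather than sharing the one EmbeddedZookeeper used to create, is presumably what stops testZKSendWithDeadBroker from intermittently hitting ZkNodeExistsException on leftover nodes.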