KAFKA-1501 Let the OS choose the port in unit tests to avoid collisions. Patch by Ewen CP, reviewed by Guozhang and me.

This commit is contained in:
Ewen Cheslack-Postava 2015-04-04 14:26:38 -07:00 committed by Jay Kreps
parent 15b93a410a
commit 6adaffd8ea
54 changed files with 732 additions and 551 deletions

View File

@ -131,9 +131,12 @@ public class SelectorTest {
@Test
public void testConnectionRefused() throws Exception {
int node = 0;
selector.connect(node, new InetSocketAddress("localhost", TestUtils.choosePort()), BUFFER_SIZE, BUFFER_SIZE);
ServerSocket nonListeningSocket = new ServerSocket(0);
int nonListeningPort = nonListeningSocket.getLocalPort();
selector.connect(node, new InetSocketAddress("localhost", nonListeningPort), BUFFER_SIZE, BUFFER_SIZE);
while (selector.disconnected().contains(node))
selector.poll(1000L);
nonListeningSocket.close();
}
/**
@ -271,8 +274,8 @@ public class SelectorTest {
private final List<Socket> sockets;
public EchoServer() throws Exception {
this.port = TestUtils.choosePort();
this.serverSocket = new ServerSocket(port);
this.serverSocket = new ServerSocket(0);
this.port = this.serverSocket.getLocalPort();
this.threads = Collections.synchronizedList(new ArrayList<Thread>());
this.sockets = Collections.synchronizedList(new ArrayList<Socket>());
}

View File

@ -19,8 +19,6 @@ package org.apache.kafka.test;
import static java.util.Arrays.asList;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@ -59,32 +57,6 @@ public class TestUtils {
return new Cluster(asList(ns), parts);
}
/**
* Choose a number of random available ports
*/
public static int[] choosePorts(int count) {
try {
ServerSocket[] sockets = new ServerSocket[count];
int[] ports = new int[count];
for (int i = 0; i < count; i++) {
sockets[i] = new ServerSocket(0);
ports[i] = sockets[i].getLocalPort();
}
for (int i = 0; i < count; i++)
sockets[i].close();
return ports;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* Choose an available port
*/
public static int choosePort() {
return choosePorts(1)[0];
}
/**
* Generate an array of random bytes
*

View File

@ -39,7 +39,7 @@ import com.yammer.metrics.core.{Gauge, Meter}
*/
class SocketServer(val brokerId: Int,
val host: String,
val port: Int,
private val port: Int,
val numProcessorThreads: Int,
val maxQueuedRequests: Int,
val sendBufferSize: Int,
@ -72,7 +72,7 @@ class SocketServer(val brokerId: Int,
requestChannel,
quotas,
connectionsMaxIdleMs)
Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start()
}
newGauge("ResponsesBeingSent", new Gauge[Int] {
@ -100,6 +100,12 @@ class SocketServer(val brokerId: Int,
processor.shutdown()
info("Shutdown completed")
}
def boundPort(): Int = {
if (acceptor == null)
throw new KafkaException("Tried to check server's port before server was started")
acceptor.serverChannel.socket().getLocalPort
}
}
/**
@ -197,7 +203,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
* Thread that accepts and configures new connections. There is only need for one of these
*/
private[kafka] class Acceptor(val host: String,
val port: Int,
private val port: Int,
private val processors: Array[Processor],
val sendBufferSize: Int,
val recvBufferSize: Int,

View File

@ -157,7 +157,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
topicConfigManager.startup()
/* tell everyone we are alive */
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
val advertisedPort = if (config.advertisedPort != 0) config.advertisedPort else socketServer.boundPort()
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, advertisedPort, config.zkSessionTimeoutMs, zkClient)
kafkaHealthcheck.startup()
/* register broker metrics */
@ -357,6 +358,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
def getLogManager(): LogManager = logManager
def boundPort(): Int = socketServer.boundPort()
private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
segmentMs = config.logRollTimeMillis,

View File

@ -0,0 +1,154 @@
/**
* Copyright 2015 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package kafka.api
import kafka.server.KafkaConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.CommitType
import org.apache.kafka.common.TopicPartition
import kafka.utils.{ShutdownableThread, TestUtils, Logging}
import org.junit.Assert._
import scala.collection.JavaConversions._
/**
* Integration tests for the new consumer that cover basic usage as well as server failures
*/
class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val producerCount = 1
val consumerCount = 2
val serverCount = 3
val topic = "topic"
val part = 0
val tp = new TopicPartition(topic, part)
// configure the servers and clients
this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown
this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offset
this.serverConfig.setProperty("offsets.topic.num.partitions", "1")
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
override def generateConfigs() = {
FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect,enableControlledShutdown = false)
.map(KafkaConfig.fromProps(_, serverConfig))
}
override def setUp() {
super.setUp()
// create the test topic with all the brokers as replicas
TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
}
def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(5)
/*
* 1. Produce a bunch of messages
* 2. Then consume the messages while killing and restarting brokers at random
*/
def consumeWithBrokerFailures(numIters: Int) {
val numRecords = 1000
sendRecords(numRecords)
this.producers.map(_.close)
var consumed = 0
val consumer = this.consumers(0)
consumer.subscribe(topic)
val scheduler = new BounceBrokerScheduler(numIters)
scheduler.start()
while (scheduler.isRunning.get()) {
for (record <- consumer.poll(100)) {
assertEquals(consumed.toLong, record.offset())
consumed += 1
}
consumer.commit(CommitType.SYNC)
if (consumed == numRecords) {
consumer.seekToBeginning()
consumed = 0
}
}
scheduler.shutdown()
}
def testSeekAndCommitWithBrokerFailures() = seekAndCommitWithBrokerFailures(5)
def seekAndCommitWithBrokerFailures(numIters: Int) {
val numRecords = 1000
sendRecords(numRecords)
this.producers.map(_.close)
val consumer = this.consumers(0)
consumer.subscribe(tp)
consumer.seek(tp, 0)
val scheduler = new BounceBrokerScheduler(numIters)
scheduler.start()
while(scheduler.isRunning.get()) {
val coin = TestUtils.random.nextInt(3)
if (coin == 0) {
info("Seeking to end of log")
consumer.seekToEnd()
assertEquals(numRecords.toLong, consumer.position(tp))
} else if (coin == 1) {
val pos = TestUtils.random.nextInt(numRecords).toLong
info("Seeking to " + pos)
consumer.seek(tp, pos)
assertEquals(pos, consumer.position(tp))
} else if (coin == 2) {
info("Committing offset.")
consumer.commit(CommitType.SYNC)
assertEquals(consumer.position(tp), consumer.committed(tp))
}
}
}
private class BounceBrokerScheduler(val numIters: Int) extends ShutdownableThread("daemon-bounce-broker", false)
{
var iter: Int = 0
override def doWork(): Unit = {
killRandomBroker()
Thread.sleep(500)
restartDeadBrokers()
iter += 1
if (iter == numIters)
initiateShutdown()
else
Thread.sleep(500)
}
}
private def sendRecords(numRecords: Int) {
val futures = (0 until numRecords).map { i =>
this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
}
futures.map(_.get)
}
}

View File

@ -146,72 +146,6 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
}
def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(5)
/*
* 1. Produce a bunch of messages
* 2. Then consume the messages while killing and restarting brokers at random
*/
def consumeWithBrokerFailures(numIters: Int) {
val numRecords = 1000
sendRecords(numRecords)
this.producers.map(_.close)
var consumed = 0
val consumer = this.consumers(0)
consumer.subscribe(topic)
val scheduler = new BounceBrokerScheduler(numIters)
scheduler.start()
while (scheduler.isRunning.get()) {
for (record <- consumer.poll(100)) {
assertEquals(consumed.toLong, record.offset())
consumed += 1
}
consumer.commit(CommitType.SYNC)
if (consumed == numRecords) {
consumer.seekToBeginning()
consumed = 0
}
}
scheduler.shutdown()
}
def testSeekAndCommitWithBrokerFailures() = seekAndCommitWithBrokerFailures(5)
def seekAndCommitWithBrokerFailures(numIters: Int) {
val numRecords = 1000
sendRecords(numRecords)
this.producers.map(_.close)
val consumer = this.consumers(0)
consumer.subscribe(tp)
consumer.seek(tp, 0)
val scheduler = new BounceBrokerScheduler(numIters)
scheduler.start()
while(scheduler.isRunning.get()) {
val coin = TestUtils.random.nextInt(3)
if (coin == 0) {
info("Seeking to end of log")
consumer.seekToEnd()
assertEquals(numRecords.toLong, consumer.position(tp))
} else if (coin == 1) {
val pos = TestUtils.random.nextInt(numRecords).toLong
info("Seeking to " + pos)
consumer.seek(tp, pos)
assertEquals(pos, consumer.position(tp))
} else if (coin == 2) {
info("Committing offset.")
consumer.commit(CommitType.SYNC)
assertEquals(consumer.position(tp), consumer.committed(tp))
}
}
}
def testPartitionReassignmentCallback() {
val callback = new TestConsumerReassignmentCallback()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "200"); // timeout quickly to avoid slow test
@ -255,23 +189,6 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
}
}
private class BounceBrokerScheduler(val numIters: Int) extends ShutdownableThread("daemon-bounce-broker", false)
{
var iter: Int = 0
override def doWork(): Unit = {
killRandomBroker()
Thread.sleep(500)
restartDeadBrokers()
iter += 1
if (iter == numIters)
initiateShutdown()
else
Thread.sleep(500)
}
}
private def sendRecords(numRecords: Int) {
val futures = (0 until numRecords).map { i =>
this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))

View File

@ -0,0 +1,55 @@
/**
* Copyright 2015 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package kafka.api
import java.io.IOException
import java.net.ServerSocket
import java.util.Properties
import kafka.utils.TestUtils
/**
* DO NOT USE THESE UTILITIES UNLESS YOU ABSOLUTELY MUST
*
* These are utilities for selecting fixed (preselected), ephemeral ports to use with tests. This is not a reliable way
* of testing on most machines because you can easily run into port conflicts. If you're using this class, you're almost
* certainly doing something wrong unless you can prove that your test **cannot function** properly without it.
*/
object FixedPortTestUtils {
def choosePorts(count: Int): Seq[Int] = {
try {
val sockets = (0 until count).map(i => new ServerSocket(0))
val ports = sockets.map(_.getLocalPort())
sockets.foreach(_.close())
ports
} catch {
case e: IOException => {
throw new RuntimeException(e)
}
}
}
def createBrokerConfigs(numConfigs: Int,
zkConnect: String,
enableControlledShutdown: Boolean = true,
enableDeleteTopic: Boolean = false): Seq[Properties] = {
val ports = FixedPortTestUtils.choosePorts(numConfigs)
(0 until numConfigs)
.map(node => TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, ports(node)))
}
}

View File

@ -38,15 +38,16 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
lazy val producerConfig = new Properties
lazy val consumerConfig = new Properties
lazy val serverConfig = new Properties
override lazy val configs = {
val cfgs = TestUtils.createBrokerConfigs(serverCount)
var consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
var producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
override def generateConfigs() = {
val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect)
cfgs.map(_.putAll(serverConfig))
cfgs.map(KafkaConfig.fromProps)
}
var consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
var producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
override def setUp() {
super.setUp()
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)

View File

@ -0,0 +1,164 @@
/**
* Copyright 2015 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package kafka.api
import java.util.Properties
import kafka.consumer.SimpleConsumer
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{ShutdownableThread, TestUtils}
import org.apache.kafka.clients.producer._
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.junit.Assert._
import org.junit.Test
class ProducerBounceTest extends KafkaServerTestHarness {
private val producerBufferSize = 30000
private val serverMessageMaxBytes = producerBufferSize/2
val numServers = 2
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString)
// Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
// This is the one of the few tests we currently allow to preallocate ports, despite the fact that this can result in transient
// failures due to ports getting reused. We can't use random ports because of bad behavior that can result from bouncing
// brokers too quickly when they get new, random ports. If we're not careful, the client can end up in a situation
// where metadata is not refreshed quickly enough, and by the time it's actually trying to, all the servers have
// been bounced and have new addresses. None of the bootstrap nodes or current metadata can get them connected to a
// running server.
//
// Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
// a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
override def generateConfigs() = {
FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = false)
.map(KafkaConfig.fromProps(_, overridingProps))
}
private var consumer1: SimpleConsumer = null
private var consumer2: SimpleConsumer = null
private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null
private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null
private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null
private var producer4: KafkaProducer[Array[Byte],Array[Byte]] = null
private val topic1 = "topic-1"
private val topic2 = "topic-2"
override def setUp() {
super.setUp()
producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize)
producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
}
override def tearDown() {
if (producer1 != null) producer1.close
if (producer2 != null) producer2.close
if (producer3 != null) producer3.close
if (producer4 != null) producer4.close
super.tearDown()
}
/**
* With replication, producer should able able to find new leader after it detects broker failure
*/
@Test
def testBrokerFailure() {
val numPartitions = 3
val leaders = TestUtils.createTopic(zkClient, topic1, numPartitions, numServers, servers)
assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))
val scheduler = new ProducerScheduler()
scheduler.start
// rolling bounce brokers
for (i <- 0 until numServers) {
for (server <- servers) {
server.shutdown()
server.awaitShutdown()
server.startup()
Thread.sleep(2000)
}
// Make sure the producer do not see any exception
// in returned metadata due to broker failures
assertTrue(scheduler.failed == false)
// Make sure the leader still exists after bouncing brokers
(0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition))
}
scheduler.shutdown
// Make sure the producer do not see any exception
// when draining the left messages on shutdown
assertTrue(scheduler.failed == false)
// double check that the leader info has been propagated after consecutive bounces
val newLeaders = (0 until numPartitions).map(i => TestUtils.waitUntilMetadataIsPropagated(servers, topic1, i))
val fetchResponses = newLeaders.zipWithIndex.map { case (leader, partition) =>
// Consumers must be instantiated after all the restarts since they use random ports each time they start up
val consumer = new SimpleConsumer("localhost", servers(leader).boundPort(), 100, 1024 * 1024, "")
val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
consumer.close
response
}
val messages = fetchResponses.flatMap(r => r.iterator.toList.map(_.message))
val uniqueMessages = messages.toSet
val uniqueMessageSize = uniqueMessages.size
assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
}
private class ProducerScheduler extends ShutdownableThread("daemon-producer", false)
{
val numRecords = 1000
var sent = 0
var failed = false
val producer = TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10)
override def doWork(): Unit = {
val responses =
for (i <- sent+1 to sent+numRecords)
yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes),
new ErrorLoggingCallback(topic1, null, null, true))
val futures = responses.toList
try {
futures.map(_.get)
sent += numRecords
} catch {
case e : Exception => failed = true
}
}
override def shutdown(){
super.shutdown()
producer.close
}
}
}

View File

@ -34,24 +34,22 @@ import kafka.message.Message
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.{Utils, TestUtils}
import scala.Array
@RunWith(value = classOf[Parameterized])
class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness {
private val brokerId = 0
private val port = TestUtils.choosePort
private var server: KafkaServer = null
private val props = TestUtils.createBrokerConfig(brokerId, port)
private val config = KafkaConfig.fromProps(props)
private val topic = "topic"
private val numRecords = 2000
@Before
override def setUp() {
super.setUp()
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
val config = KafkaConfig.fromProps(props)
server = TestUtils.createServer(config)
}
@ -71,14 +69,14 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
def testCompression() {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(Seq(server)))
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
var producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
val consumer = new SimpleConsumer("localhost", server.boundPort(), 100, 1024*1024, "")
try {
// create topic

View File

@ -20,7 +20,6 @@ package kafka.api
import org.junit.Test
import org.junit.Assert._
import java.lang.Integer
import java.util.{Properties, Random}
import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
@ -28,7 +27,7 @@ import kafka.common.Topic
import kafka.consumer.SimpleConsumer
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils}
import kafka.utils.{ShutdownableThread, TestUtils}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException, NotEnoughReplicasAfterAppendException}
@ -42,16 +41,14 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
val numServers = 2
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ZkConnectProp, TestZKUtils.zookeeperConnect)
overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString)
// Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
val configs =
for (props <- TestUtils.createBrokerConfigs(numServers, false))
yield KafkaConfig.fromProps(props, overridingProps)
def generateConfigs() =
TestUtils.createBrokerConfigs(numServers, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps))
private var consumer1: SimpleConsumer = null
private var consumer2: SimpleConsumer = null
@ -67,19 +64,12 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
override def setUp() {
super.setUp()
// TODO: we need to migrate to new consumers when 0.9 is final
consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "")
consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "")
producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize)
producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
}
override def tearDown() {
consumer1.close
consumer2.close
if (producer1 != null) producer1.close
if (producer2 != null) producer2.close
if (producer3 != null) producer3.close
@ -94,7 +84,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testTooLargeRecordWithAckZero() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
// send a too-large record
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
@ -107,7 +97,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testTooLargeRecordWithAckOne() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
// send a too-large record
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
@ -141,7 +131,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testWrongBrokerList() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
// producer with incorrect broker list
producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
@ -161,7 +151,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testNoResponse() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
// first send a message to make sure the metadata is refreshed
val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
@ -202,7 +192,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testInvalidPartition() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
// create a record with incorrect partition id, send should fail
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, new Integer(1), "key".getBytes, "value".getBytes)
@ -223,7 +213,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@Test
def testSendAfterClosed() {
// create topic
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
@ -248,59 +238,6 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
// re-close producer is fine
}
/**
* With replication, producer should able able to find new leader after it detects broker failure
*/
@Test
def testBrokerFailure() {
// create topic
val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
val partition = 0
assertTrue("Leader of partition 0 of the topic should exist", leaders(partition).isDefined)
val scheduler = new ProducerScheduler()
scheduler.start
// rolling bounce brokers
for (i <- 0 until 2) {
for (server <- servers) {
server.shutdown()
server.awaitShutdown()
server.startup()
Thread.sleep(2000)
}
// Make sure the producer do not see any exception
// in returned metadata due to broker failures
assertTrue(scheduler.failed == false)
// Make sure the leader still exists after bouncing brokers
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition)
}
scheduler.shutdown
// Make sure the producer do not see any exception
// when draining the left messages on shutdown
assertTrue(scheduler.failed == false)
// double check that the leader info has been propagated after consecutive bounces
val leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic1, partition)
val fetchResponse = if(leader == configs(0).brokerId) {
consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
} else {
consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
}
val messages = fetchResponse.iterator.toList.map(_.message)
val uniqueMessages = messages.toSet
val uniqueMessageSize = uniqueMessages.size
assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
}
@Test
def testCannotSendToInternalTopic() {
val thrown = intercept[ExecutionException] {
@ -313,9 +250,9 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
def testNotEnoughReplicas() {
val topicName = "minisrtest"
val topicProps = new Properties()
topicProps.put("min.insync.replicas","3")
topicProps.put("min.insync.replicas",(numServers+1).toString)
TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
TestUtils.createTopic(zkClient, topicName, 1, numServers, servers, topicProps)
val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes)
try {
@ -333,9 +270,9 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
def testNotEnoughReplicasAfterBrokerShutdown() {
val topicName = "minisrtest2"
val topicProps = new Properties()
topicProps.put("min.insync.replicas","2")
topicProps.put("min.insync.replicas",numServers.toString)
TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
TestUtils.createTopic(zkClient, topicName, 1, numServers, servers,topicProps)
val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes)
// this should work with all brokers up and running

View File

@ -25,7 +25,7 @@ import org.junit.Test
import org.junit.Assert._
import kafka.server.KafkaConfig
import kafka.utils.{TestZKUtils, TestUtils}
import kafka.utils.TestUtils
import kafka.consumer.SimpleConsumer
import kafka.message.Message
import kafka.integration.KafkaServerTestHarness
@ -39,12 +39,10 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
val numServers = 2
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ZkConnectProp, TestZKUtils.zookeeperConnect)
overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
val configs =
for (props <- TestUtils.createBrokerConfigs(numServers, false))
yield KafkaConfig.fromProps(props, overridingProps)
def generateConfigs() =
TestUtils.createBrokerConfigs(numServers, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps))
private var consumer1: SimpleConsumer = null
private var consumer2: SimpleConsumer = null
@ -56,8 +54,8 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
super.setUp()
// TODO: we need to migrate to new consumers when 0.9 is final
consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "")
consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "")
consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024*1024, "")
consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "")
}
override def tearDown() {

View File

@ -27,21 +27,7 @@ import kafka.client.ClientUtils
import kafka.server.{KafkaConfig, KafkaServer}
class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
val brokerId1 = 0
val brokerId2 = 1
val brokerId3 = 2
val brokerId4 = 3
val port1 = TestUtils.choosePort()
val port2 = TestUtils.choosePort()
val port3 = TestUtils.choosePort()
val port4 = TestUtils.choosePort()
val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, false)
val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, false)
var configs: Seq[KafkaConfig] = null
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
var brokers: Seq[Broker] = Seq.empty[Broker]
@ -54,14 +40,11 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
override def setUp() {
super.setUp()
// start all the servers
val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1))
val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2))
val server3 = TestUtils.createServer(KafkaConfig.fromProps(configProps3))
val server4 = TestUtils.createServer(KafkaConfig.fromProps(configProps4))
servers ++= List(server1, server2, server3, server4)
brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port))
configs = (0 until 4).map(i => KafkaConfig.fromProps(TestUtils.createBrokerConfig(i, zkConnect, enableControlledShutdown = false)))
// start all the servers
servers = configs.map(c => TestUtils.createServer(c))
brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort))
// create topics first
createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)

View File

@ -145,7 +145,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
val topic = "test"
// create brokers
val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
@ -176,7 +176,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
val topic = "test"
// create brokers
val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
@ -207,7 +207,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val expectedReplicaAssignment = Map(0 -> List(0, 1))
val topic = "test"
// create brokers
val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
// reassign partition 0
@ -236,7 +236,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
def testReassigningNonExistingPartition() {
val topic = "test"
// create brokers
val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// reassign partition 0
val newReplicas = Seq(2, 3)
val partitionToBeReassigned = 0
@ -262,7 +262,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
reassignPartitionsCommand.reassignPartitions
// create brokers
val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
val servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// wait until reassignment completes
TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient),
@ -298,7 +298,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val partition = 1
val preferredReplica = 0
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(KafkaConfig.fromProps)
val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
@ -318,7 +318,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val topic = "test"
val partition = 1
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(KafkaConfig.fromProps)
val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
// create the topic
TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
@ -365,7 +365,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
def testTopicConfigChange() {
val partitions = 3
val topic = "my-topic"
val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0)))
val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
def makeConfig(messageSize: Int, retentionMs: Long) = {
var props = new Properties()

View File

@ -17,7 +17,7 @@
package kafka.admin
import org.scalatest.junit.JUnit3Suite
import kafka.utils.{ZKGroupDirs, ZKGroupTopicDirs, ZkUtils, TestUtils}
import kafka.utils._
import kafka.server.KafkaConfig
import org.junit.Test
import kafka.consumer._
@ -26,7 +26,7 @@ import kafka.integration.KafkaServerTestHarness
class DeleteConsumerGroupTest extends JUnit3Suite with KafkaServerTestHarness {
val configs = TestUtils.createBrokerConfigs(3, false, true).map(KafkaConfig.fromProps)
def generateConfigs() = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
@Test
def testGroupWideDeleteInZK() {

View File

@ -96,7 +96,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
val topic = "test"
val topicAndPartition = TopicAndPartition(topic, 0)
val brokerConfigs = TestUtils.createBrokerConfigs(4, false)
val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
// create brokers
val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
@ -224,7 +224,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
val topicAndPartition = TopicAndPartition(topicName, 0)
val topic = topicAndPartition.topic
val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
brokerConfigs(0).setProperty("delete.topic.enable", "true")
brokerConfigs(0).setProperty("log.cleaner.enable","true")
brokerConfigs(0).setProperty("log.cleanup.policy","compact")
@ -253,7 +253,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")
)
createTestTopicAndCluster(topic,brokerConfigs)

View File

@ -30,7 +30,6 @@ import kafka.utils.TestUtils._
import kafka.utils._
import org.junit.Test
import kafka.serializer._
import kafka.cluster.{Broker, Cluster}
import org.scalatest.junit.JUnit3Suite
import kafka.integration.KafkaServerTestHarness
@ -38,31 +37,27 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
val numNodes = 1
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ZkConnectProp, TestZKUtils.zookeeperConnect)
val configs =
for(props <- TestUtils.createBrokerConfigs(numNodes))
yield KafkaConfig.fromProps(props, overridingProps)
def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
val messages = new mutable.HashMap[Int, Seq[Message]]
val topic = "topic"
val group = "group1"
val consumer0 = "consumer0"
val consumedOffset = 5
val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
val queue = new LinkedBlockingQueue[FetchedDataChunk]
val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
0,
queue,
new AtomicLong(consumedOffset),
new AtomicLong(0),
new AtomicInteger(0),
""))
val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
var topicInfos: Seq[PartitionTopicInfo] = null
def consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
override def setUp() {
super.setUp
super.setUp()
topicInfos = configs.map(c => new PartitionTopicInfo(topic,
0,
queue,
new AtomicLong(consumedOffset),
new AtomicLong(0),
new AtomicInteger(0),
""))
createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
}

View File

@ -38,17 +38,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val RebalanceBackoffMs = 5000
var dirs : ZKGroupTopicDirs = null
val zookeeperConnect = TestZKUtils.zookeeperConnect
val numNodes = 2
val numParts = 2
val topic = "topic1"
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect)
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
val configs =
for (props <- TestUtils.createBrokerConfigs(numNodes))
yield KafkaConfig.fromProps(props, overridingProps)
override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect)
.map(KafkaConfig.fromProps(_, overridingProps))
val group = "group1"
val consumer0 = "consumer0"
@ -93,8 +90,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
zkConsumerConnector0.shutdown
// send some messages to each broker
val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
sendMessagesToPartition(configs, topic, 1, nMessages)
val sentMessages1 = sendMessagesToPartition(servers, topic, 0, nMessages) ++
sendMessagesToPartition(servers, topic, 1, nMessages)
// wait to make sure the topic and partition have a leader for the successful case
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
@ -127,8 +124,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// send some messages to each broker
val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
sendMessagesToPartition(configs, topic, 1, nMessages)
val sentMessages2 = sendMessagesToPartition(servers, topic, 0, nMessages) ++
sendMessagesToPartition(servers, topic, 1, nMessages)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@ -148,8 +145,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
// send some messages to each broker
val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
sendMessagesToPartition(configs, topic, 1, nMessages)
val sentMessages3 = sendMessagesToPartition(servers, topic, 0, nMessages) ++
sendMessagesToPartition(servers, topic, 1, nMessages)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@ -182,8 +179,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
requestHandlerLogger.setLevel(Level.FATAL)
// send some messages to each broker
val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages1 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@ -215,8 +212,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// send some messages to each broker
val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages2 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@ -236,8 +233,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
// send some messages to each broker
val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages3 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@ -258,8 +255,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
def testCompressionSetConsumption() {
// send some messages to each broker
val sentMessages = sendMessagesToPartition(configs, topic, 0, 200, DefaultCompressionCodec) ++
sendMessagesToPartition(configs, topic, 1, 200, DefaultCompressionCodec)
val sentMessages = sendMessagesToPartition(servers, topic, 0, 200, DefaultCompressionCodec) ++
sendMessagesToPartition(servers, topic, 1, 200, DefaultCompressionCodec)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@ -284,8 +281,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
requestHandlerLogger.setLevel(Level.FATAL)
// send some messages to each broker
val sentMessages = sendMessagesToPartition(configs, topic, 0, nMessages, NoCompressionCodec) ++
sendMessagesToPartition(configs, topic, 1, nMessages, NoCompressionCodec)
val sentMessages = sendMessagesToPartition(servers, topic, 0, nMessages, NoCompressionCodec) ++
sendMessagesToPartition(servers, topic, 1, nMessages, NoCompressionCodec)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@ -319,13 +316,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
}
def testLeaderSelectionForPartition() {
val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer)
val zkClient = new ZkClient(zkConnect, 6000, 30000, ZKStringSerializer)
// create topic topic1 with 1 partition on broker 0
createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
// send some messages to each broker
val sentMessages1 = sendMessages(configs, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1)
val sentMessages1 = sendMessages(servers, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1)
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@ -351,8 +348,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
def testConsumerRebalanceListener() {
// Send messages to create topic
sendMessagesToPartition(configs, topic, 0, nMessages)
sendMessagesToPartition(configs, topic, 1, nMessages)
sendMessagesToPartition(servers, topic, 0, nMessages)
sendMessagesToPartition(servers, topic, 1, nMessages)
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)

View File

@ -31,7 +31,7 @@ import junit.framework.Assert._
class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0)))
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
val topic = "test_topic"
val group = "default_group"
@ -78,7 +78,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
TestUtils.createTopic(zkClient, topic, 1, 1, servers)
val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
TestUtils.getBrokerListStrFromConfigs(configs),
TestUtils.getBrokerListStrFromServers(servers),
keyEncoder = classOf[StringEncoder].getName)
for(i <- 0 until numMessages)

View File

@ -32,23 +32,13 @@ import kafka.utils.TestUtils._
import kafka.utils.TestUtils
class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
val numNodes = 1
val configs =
for(props <- TestUtils.createBrokerConfigs(numNodes))
yield KafkaConfig.fromProps(props)
def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
val topic = "topic"
val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
val shutdown = ZookeeperConsumerConnector.shutdownCommand
val queue = new LinkedBlockingQueue[FetchedDataChunk]
val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
0,
queue,
new AtomicLong(0),
new AtomicLong(0),
new AtomicInteger(0),
""))
var fetcher: ConsumerFetcherManager = null
@ -56,8 +46,18 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
super.setUp
createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
val cluster = new Cluster(servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort())))
fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
fetcher.stopConnections()
val topicInfos = configs.map(c =>
new PartitionTopicInfo(topic,
0,
queue,
new AtomicLong(0),
new AtomicLong(0),
new AtomicInteger(0),
""))
fetcher.startConnections(topicInfos, cluster)
}
@ -83,7 +83,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
var count = 0
for(conf <- configs) {
val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
TestUtils.getBrokerListStrFromConfigs(configs),
TestUtils.getBrokerListStrFromServers(servers),
keyEncoder = classOf[StringEncoder].getName)
val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
messages += conf.brokerId -> ms

View File

@ -24,28 +24,38 @@ import kafka.utils.{Utils, TestUtils}
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.common.KafkaException
import kafka.utils.TestUtils
/**
* A test harness that brings up some number of broker nodes
*/
trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
val configs: List[KafkaConfig]
var instanceConfigs: Seq[KafkaConfig] = null
var servers: Buffer[KafkaServer] = null
var brokerList: String = null
var alive: Array[Boolean] = null
/**
* Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
* test and should not reuse previous configurations unless they select their ports randomly when servers are started.
*/
def generateConfigs(): Seq[KafkaConfig]
def configs: Seq[KafkaConfig] = {
if (instanceConfigs == null)
instanceConfigs = generateConfigs()
instanceConfigs
}
def serverForId(id: Int) = servers.find(s => s.config.brokerId == id)
def bootstrapUrl = configs.map(c => c.hostName + ":" + c.port).mkString(",")
def bootstrapUrl = servers.map(s => s.config.hostName + ":" + s.boundPort()).mkString(",")
override def setUp() {
super.setUp
if(configs.size <= 0)
throw new KafkaException("Must suply at least one server config.")
brokerList = TestUtils.getBrokerListStrFromConfigs(configs)
throw new KafkaException("Must supply at least one server config.")
servers = configs.map(TestUtils.createServer(_)).toBuffer
brokerList = TestUtils.getBrokerListStrFromServers(servers)
alive = new Array[Boolean](servers.length)
Arrays.fill(alive, true)
}

View File

@ -23,16 +23,13 @@ import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
import kafka.server.{KafkaRequestHandler, KafkaConfig}
import kafka.producer.{KeyedMessage, Producer}
import org.apache.log4j.{Level, Logger}
import org.I0Itec.zkclient.ZkClient
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import scala.collection._
import kafka.admin.AdminUtils
import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
import kafka.utils.{StaticPartitioner, TestUtils, Utils}
import kafka.serializer.StringEncoder
import java.util.Properties
import TestUtils._
/**
* End to end tests of the primitive apis against a local server
@ -40,10 +37,7 @@ import TestUtils._
class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
val port = TestUtils.choosePort()
val props = TestUtils.createBrokerConfig(0, port)
val config = KafkaConfig.fromProps(props)
val configs = List(config)
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
def testFetchRequestCanProperlySerialize() {
val request = new FetchRequestBuilder()
@ -97,7 +91,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
props.put("compression.codec", "gzip")
val stringProducer1 = TestUtils.createProducer[String, String](
TestUtils.getBrokerListStrFromConfigs(configs),
TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName,
partitioner = classOf[StaticPartitioner].getName,
@ -222,7 +216,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
props.put("request.required.acks", "0")
val pipelinedProducer: Producer[String, String] =
TestUtils.createProducer[String, String](
TestUtils.getBrokerListStrFromConfigs(configs),
TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName,
partitioner = classOf[StaticPartitioner].getName,

View File

@ -24,18 +24,17 @@ import kafka.utils.{StaticPartitioner, TestUtils}
import kafka.serializer.StringEncoder
trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
val port: Int
val host = "localhost"
var producer: Producer[String, String] = null
var consumer: SimpleConsumer = null
override def setUp() {
super.setUp
producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName,
partitioner = classOf[StaticPartitioner].getName)
consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
consumer = new SimpleConsumer(host, servers(0).boundPort(), 1000000, 64*1024, "")
}
override def tearDown() {

View File

@ -25,35 +25,18 @@ import kafka.utils.{Utils, TestUtils}
import kafka.server.{KafkaConfig, KafkaServer}
class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
val brokerId1 = 0
val brokerId2 = 1
val brokerId3 = 2
val brokerId4 = 3
val port1 = TestUtils.choosePort()
val port2 = TestUtils.choosePort()
val port3 = TestUtils.choosePort()
val port4 = TestUtils.choosePort()
// controlled.shutdown.enable is true by default
val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3)
val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4)
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
val partitionId = 0
var servers: Seq[KafkaServer] = null
override def setUp() {
super.setUp()
// controlled.shutdown.enable is true by default
val configs = (0 until 4).map(i => TestUtils.createBrokerConfig(i, zkConnect))
configs(3).put("controlled.shutdown.retry.backoff.ms", "100")
// start all the servers
val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1))
val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2))
val server3 = TestUtils.createServer(KafkaConfig.fromProps(configProps3))
val server4 = TestUtils.createServer(KafkaConfig.fromProps(configProps4))
servers ++= List(server1, server2, server3, server4)
servers = configs.map(c => TestUtils.createServer(KafkaConfig.fromProps(c)))
}
override def tearDown() {

View File

@ -31,14 +31,15 @@ import kafka.common.ErrorMapping
import kafka.client.ClientUtils
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(1)
val configs = props.map(p => KafkaConfig.fromProps(p))
private var server1: KafkaServer = null
val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port))
var brokers: Seq[Broker] = null
override def setUp() {
super.setUp()
val props = createBrokerConfigs(1, zkConnect)
val configs = props.map(KafkaConfig.fromProps)
server1 = TestUtils.createServer(configs.head)
brokers = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()))
}
override def tearDown() {

View File

@ -29,7 +29,7 @@ import kafka.admin.AdminUtils
import kafka.common.FailedToSendMessageException
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}
import kafka.producer.{KeyedMessage, Producer}
import kafka.serializer.{DefaultEncoder, StringEncoder}
import kafka.serializer.StringEncoder
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.Utils
import kafka.utils.TestUtils._
@ -39,20 +39,12 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val brokerId1 = 0
val brokerId2 = 1
val port1 = choosePort()
val port2 = choosePort()
// controlled shutdown is needed for these tests, but we can trim the retry count and backoff interval to
// reduce test execution time
val enableControlledShutdown = true
val configProps1 = createBrokerConfig(brokerId1, port1)
val configProps2 = createBrokerConfig(brokerId2, port2)
for (configProps <- List(configProps1, configProps2)) {
configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown))
configProps.put("controlled.shutdown.max.retries", String.valueOf(1))
configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(1000))
}
var configProps1: Properties = null
var configProps2: Properties = null
var configs: Seq[KafkaConfig] = Seq.empty[KafkaConfig]
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
@ -69,6 +61,15 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
override def setUp() {
super.setUp()
configProps1 = createBrokerConfig(brokerId1, zkConnect)
configProps2 = createBrokerConfig(brokerId2, zkConnect)
for (configProps <- List(configProps1, configProps2)) {
configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown))
configProps.put("controlled.shutdown.max.retries", String.valueOf(1))
configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(1000))
}
// temporarily set loggers to a higher level so that tests run quietly
kafkaApisLogger.setLevel(Level.FATAL)
networkProcessorLogger.setLevel(Level.FATAL)
@ -254,7 +255,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
private def produceMessage(topic: String, message: String) = {
val producer: Producer[String, Array[Byte]] = createProducer(
getBrokerListStrFromConfigs(configs),
getBrokerListStrFromServers(servers),
keyEncoder = classOf[StringEncoder].getName)
producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes))
producer.close()

View File

@ -39,19 +39,14 @@ import junit.framework.Assert._
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
val zookeeperConnect = zkConnect
val numNodes = 2
val numParts = 2
val topic = "topic1"
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect)
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
val configs =
for (props <- TestUtils.createBrokerConfigs(numNodes))
yield KafkaConfig.fromProps(props, overridingProps)
def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
val group = "group1"
val consumer1 = "consumer1"
@ -68,7 +63,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val sentMessages1 = sendMessages(nMessages, "batch1")
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
@ -93,7 +88,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
compressed: CompressionCodec): List[String] = {
var messages: List[String] = Nil
val producer: kafka.producer.Producer[Int, String] =
TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName)
val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)

View File

@ -37,7 +37,7 @@ class LogTest extends JUnitSuite {
@Before
def setUp() {
logDir = TestUtils.tempDir()
val props = TestUtils.createBrokerConfig(0, -1)
val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
config = KafkaConfig.fromProps(props)
}

View File

@ -47,19 +47,16 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
private val brokerZk = 0
private val ports = TestUtils.choosePorts(2)
private val portZk = ports(0)
@Before
override def setUp() {
super.setUp()
val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk)
val propsZk = TestUtils.createBrokerConfig(brokerZk, zkConnect)
val logDirZkPath = propsZk.getProperty("log.dir")
logDirZk = new File(logDirZkPath)
config = KafkaConfig.fromProps(propsZk)
server = TestUtils.createServer(config)
simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64 * 1024, "")
simpleConsumerZk = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64 * 1024, "")
}
@After
@ -94,7 +91,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromServers(Seq(server)))
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
try {
@ -129,7 +126,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
props.put("log4j.appender.KAFKA.BrokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put("log4j.appender.KAFKA.BrokerList", TestUtils.getBrokerListStrFromServers(Seq(server)))
props.put("log4j.appender.KAFKA.Topic", "test-topic")
props.put("log4j.appender.KAFKA.RequiredNumAcks", "1")
props.put("log4j.appender.KAFKA.SyncSend", "true")

View File

@ -36,18 +36,15 @@ import scala.util.matching.Regex
import org.scalatest.junit.JUnit3Suite
class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
val zookeeperConnect = TestZKUtils.zookeeperConnect
val numNodes = 2
val numParts = 2
val topic = "topic1"
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect)
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
val configs =
for (props <- TestUtils.createBrokerConfigs(numNodes, enableDeleteTopic = true))
yield KafkaConfig.fromProps(props, overridingProps)
def generateConfigs() =
TestUtils.createBrokerConfigs(numNodes, zkConnect, enableDeleteTopic=true).map(KafkaConfig.fromProps(_, overridingProps))
val nMessages = 2
@ -80,7 +77,7 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
}
def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
val sentMessages1 = sendMessages(configs, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1)
val sentMessages1 = sendMessages(servers, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1)
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)

View File

@ -36,7 +36,7 @@ class SocketServerTest extends JUnitSuite {
val server: SocketServer = new SocketServer(0,
host = null,
port = kafka.utils.TestUtils.choosePort,
port = 0,
numProcessorThreads = 1,
maxQueuedRequests = 50,
sendBufferSize = 300000,
@ -73,7 +73,7 @@ class SocketServerTest extends JUnitSuite {
channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
}
def connect(s:SocketServer = server) = new Socket("localhost", s.port)
def connect(s:SocketServer = server) = new Socket("localhost", s.boundPort)
@After
def cleanup() {
@ -162,7 +162,7 @@ class SocketServerTest extends JUnitSuite {
val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
val overrideServer: SocketServer = new SocketServer(0,
host = null,
port = kafka.utils.TestUtils.choosePort,
port = 0,
numProcessorThreads = 1,
maxQueuedRequests = 50,
sendBufferSize = 300000,

View File

@ -36,8 +36,10 @@ import scala.collection.mutable.ArrayBuffer
import kafka.utils._
class AsyncProducerTest extends JUnit3Suite {
val props = createBrokerConfigs(1)
val configs = props.map(p => KafkaConfig.fromProps(p))
// One of the few cases we can just set a fixed port because the producer is mocked out here since this uses mocks
val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port=65534))
val configs = props.map(KafkaConfig.fromProps)
val brokerList = configs.map(c => org.apache.kafka.common.utils.Utils.formatAddress(c.hostName, c.port)).mkString(",")
override def setUp() {
super.setUp()
@ -61,7 +63,7 @@ class AsyncProducerTest extends JUnit3Suite {
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("metadata.broker.list", brokerList)
props.put("producer.type", "async")
props.put("queue.buffering.max.messages", "10")
props.put("batch.num.messages", "1")
@ -86,7 +88,7 @@ class AsyncProducerTest extends JUnit3Suite {
def testProduceAfterClosed() {
val produceData = getProduceData(10)
val producer = createProducer[String, String](
getBrokerListStrFromConfigs(configs),
brokerList,
encoder = classOf[StringEncoder].getName)
producer.close
@ -162,7 +164,7 @@ class AsyncProducerTest extends JUnit3Suite {
producerDataList.append(new KeyedMessage[Int,Message]("topic2", key = 4, message = new Message("msg5".getBytes)))
val props = new Properties()
props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("metadata.broker.list", brokerList)
val broker1 = new Broker(0, "localhost", 9092)
val broker2 = new Broker(1, "localhost", 9093)
@ -212,7 +214,7 @@ class AsyncProducerTest extends JUnit3Suite {
def testSerializeEvents() {
val produceData = TestUtils.getMsgStrings(5).map(m => new KeyedMessage[String,String]("topic1",m))
val props = new Properties()
props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("metadata.broker.list", brokerList)
val config = new ProducerConfig(props)
// form expected partitions metadata
val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
@ -244,7 +246,7 @@ class AsyncProducerTest extends JUnit3Suite {
val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
producerDataList.append(new KeyedMessage[String,Message]("topic1", "key1", new Message("msg1".getBytes)))
val props = new Properties()
props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("metadata.broker.list", brokerList)
val config = new ProducerConfig(props)
// form expected partitions metadata
@ -274,7 +276,7 @@ class AsyncProducerTest extends JUnit3Suite {
@Test
def testNoBroker() {
val props = new Properties()
props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("metadata.broker.list", brokerList)
val config = new ProducerConfig(props)
// create topic metadata with 0 partitions
@ -308,7 +310,7 @@ class AsyncProducerTest extends JUnit3Suite {
// no need to retry since the send will always fail
props.put("message.send.max.retries", "0")
val producer= createProducer[String, String](
brokerList = getBrokerListStrFromConfigs(configs),
brokerList = brokerList,
encoder = classOf[DefaultEncoder].getName,
keyEncoder = classOf[DefaultEncoder].getName,
producerProps = props)
@ -326,7 +328,7 @@ class AsyncProducerTest extends JUnit3Suite {
@Test
def testRandomPartitioner() {
val props = new Properties()
props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("metadata.broker.list", brokerList)
val config = new ProducerConfig(props)
// create topic metadata with 0 partitions
@ -364,7 +366,7 @@ class AsyncProducerTest extends JUnit3Suite {
@Test
def testFailedSendRetryLogic() {
val props = new Properties()
props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("metadata.broker.list", brokerList)
props.put("request.required.acks", "1")
props.put("serializer.class", classOf[StringEncoder].getName.toString)
props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString)

View File

@ -17,7 +17,6 @@
package kafka.producer
import org.apache.kafka.common.config.ConfigException
import org.scalatest.TestFailedException
import org.scalatest.junit.JUnit3Suite
import kafka.consumer.SimpleConsumer
@ -40,8 +39,6 @@ import kafka.serializer.StringEncoder
class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
private val brokerId1 = 0
private val brokerId2 = 1
private val ports = TestUtils.choosePorts(2)
private val (port1, port2) = (ports(0), ports(1))
private var server1: KafkaServer = null
private var server2: KafkaServer = null
private var consumer1: SimpleConsumer = null
@ -49,26 +46,36 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
private var servers = List.empty[KafkaServer]
private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
props1.put("num.partitions", "4")
private val config1 = KafkaConfig.fromProps(props1)
private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
props2.put("num.partitions", "4")
private val config2 = KafkaConfig.fromProps(props2)
// Creation of consumers is deferred until they are actually needed. This allows us to kill brokers that use random
// ports and then get a consumer instance that will be pointed at the correct port
def getConsumer1() = {
if (consumer1 == null)
consumer1 = new SimpleConsumer("localhost", server1.boundPort(), 1000000, 64*1024, "")
consumer1
}
def getConsumer2() = {
if (consumer2 == null)
consumer2 = new SimpleConsumer("localhost", server2.boundPort(), 100, 64*1024, "")
consumer2
}
override def setUp() {
super.setUp()
// set up 2 brokers with 4 partitions each
val props1 = TestUtils.createBrokerConfig(brokerId1, zkConnect, false)
props1.put("num.partitions", "4")
val config1 = KafkaConfig.fromProps(props1)
val props2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, false)
props2.put("num.partitions", "4")
val config2 = KafkaConfig.fromProps(props2)
server1 = TestUtils.createServer(config1)
server2 = TestUtils.createServer(config2)
servers = List(server1,server2)
val props = new Properties()
props.put("host", "localhost")
props.put("port", port1.toString)
consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024, "")
consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024, "")
props.put("port", server1.boundPort().toString)
// temporarily set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.FATAL)
@ -115,7 +122,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
}
val producer2 = TestUtils.createProducer[String, String](
brokerList = "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq(config1)),
brokerList = "localhost:80," + TestUtils.getBrokerListStrFromServers(Seq(server1)),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName)
@ -128,7 +135,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
}
val producer3 = TestUtils.createProducer[String, String](
brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName)
@ -151,7 +158,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers)
val producer1 = TestUtils.createProducer[String, String](
brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName,
partitioner = classOf[StaticPartitioner].getName,
@ -166,10 +173,10 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
val leader = leaderOpt.get
val messageSet = if(leader == server1.config.brokerId) {
val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
response1.messageSet("new-topic", 0).iterator.toBuffer
}else {
val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val response2 = getConsumer2().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
response2.messageSet("new-topic", 0).iterator.toBuffer
}
assertEquals("Should have fetched 2 messages", 2, messageSet.size)
@ -184,7 +191,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
try {
val producer2 = TestUtils.createProducer[String, String](
brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName,
partitioner = classOf[StaticPartitioner].getName,
@ -214,7 +221,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
servers = servers)
val producer = TestUtils.createProducer[String, String](
brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName,
partitioner = classOf[StaticPartitioner].getName,
@ -248,7 +255,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
try {
// cross check if broker 1 got the messages
val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val messageSet1 = response1.messageSet(topic, 0).iterator
assertTrue("Message set should have 1 message", messageSet1.hasNext)
assertEquals(new Message(bytes = "test1".getBytes, key = "test".getBytes), messageSet1.next.message)
@ -268,7 +275,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
props.put("message.send.max.retries", "0")
props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout")
val producer = TestUtils.createProducer[String, String](
brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName,
partitioner = classOf[StaticPartitioner].getName,
@ -283,7 +290,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
// this message should be assigned to partition 0 whose leader is on broker 0
producer.send(new KeyedMessage[String, String](topic, "test", "test"))
// cross check if brokers got the messages
val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val messageSet1 = response1.messageSet("new-topic", 0).iterator
assertTrue("Message set should have 1 message", messageSet1.hasNext)
assertEquals(new Message("test".getBytes), messageSet1.next.message)
@ -315,7 +322,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
@Test
def testSendNullMessage() {
val producer = TestUtils.createProducer[String, String](
brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName,
partitioner = classOf[StaticPartitioner].getName)

View File

@ -33,13 +33,12 @@ import kafka.common.{TopicAndPartition, ErrorMapping}
class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
private val messageBytes = new Array[Byte](2)
// turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, false).head))
val zookeeperConnect = TestZKUtils.zookeeperConnect
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))
@Test
def testReachableServer() {
val server = servers.head
val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
val producer = new SyncProducer(new SyncProducerConfig(props))
val firstStart = SystemTime.milliseconds
@ -74,7 +73,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testEmptyProduceRequest() {
val server = servers.head
val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
val correlationId = 0
val clientId = SyncProducerConfig.DefaultClientId
@ -91,7 +90,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testMessageSizeTooLarge() {
val server = servers.head
val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
val producer = new SyncProducer(new SyncProducerConfig(props))
TestUtils.createTopic(zkClient, "test", numPartitions = 1, replicationFactor = 1, servers = servers)
@ -119,7 +118,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
def testMessageSizeTooLargeWithAckZero() {
val server = servers.head
val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
props.put("request.required.acks", "0")
val producer = new SyncProducer(new SyncProducerConfig(props))
@ -145,7 +144,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testProduceCorrectlyReceivesResponse() {
val server = servers.head
val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
val producer = new SyncProducer(new SyncProducerConfig(props))
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
@ -191,7 +190,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val timeoutMs = 500
val server = servers.head
val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
val producer = new SyncProducer(new SyncProducerConfig(props))
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
@ -217,7 +216,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testProduceRequestWithNoResponse() {
val server = servers.head
val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
val correlationId = 0
val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
@ -233,7 +232,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val topicName = "minisrtest"
val server = servers.head
val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort)
props.put("request.required.acks", "-1")
val producer = new SyncProducer(new SyncProducerConfig(props))

View File

@ -30,7 +30,7 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
override def setUp() {
super.setUp()
val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
props.put("advertised.host.name", advertisedHostName)
props.put("advertised.port", advertisedPort.toString)

View File

@ -26,8 +26,7 @@ import kafka.admin.{AdminOperationException, AdminUtils}
import org.scalatest.junit.JUnit3Suite
class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
override val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.choosePort)))
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
@Test
def testConfigChange() {

View File

@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean
class HighwatermarkPersistenceTest extends JUnit3Suite {
val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps)
val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
val topic = "foo"
val logManagers = configs map { config =>
TestUtils.createLogManager(

View File

@ -39,7 +39,7 @@ class IsrExpirationTest extends JUnit3Suite {
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, overridingProps))
val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, overridingProps))
val topic = "foo"
val time = new MockTime

View File

@ -29,7 +29,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testLogRetentionTimeHoursProvided() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put("log.retention.hours", "1")
val cfg = KafkaConfig.fromProps(props)
@ -39,7 +39,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testLogRetentionTimeMinutesProvided() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put("log.retention.minutes", "30")
val cfg = KafkaConfig.fromProps(props)
@ -49,7 +49,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testLogRetentionTimeMsProvided() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put("log.retention.ms", "1800000")
val cfg = KafkaConfig.fromProps(props)
@ -59,7 +59,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testLogRetentionTimeNoConfigProvided() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val cfg = KafkaConfig.fromProps(props)
assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRetentionTimeMillis)
@ -68,7 +68,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testLogRetentionTimeBothMinutesAndHoursProvided() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put("log.retention.minutes", "30")
props.put("log.retention.hours", "1")
@ -79,7 +79,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testLogRetentionTimeBothMinutesAndMsProvided() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put("log.retention.ms", "1800000")
props.put("log.retention.minutes", "10")
@ -93,7 +93,7 @@ class KafkaConfigTest extends JUnit3Suite {
val port = 9999
val hostName = "fake-host"
val props = TestUtils.createBrokerConfig(0, port)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = port)
props.put("host.name", hostName)
val serverConfig = KafkaConfig.fromProps(props)
@ -108,7 +108,7 @@ class KafkaConfigTest extends JUnit3Suite {
val advertisedHostName = "routable-host"
val advertisedPort = 1234
val props = TestUtils.createBrokerConfig(0, port)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = port)
props.put("advertised.host.name", advertisedHostName)
props.put("advertised.port", advertisedPort.toString)
@ -120,7 +120,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testUncleanLeaderElectionDefault() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val serverConfig = KafkaConfig.fromProps(props)
assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
@ -128,7 +128,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testUncleanElectionDisabled() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put("unclean.leader.election.enable", String.valueOf(false))
val serverConfig = KafkaConfig.fromProps(props)
@ -137,7 +137,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testUncleanElectionEnabled() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put("unclean.leader.election.enable", String.valueOf(true))
val serverConfig = KafkaConfig.fromProps(props)
@ -146,7 +146,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testUncleanElectionInvalid() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put("unclean.leader.election.enable", "invalid")
intercept[ConfigException] {
@ -156,7 +156,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testLogRollTimeMsProvided() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put("log.roll.ms", "1800000")
val cfg = KafkaConfig.fromProps(props)
@ -166,7 +166,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testLogRollTimeBothMsAndHoursProvided() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put("log.roll.ms", "1800000")
props.put("log.roll.hours", "1")
@ -177,7 +177,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testLogRollTimeNoConfigProvided() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val cfg = KafkaConfig.fromProps(props)
assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis )
@ -186,7 +186,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testDefaultCompressionType() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val serverConfig = KafkaConfig.fromProps(props)
assertEquals(serverConfig.compressionType, "producer")
@ -194,7 +194,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testValidCompressionType() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put("compression.type", "gzip")
val serverConfig = KafkaConfig.fromProps(props)
@ -203,7 +203,7 @@ class KafkaConfigTest extends JUnit3Suite {
@Test
def testInvalidCompressionType() {
val props = TestUtils.createBrokerConfig(0, 8181)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put("compression.type", "abc")
intercept[IllegalArgumentException] {
KafkaConfig.fromProps(props)

View File

@ -31,17 +31,16 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val brokerId1 = 0
val brokerId2 = 1
val port1 = TestUtils.choosePort()
val port2 = TestUtils.choosePort()
val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
var staleControllerEpochDetected = false
override def setUp() {
super.setUp()
val configProps1 = TestUtils.createBrokerConfig(brokerId1, zkConnect, enableControlledShutdown = false)
val configProps2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, enableControlledShutdown = false)
// start both servers
val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1))
val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2))
@ -117,8 +116,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
// start another controller
val controllerId = 2
val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, zkConnect))
val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))
val controllerContext = new ControllerContext(zkClient, 6000)
controllerContext.liveBrokers = brokers.toSet
val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig)

View File

@ -39,19 +39,18 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
var topicLogDir: File = null
var server: KafkaServer = null
var logSize: Int = 100
val brokerPort: Int = 9099
var simpleConsumer: SimpleConsumer = null
var time: Time = new MockTime()
@Before
override def setUp() {
super.setUp()
val config: Properties = createBrokerConfig(1, brokerPort)
val config: Properties = createBrokerConfig(1)
val logDirPath = config.getProperty("log.dir")
logDir = new File(logDirPath)
time = new MockTime()
server = TestUtils.createServer(KafkaConfig.fromProps(config), time)
simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "")
simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "")
}
@After
@ -194,10 +193,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(Seq(0L), consumerOffsets)
}
private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
private def createBrokerConfig(nodeId: Int): Properties = {
val props = new Properties
props.put("broker.id", nodeId.toString)
props.put("port", port.toString)
props.put("port", TestUtils.RandomPort.toString())
props.put("log.dir", getLogDir.getAbsolutePath)
props.put("log.flush.interval.messages", "1")
props.put("enable.zookeeper", "false")

View File

@ -19,8 +19,7 @@ package kafka.server
import java.util.Properties
import kafka.utils.TestUtils._
import kafka.utils.IntEncoder
import kafka.utils.{Utils, TestUtils}
import kafka.utils.{IntEncoder, Utils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import kafka.common._
import kafka.producer.{KeyedMessage, Producer}
@ -43,38 +42,48 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
overridingProps.put(KafkaConfig.ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString)
val configs = TestUtils.createBrokerConfigs(2, false).map(KafkaConfig.fromProps(_, overridingProps))
var configs: Seq[KafkaConfig] = null
val topic = "new-topic"
val partitionId = 0
var server1: KafkaServer = null
var server2: KafkaServer = null
val configProps1 = configs.head
val configProps2 = configs.last
def configProps1 = configs.head
def configProps2 = configs.last
val message = "hello"
var producer: Producer[Int, String] = null
var hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
var hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
def hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
def hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
// Some tests restart the brokers then produce more data. But since test brokers use random ports, we need
// to use a new producer that knows the new ports
def updateProducer() = {
if (producer != null)
producer.close()
producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName)
}
override def setUp() {
super.setUp()
configs = TestUtils.createBrokerConfigs(2, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps))
// start both servers
server1 = TestUtils.createServer(configProps1)
server2 = TestUtils.createServer(configProps2)
servers ++= List(server1, server2)
servers = List(server1, server2)
// create topic with 1 partition, 2 replicas, one on each broker
createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
// create the producer
producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(configs),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName)
updateProducer()
}
override def tearDown() {
@ -121,6 +130,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// bring the preferred replica back
server1.startup()
// Update producer with new server settings
updateProducer()
leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it can move to broker 0",
@ -132,6 +143,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
server2.startup()
updateProducer()
leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = leader)
assertTrue("Leader must remain on broker 0, in case of zookeeper session expiration it can move to broker 1",
leader.isDefined && (leader.get == 0 || leader.get == 1))
@ -181,6 +193,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
server2.startup()
updateProducer()
// check if leader moves to the other server
leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = leader)
assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
@ -189,6 +202,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// bring the preferred replica back
server1.startup()
updateProducer()
assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))

View File

@ -37,7 +37,6 @@ import junit.framework.Assert._
class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
val random: Random = new Random()
val brokerPort: Int = 9099
val group = "test-group"
val retentionCheckInterval: Long = 100L
var logDir: File = null
@ -50,14 +49,14 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
@Before
override def setUp() {
super.setUp()
val config: Properties = createBrokerConfig(1, brokerPort)
val config: Properties = createBrokerConfig(1, zkConnect)
config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, retentionCheckInterval.toString)
val logDirPath = config.getProperty("log.dir")
logDir = new File(logDirPath)
time = new MockTime()
server = TestUtils.createServer(KafkaConfig.fromProps(config), time)
simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "test-client")
simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "test-client")
val consumerMetadataRequest = ConsumerMetadataRequest(group)
Stream.continually {
val consumerMetadataResponse = simpleConsumer.send(consumerMetadataRequest)

View File

@ -22,20 +22,19 @@ import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
import kafka.producer.KeyedMessage
import kafka.serializer.StringEncoder
import kafka.utils.TestUtils
import junit.framework.Assert._
import kafka.utils.{TestUtils}
import kafka.common._
class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(2,false)
val configs = props.map(p => KafkaConfig.fromProps(p))
var brokers: Seq[KafkaServer] = null
val topic1 = "foo"
val topic2 = "bar"
override def setUp() {
super.setUp()
brokers = configs.map(config => TestUtils.createServer(config))
brokers = createBrokerConfigs(2, zkConnect, false)
.map(KafkaConfig.fromProps)
.map(config => TestUtils.createServer(config))
}
override def tearDown() {
@ -54,7 +53,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
}
// send test messages to leader
val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(brokers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName)
val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))

View File

@ -38,7 +38,7 @@ class ReplicaManagerTest extends JUnit3Suite {
@Test
def testHighWaterMarkDirectoryMapping() {
val props = TestUtils.createBrokerConfig(1)
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
val config = KafkaConfig.fromProps(props)
val zkClient = EasyMock.createMock(classOf[ZkClient])
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
@ -54,7 +54,7 @@ class ReplicaManagerTest extends JUnit3Suite {
@Test
def testHighwaterMarkRelativeDirectoryMapping() {
val props = TestUtils.createBrokerConfig(1)
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
val config = KafkaConfig.fromProps(props)
val zkClient = EasyMock.createMock(classOf[ZkClient])
@ -71,7 +71,7 @@ class ReplicaManagerTest extends JUnit3Suite {
@Test
def testIllegalRequiredAcks() {
val props = TestUtils.createBrokerConfig(1)
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
val config = KafkaConfig.fromProps(props)
val zkClient = EasyMock.createMock(classOf[ZkClient])
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)

View File

@ -16,6 +16,8 @@
*/
package kafka.server
import java.util.Properties
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.{TestUtils, Utils}
import org.junit.Test
@ -24,12 +26,19 @@ import junit.framework.Assert._
import java.io.File
class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
var props1 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort)
var config1 = KafkaConfig.fromProps(props1)
var props2 = TestUtils.createBrokerConfig(0, TestUtils.choosePort)
var config2 = KafkaConfig.fromProps(props2)
var props1: Properties = null
var config1: KafkaConfig = null
var props2: Properties = null
var config2: KafkaConfig = null
val brokerMetaPropsFile = "meta.properties"
override def setUp() {
super.setUp()
props1 = TestUtils.createBrokerConfig(-1, zkConnect)
config1 = KafkaConfig.fromProps(props1)
props2 = TestUtils.createBrokerConfig(0, zkConnect)
config2 = KafkaConfig.fromProps(props2)
}
@Test
def testAutoGenerateBrokerId() {
@ -51,7 +60,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
// start the server with broker.id as part of config
val server1 = new KafkaServer(config1)
val server2 = new KafkaServer(config2)
val props3 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort)
val props3 = TestUtils.createBrokerConfig(-1, zkConnect)
val config3 = KafkaConfig.fromProps(props3)
val server3 = new KafkaServer(config3)
server1.startup()

View File

@ -32,20 +32,23 @@ import org.scalatest.junit.JUnit3Suite
import junit.framework.Assert._
class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
val port = TestUtils.choosePort
val props = TestUtils.createBrokerConfig(0, port)
val config = KafkaConfig.fromProps(props)
var config: KafkaConfig = null
val host = "localhost"
val topic = "test"
val sent1 = List("hello", "there")
val sent2 = List("more", "messages")
override def setUp(): Unit = {
super.setUp()
val props = TestUtils.createBrokerConfig(0, zkConnect)
config = KafkaConfig.fromProps(props)
}
@Test
def testCleanShutdown() {
var server = new KafkaServer(config)
server.startup()
var producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)),
var producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(Seq(server)),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName)
@ -71,10 +74,10 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
// wait for the broker to receive the update metadata request after startup
TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0)
producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromConfigs(Seq(config)),
producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(Seq(server)),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName)
val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
val consumer = new SimpleConsumer(host, server.boundPort(), 1000000, 64*1024, "")
var fetchedMessage: ByteBufferMessageSet = null
while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
@ -103,7 +106,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
@Test
def testCleanShutdownWithDeleteTopicEnabled() {
val newProps = TestUtils.createBrokerConfig(0, port)
val newProps = TestUtils.createBrokerConfig(0, zkConnect)
newProps.setProperty("delete.topic.enable", "true")
val newConfig = KafkaConfig.fromProps(newProps)
val server = new KafkaServer(newConfig)
@ -116,7 +119,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
@Test
def testCleanShutdownAfterFailedStartup() {
val newProps = TestUtils.createBrokerConfig(0, port)
val newProps = TestUtils.createBrokerConfig(0, zkConnect)
newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
val newConfig = KafkaConfig.fromProps(newProps)
val server = new KafkaServer(newConfig)

View File

@ -30,7 +30,7 @@ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
def testBrokerCreatesZKChroot {
val brokerId = 0
val zookeeperChroot = "/kafka-chroot-for-unittest"
val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
val zooKeeperConnect = props.get("zookeeper.connect")
props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
val server = TestUtils.createServer(KafkaConfig.fromProps(props))
@ -47,11 +47,11 @@ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
// This shouldn't affect the existing broker registration.
val brokerId = 0
val props1 = TestUtils.createBrokerConfig(brokerId)
val props1 = TestUtils.createBrokerConfig(brokerId, zkConnect)
val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1))
val brokerRegistration = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1
val props2 = TestUtils.createBrokerConfig(brokerId)
val props2 = TestUtils.createBrokerConfig(brokerId, zkConnect)
try {
TestUtils.createServer(KafkaConfig.fromProps(props2))
fail("Registering a broker with a conflicting id should fail")

View File

@ -43,7 +43,7 @@ class SimpleFetchTest extends JUnit3Suite {
overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, overridingProps))
val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, overridingProps))
// set the replica manager with the partition
val time = new MockTime

View File

@ -53,7 +53,7 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
@Test
def testUpdateLeaderAndIsr() {
val configs = TestUtils.createBrokerConfigs(1).map(KafkaConfig.fromProps)
val configs = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
val log = EasyMock.createMock(classOf[kafka.log.Log])
EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
EasyMock.expect(log)

View File

@ -65,23 +65,13 @@ object TestUtils extends Logging {
val seededRandom = new Random(192348092834L)
val random = new Random()
/**
* Choose a number of random available ports
*/
def choosePorts(count: Int): List[Int] = {
val sockets =
for(i <- 0 until count)
yield new ServerSocket(0)
val socketList = sockets.toList
val ports = socketList.map(_.getLocalPort)
socketList.map(_.close)
ports
}
/* 0 gives a random port; you can then retrieve the assigned port from the Socket object. */
val RandomPort = 0
/**
* Choose an available port
*/
def choosePort(): Int = choosePorts(1).head
/** Port to use for unit tests that mock/don't require a real ZK server. */
val MockZkPort = 1
/** Zookeeper connection string to use for unit tests that mock/don't require a real ZK server. */
val MockZkConnect = "127.0.0.1:" + MockZkPort
/**
* Create a temporary directory
@ -141,28 +131,29 @@ object TestUtils extends Logging {
* Create a test config for the given node id
*/
def createBrokerConfigs(numConfigs: Int,
zkConnect: String,
enableControlledShutdown: Boolean = true,
enableDeleteTopic: Boolean = false): List[Properties] = {
for((port, node) <- choosePorts(numConfigs).zipWithIndex)
yield createBrokerConfig(node, port, enableControlledShutdown, enableDeleteTopic)
enableDeleteTopic: Boolean = false): Seq[Properties] = {
(0 until numConfigs).map(node => createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic))
}
def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
configs.map(c => formatAddress(c.hostName, c.port)).mkString(",")
def getBrokerListStrFromServers(servers: Seq[KafkaServer]): String = {
servers.map(s => formatAddress(s.config.hostName, s.boundPort())).mkString(",")
}
/**
* Create a test config for the given node id
*/
def createBrokerConfig(nodeId: Int, port: Int = choosePort(),
def createBrokerConfig(nodeId: Int, zkConnect: String,
enableControlledShutdown: Boolean = true,
enableDeleteTopic: Boolean = false): Properties = {
enableDeleteTopic: Boolean = false,
port: Int = RandomPort): Properties = {
val props = new Properties
if (nodeId >= 0) props.put("broker.id", nodeId.toString)
props.put("host.name", "localhost")
props.put("port", port.toString)
props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
props.put("zookeeper.connect", zkConnect)
props.put("replica.socket.timeout.ms", "1500")
props.put("controller.socket.timeout.ms", "1500")
props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
@ -756,7 +747,7 @@ object TestUtils extends Logging {
brokerState = new BrokerState())
}
def sendMessagesToPartition(configs: Seq[KafkaConfig],
def sendMessagesToPartition(servers: Seq[KafkaServer],
topic: String,
partition: Int,
numMessages: Int,
@ -765,7 +756,7 @@ object TestUtils extends Logging {
val props = new Properties()
props.put("compression.codec", compression.codec.toString)
val producer: Producer[Int, String] =
createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
createProducer(TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName,
partitioner = classOf[FixedValuePartitioner].getName,
@ -778,7 +769,7 @@ object TestUtils extends Logging {
ms.toList
}
def sendMessages(configs: Seq[KafkaConfig],
def sendMessages(servers: Seq[KafkaServer],
topic: String,
producerId: String,
messagesPerNode: Int,
@ -790,7 +781,7 @@ object TestUtils extends Logging {
props.put("compression.codec", compression.codec.toString)
props.put("client.id", producerId)
val producer: Producer[Int, String] =
createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs),
createProducer(brokerList = TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName,
partitioner = classOf[FixedValuePartitioner].getName,
@ -848,10 +839,6 @@ object TestUtils extends Logging {
}
object TestZKUtils {
val zookeeperConnect = "127.0.0.1:" + TestUtils.choosePort()
}
class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
override def toBytes(n: Int) = n.toString.getBytes
}

View File

@ -24,14 +24,16 @@ import java.net.InetSocketAddress
import kafka.utils.Utils
import org.apache.kafka.common.utils.Utils.getPort
class EmbeddedZookeeper(val connectString: String) {
class EmbeddedZookeeper() {
val snapshotDir = TestUtils.tempDir()
val logDir = TestUtils.tempDir()
val tickTime = 500
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
val factory = new NIOServerCnxnFactory()
factory.configure(new InetSocketAddress("127.0.0.1", getPort(connectString)), 0)
private val addr = new InetSocketAddress("127.0.0.1", TestUtils.RandomPort)
factory.configure(addr, 0)
factory.startup(zookeeper)
val port = zookeeper.getClientPort()
def shutdown() {
Utils.swallow(zookeeper.shutdown())

View File

@ -29,7 +29,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
val path: String = "/some_dir"
val zkSessionTimeoutMs = 1000
val zkConnectWithInvalidRoot: String = zkConnect + "/ghost"
def zkConnectWithInvalidRoot: String = zkConnect + "/ghost"
def testCreatePersistentPathThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,

View File

@ -19,19 +19,22 @@ package kafka.zk
import org.scalatest.junit.JUnit3Suite
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils}
import kafka.utils.{ZKStringSerializer, Utils}
trait ZooKeeperTestHarness extends JUnit3Suite {
val zkConnect: String = TestZKUtils.zookeeperConnect
var zkPort: Int = -1
var zookeeper: EmbeddedZookeeper = null
var zkClient: ZkClient = null
val zkConnectionTimeout = 6000
val zkSessionTimeout = 6000
def zkConnect: String = "127.0.0.1:" + zkPort
override def setUp() {
super.setUp
zookeeper = new EmbeddedZookeeper(zkConnect)
zkClient = new ZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
zookeeper = new EmbeddedZookeeper()
zkPort = zookeeper.port
zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
}
override def tearDown() {