mirror of https://github.com/apache/kafka.git
KAFKA-98 Fix hardcoded ports in unit tests so you can run them while running kafka.
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1157922 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7976993f9e
commit
6d1b6f2827
|
@ -34,7 +34,7 @@ class BackwardsCompatibilityTest extends JUnit3Suite {
|
||||||
val testConsumer = "consumer"
|
val testConsumer = "consumer"
|
||||||
val kafkaProps = new Properties
|
val kafkaProps = new Properties
|
||||||
val host = "localhost"
|
val host = "localhost"
|
||||||
val port = 9892
|
val port = TestUtils.choosePort
|
||||||
val loader = getClass.getClassLoader
|
val loader = getClass.getClassLoader
|
||||||
val kafkaLogDir = loader.getResource("test-kafka-logs")
|
val kafkaLogDir = loader.getResource("test-kafka-logs")
|
||||||
kafkaProps.put("brokerid", "12")
|
kafkaProps.put("brokerid", "12")
|
||||||
|
|
|
@ -32,7 +32,7 @@ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
|
||||||
*/
|
*/
|
||||||
class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness {
|
class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness {
|
||||||
|
|
||||||
val port = 9999
|
val port = TestUtils.choosePort
|
||||||
val props = TestUtils.createBrokerConfig(0, port)
|
val props = TestUtils.createBrokerConfig(0, port)
|
||||||
val config = new KafkaConfig(props) {
|
val config = new KafkaConfig(props) {
|
||||||
override val enableZookeeper = false
|
override val enableZookeeper = false
|
||||||
|
|
|
@ -17,7 +17,7 @@ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
|
||||||
|
|
||||||
class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness with ZooKeeperTestHarness {
|
class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness with ZooKeeperTestHarness {
|
||||||
val zkConnect = TestZKUtils.zookeeperConnect
|
val zkConnect = TestZKUtils.zookeeperConnect
|
||||||
val port = 9999
|
val port = TestUtils.choosePort
|
||||||
val props = TestUtils.createBrokerConfig(0, port)
|
val props = TestUtils.createBrokerConfig(0, port)
|
||||||
val config = new KafkaConfig(props) {
|
val config = new KafkaConfig(props) {
|
||||||
override val hostName = "localhost"
|
override val hostName = "localhost"
|
||||||
|
|
|
@ -35,7 +35,7 @@ import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, Byte
|
||||||
*/
|
*/
|
||||||
class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness {
|
class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness {
|
||||||
|
|
||||||
val port = 9999
|
val port = TestUtils.choosePort
|
||||||
val props = TestUtils.createBrokerConfig(0, port)
|
val props = TestUtils.createBrokerConfig(0, port)
|
||||||
val config = new KafkaConfig(props) {
|
val config = new KafkaConfig(props) {
|
||||||
override val enableZookeeper = false
|
override val enableZookeeper = false
|
||||||
|
@ -47,7 +47,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
val topic = "test-topic"
|
val topic = "test-topic"
|
||||||
val props = new Properties()
|
val props = new Properties()
|
||||||
props.put("serializer.class", "kafka.serializer.StringEncoder")
|
props.put("serializer.class", "kafka.serializer.StringEncoder")
|
||||||
props.put("broker.list", "0:localhost:9999")
|
props.put("broker.list", "0:localhost:" + port)
|
||||||
val config = new ProducerConfig(props)
|
val config = new ProducerConfig(props)
|
||||||
|
|
||||||
val stringProducer1 = new Producer[String, String](config)
|
val stringProducer1 = new Producer[String, String](config)
|
||||||
|
@ -67,7 +67,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
val topic = "test-topic"
|
val topic = "test-topic"
|
||||||
val props = new Properties()
|
val props = new Properties()
|
||||||
props.put("serializer.class", "kafka.serializer.StringEncoder")
|
props.put("serializer.class", "kafka.serializer.StringEncoder")
|
||||||
props.put("broker.list", "0:localhost:9999")
|
props.put("broker.list", "0:localhost:" + port)
|
||||||
props.put("compression", "true")
|
props.put("compression", "true")
|
||||||
val config = new ProducerConfig(props)
|
val config = new ProducerConfig(props)
|
||||||
|
|
||||||
|
|
|
@ -40,8 +40,8 @@ class ProducerTest extends JUnitSuite {
|
||||||
private val topic = "test-topic"
|
private val topic = "test-topic"
|
||||||
private val brokerId1 = 0
|
private val brokerId1 = 0
|
||||||
private val brokerId2 = 1
|
private val brokerId2 = 1
|
||||||
private val port1 = 9098
|
private val ports = TestUtils.choosePorts(2)
|
||||||
private val port2 = 9099
|
private val (port1, port2) = (ports(0), ports(1))
|
||||||
private var server1: KafkaServer = null
|
private var server1: KafkaServer = null
|
||||||
private var server2: KafkaServer = null
|
private var server2: KafkaServer = null
|
||||||
private var producer1: SyncProducer = null
|
private var producer1: SyncProducer = null
|
||||||
|
@ -605,7 +605,8 @@ class ProducerTest extends JUnitSuite {
|
||||||
val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
|
val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
|
||||||
val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
|
val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
|
||||||
|
|
||||||
val serverProps = TestUtils.createBrokerConfig(2, 9094)
|
val port = TestUtils.choosePort
|
||||||
|
val serverProps = TestUtils.createBrokerConfig(2, port)
|
||||||
val serverConfig = new KafkaConfig(serverProps) {
|
val serverConfig = new KafkaConfig(serverProps) {
|
||||||
override val numPartitions = 4
|
override val numPartitions = 4
|
||||||
}
|
}
|
||||||
|
@ -615,7 +616,7 @@ class ProducerTest extends JUnitSuite {
|
||||||
// send a message to the new broker to register it under topic "test-topic"
|
// send a message to the new broker to register it under topic "test-topic"
|
||||||
val tempProps = new Properties()
|
val tempProps = new Properties()
|
||||||
tempProps.put("host", "localhost")
|
tempProps.put("host", "localhost")
|
||||||
tempProps.put("port", "9094")
|
tempProps.put("port", port.toString)
|
||||||
val tempProducer = new SyncProducer(new SyncProducerConfig(tempProps))
|
val tempProducer = new SyncProducer(new SyncProducerConfig(tempProps))
|
||||||
tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
|
tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
|
||||||
messages = new Message("test".getBytes())))
|
messages = new Message("test".getBytes())))
|
||||||
|
|
|
@ -36,7 +36,7 @@ class SyncProducerTest extends JUnitSuite {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
def setUp() {
|
def setUp() {
|
||||||
server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0, 9092))
|
server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.choosePort))
|
||||||
{
|
{
|
||||||
override val enableZookeeper = false
|
override val enableZookeeper = false
|
||||||
})
|
})
|
||||||
|
@ -51,7 +51,7 @@ class SyncProducerTest extends JUnitSuite {
|
||||||
def testUnreachableServer() {
|
def testUnreachableServer() {
|
||||||
val props = new Properties()
|
val props = new Properties()
|
||||||
props.put("host", "NOT_USED")
|
props.put("host", "NOT_USED")
|
||||||
props.put("port", "9092")
|
props.put("port", server.socketServer.port.toString)
|
||||||
props.put("buffer.size", "102400")
|
props.put("buffer.size", "102400")
|
||||||
props.put("connect.timeout.ms", "300")
|
props.put("connect.timeout.ms", "300")
|
||||||
props.put("reconnect.interval", "1000")
|
props.put("reconnect.interval", "1000")
|
||||||
|
@ -77,11 +77,10 @@ class SyncProducerTest extends JUnitSuite {
|
||||||
producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
|
producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
|
||||||
}catch {
|
}catch {
|
||||||
case e: Exception => failed = true
|
case e: Exception => failed = true
|
||||||
|
|
||||||
}
|
}
|
||||||
val secondEnd = SystemTime.milliseconds
|
val secondEnd = SystemTime.milliseconds
|
||||||
println("Second message send retries took " + (secondEnd-secondStart) + " ms")
|
println("Second message send retries took " + (secondEnd-secondStart) + " ms")
|
||||||
Assert.assertTrue((secondEnd-secondEnd) < 300)
|
Assert.assertTrue((secondEnd-secondStart) < 300)
|
||||||
simpleProducerLogger.setLevel(Level.ERROR)
|
simpleProducerLogger.setLevel(Level.ERROR)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,7 +88,7 @@ class SyncProducerTest extends JUnitSuite {
|
||||||
def testReachableServer() {
|
def testReachableServer() {
|
||||||
val props = new Properties()
|
val props = new Properties()
|
||||||
props.put("host", "localhost")
|
props.put("host", "localhost")
|
||||||
props.put("port", "9092")
|
props.put("port", server.socketServer.port.toString)
|
||||||
props.put("buffer.size", "102400")
|
props.put("buffer.size", "102400")
|
||||||
props.put("connect.timeout.ms", "500")
|
props.put("connect.timeout.ms", "500")
|
||||||
props.put("reconnect.interval", "1000")
|
props.put("reconnect.interval", "1000")
|
||||||
|
@ -113,7 +112,7 @@ class SyncProducerTest extends JUnitSuite {
|
||||||
}
|
}
|
||||||
Assert.assertFalse(failed)
|
Assert.assertFalse(failed)
|
||||||
val secondEnd = SystemTime.milliseconds
|
val secondEnd = SystemTime.milliseconds
|
||||||
Assert.assertTrue((secondEnd-secondEnd) < 500)
|
Assert.assertTrue((secondEnd-secondStart) < 500)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
producer.multiSend(Array(new ProducerRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))))
|
producer.multiSend(Array(new ProducerRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))))
|
||||||
|
@ -127,7 +126,7 @@ class SyncProducerTest extends JUnitSuite {
|
||||||
def testReachableServerWrongPort() {
|
def testReachableServerWrongPort() {
|
||||||
val props = new Properties()
|
val props = new Properties()
|
||||||
props.put("host", "localhost")
|
props.put("host", "localhost")
|
||||||
props.put("port", "9091")
|
props.put("port", (server.socketServer.port + 1).toString) // the wrong port
|
||||||
props.put("buffer.size", "102400")
|
props.put("buffer.size", "102400")
|
||||||
props.put("connect.timeout.ms", "300")
|
props.put("connect.timeout.ms", "300")
|
||||||
props.put("reconnect.interval", "500")
|
props.put("reconnect.interval", "500")
|
||||||
|
@ -153,7 +152,7 @@ class SyncProducerTest extends JUnitSuite {
|
||||||
}
|
}
|
||||||
Assert.assertTrue(failed)
|
Assert.assertTrue(failed)
|
||||||
val secondEnd = SystemTime.milliseconds
|
val secondEnd = SystemTime.milliseconds
|
||||||
Assert.assertTrue((secondEnd-secondEnd) < 300)
|
Assert.assertTrue((secondEnd-secondStart) < 300)
|
||||||
simpleProducerLogger.setLevel(Level.ERROR)
|
simpleProducerLogger.setLevel(Level.ERROR)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -161,7 +160,7 @@ class SyncProducerTest extends JUnitSuite {
|
||||||
def testMessageSizeTooLarge() {
|
def testMessageSizeTooLarge() {
|
||||||
val props = new Properties()
|
val props = new Properties()
|
||||||
props.put("host", "localhost")
|
props.put("host", "localhost")
|
||||||
props.put("port", "9091")
|
props.put("port", server.socketServer.port.toString)
|
||||||
props.put("buffer.size", "102400")
|
props.put("buffer.size", "102400")
|
||||||
props.put("connect.timeout.ms", "300")
|
props.put("connect.timeout.ms", "300")
|
||||||
props.put("reconnect.interval", "500")
|
props.put("reconnect.interval", "500")
|
||||||
|
|
|
@ -31,7 +31,7 @@ import junit.framework.Assert._
|
||||||
import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
|
import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
|
||||||
|
|
||||||
class ServerShutdownTest extends JUnitSuite {
|
class ServerShutdownTest extends JUnitSuite {
|
||||||
val port = 9999
|
val port = TestUtils.choosePort
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testCleanShutdown() {
|
def testCleanShutdown() {
|
||||||
|
|
Loading…
Reference in New Issue