mirror of https://github.com/apache/kafka.git
obsessive compulsive tag team: Replace tabs with spaces
patch by jkreps; reviewed by cburroughs for KAFKA-114 git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1178670 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4c2d121b26
commit
a7b4731947
|
@ -75,7 +75,7 @@ object ConsoleConsumer {
|
||||||
.describedAs("prop")
|
.describedAs("prop")
|
||||||
.ofType(classOf[String])
|
.ofType(classOf[String])
|
||||||
val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
|
val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
|
||||||
"start with the earliest message present in the log rather than the latest message.")
|
"start with the earliest message present in the log rather than the latest message.")
|
||||||
val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms")
|
val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms")
|
||||||
.withRequiredArg
|
.withRequiredArg
|
||||||
.describedAs("ms")
|
.describedAs("ms")
|
||||||
|
@ -86,7 +86,7 @@ object ConsoleConsumer {
|
||||||
.describedAs("num_messages")
|
.describedAs("num_messages")
|
||||||
.ofType(classOf[java.lang.Integer])
|
.ofType(classOf[java.lang.Integer])
|
||||||
val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
|
val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
|
||||||
"skip it instead of halt.")
|
"skip it instead of halt.")
|
||||||
|
|
||||||
val options: OptionSet = tryParse(parser, args)
|
val options: OptionSet = tryParse(parser, args)
|
||||||
checkRequiredArgs(parser, options, topicIdOpt, zkConnectOpt)
|
checkRequiredArgs(parser, options, topicIdOpt, zkConnectOpt)
|
||||||
|
@ -170,9 +170,9 @@ object ConsoleConsumer {
|
||||||
|
|
||||||
def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
|
def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
|
||||||
for(arg <- required) {
|
for(arg <- required) {
|
||||||
if(!options.has(arg)) {
|
if(!options.has(arg)) {
|
||||||
logger.error("Missing required argument \"" + arg + "\"")
|
logger.error("Missing required argument \"" + arg + "\"")
|
||||||
parser.printHelpOn(System.err)
|
parser.printHelpOn(System.err)
|
||||||
System.exit(1)
|
System.exit(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,8 +66,8 @@ class ConsumerIterator[T](private val topic: String,
|
||||||
if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
|
if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
|
||||||
if(logger.isDebugEnabled)
|
if(logger.isDebugEnabled)
|
||||||
logger.debug("Received the shutdown command")
|
logger.debug("Received the shutdown command")
|
||||||
channel.offer(currentDataChunk)
|
channel.offer(currentDataChunk)
|
||||||
return allDone()
|
return allDone
|
||||||
} else {
|
} else {
|
||||||
currentTopicInfo = currentDataChunk.topicInfo
|
currentTopicInfo = currentDataChunk.topicInfo
|
||||||
if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
|
if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
* (the "License"); you may not use this file except in compliance with
|
* (the "License"); you may not use this file except in compliance with
|
||||||
* the License. You may obtain a copy of the License at
|
* the License. You may obtain a copy of the License at
|
||||||
*
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
@ -35,17 +35,17 @@ import kafka.api.RequestKeys
|
||||||
* N Processor threads that each have their own selectors and handle all requests from their connections synchronously
|
* N Processor threads that each have their own selectors and handle all requests from their connections synchronously
|
||||||
*/
|
*/
|
||||||
class SocketServer(val port: Int,
|
class SocketServer(val port: Int,
|
||||||
val numProcessorThreads: Int,
|
val numProcessorThreads: Int,
|
||||||
monitoringPeriodSecs: Int,
|
monitoringPeriodSecs: Int,
|
||||||
private val handlerFactory: Handler.HandlerMapping,
|
private val handlerFactory: Handler.HandlerMapping,
|
||||||
val maxRequestSize: Int = Int.MaxValue) {
|
val maxRequestSize: Int = Int.MaxValue) {
|
||||||
|
|
||||||
private val logger = Logger.getLogger(classOf[SocketServer])
|
private val logger = Logger.getLogger(classOf[SocketServer])
|
||||||
private val time = SystemTime
|
private val time = SystemTime
|
||||||
private val processors = new Array[Processor](numProcessorThreads)
|
private val processors = new Array[Processor](numProcessorThreads)
|
||||||
private var acceptor: Acceptor = new Acceptor(port, processors)
|
private var acceptor: Acceptor = new Acceptor(port, processors)
|
||||||
val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs)
|
val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start the socket server
|
* Start the socket server
|
||||||
*/
|
*/
|
||||||
|
@ -57,7 +57,7 @@ class SocketServer(val port: Int,
|
||||||
Utils.newThread("kafka-acceptor", acceptor, false).start()
|
Utils.newThread("kafka-acceptor", acceptor, false).start()
|
||||||
acceptor.awaitStartup
|
acceptor.awaitStartup
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shutdown the socket server
|
* Shutdown the socket server
|
||||||
*/
|
*/
|
||||||
|
@ -66,20 +66,20 @@ class SocketServer(val port: Int,
|
||||||
for(processor <- processors)
|
for(processor <- processors)
|
||||||
processor.shutdown
|
processor.shutdown
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A base class with some helper variables and methods
|
* A base class with some helper variables and methods
|
||||||
*/
|
*/
|
||||||
private[kafka] abstract class AbstractServerThread extends Runnable {
|
private[kafka] abstract class AbstractServerThread extends Runnable {
|
||||||
|
|
||||||
protected val selector = Selector.open();
|
protected val selector = Selector.open();
|
||||||
protected val logger = Logger.getLogger(getClass())
|
protected val logger = Logger.getLogger(getClass())
|
||||||
private val startupLatch = new CountDownLatch(1)
|
private val startupLatch = new CountDownLatch(1)
|
||||||
private val shutdownLatch = new CountDownLatch(1)
|
private val shutdownLatch = new CountDownLatch(1)
|
||||||
private val alive = new AtomicBoolean(false)
|
private val alive = new AtomicBoolean(false)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initiates a graceful shutdown by signeling to stop and waiting for the shutdown to complete
|
* Initiates a graceful shutdown by signeling to stop and waiting for the shutdown to complete
|
||||||
*/
|
*/
|
||||||
|
@ -88,17 +88,17 @@ private[kafka] abstract class AbstractServerThread extends Runnable {
|
||||||
selector.wakeup
|
selector.wakeup
|
||||||
shutdownLatch.await
|
shutdownLatch.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wait for the thread to completely start up
|
* Wait for the thread to completely start up
|
||||||
*/
|
*/
|
||||||
def awaitStartup(): Unit = startupLatch.await
|
def awaitStartup(): Unit = startupLatch.await
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Record that the thread startup is complete
|
* Record that the thread startup is complete
|
||||||
*/
|
*/
|
||||||
protected def startupComplete() = {
|
protected def startupComplete() = {
|
||||||
alive.set(true)
|
alive.set(true)
|
||||||
startupLatch.countDown
|
startupLatch.countDown
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,52 +106,52 @@ private[kafka] abstract class AbstractServerThread extends Runnable {
|
||||||
* Record that the thread shutdown is complete
|
* Record that the thread shutdown is complete
|
||||||
*/
|
*/
|
||||||
protected def shutdownComplete() = shutdownLatch.countDown
|
protected def shutdownComplete() = shutdownLatch.countDown
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is the server still running?
|
* Is the server still running?
|
||||||
*/
|
*/
|
||||||
protected def isRunning = alive.get
|
protected def isRunning = alive.get
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thread that accepts and configures new connections. There is only need for one of these
|
* Thread that accepts and configures new connections. There is only need for one of these
|
||||||
*/
|
*/
|
||||||
private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
|
private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Accept loop that checks for new connection attempts
|
* Accept loop that checks for new connection attempts
|
||||||
*/
|
*/
|
||||||
def run() {
|
def run() {
|
||||||
val serverChannel = ServerSocketChannel.open()
|
val serverChannel = ServerSocketChannel.open()
|
||||||
serverChannel.configureBlocking(false)
|
serverChannel.configureBlocking(false)
|
||||||
serverChannel.socket.bind(new InetSocketAddress(port))
|
serverChannel.socket.bind(new InetSocketAddress(port))
|
||||||
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
|
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
|
||||||
logger.info("Awaiting connections on port " + port)
|
logger.info("Awaiting connections on port " + port)
|
||||||
startupComplete()
|
startupComplete()
|
||||||
|
|
||||||
var currentProcessor = 0
|
var currentProcessor = 0
|
||||||
while(isRunning) {
|
while(isRunning) {
|
||||||
val ready = selector.select(500)
|
val ready = selector.select(500)
|
||||||
if(ready > 0) {
|
if(ready > 0) {
|
||||||
val keys = selector.selectedKeys()
|
val keys = selector.selectedKeys()
|
||||||
val iter = keys.iterator()
|
val iter = keys.iterator()
|
||||||
while(iter.hasNext && isRunning) {
|
while(iter.hasNext && isRunning) {
|
||||||
var key: SelectionKey = null
|
var key: SelectionKey = null
|
||||||
try {
|
try {
|
||||||
key = iter.next
|
key = iter.next
|
||||||
iter.remove()
|
iter.remove()
|
||||||
|
|
||||||
if(key.isAcceptable)
|
if(key.isAcceptable)
|
||||||
accept(key, processors(currentProcessor))
|
accept(key, processors(currentProcessor))
|
||||||
else
|
else
|
||||||
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
|
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
|
||||||
|
|
||||||
// round robin to the next processor thread
|
// round robin to the next processor thread
|
||||||
currentProcessor = (currentProcessor + 1) % processors.length
|
currentProcessor = (currentProcessor + 1) % processors.length
|
||||||
} catch {
|
} catch {
|
||||||
case e: Throwable => logger.error("Error in acceptor", e)
|
case e: Throwable => logger.error("Error in acceptor", e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -160,7 +160,7 @@ private[kafka] class Acceptor(val port: Int, private val processors: Array[Proce
|
||||||
Utils.swallow(logger.error, selector.close())
|
Utils.swallow(logger.error, selector.close())
|
||||||
shutdownComplete()
|
shutdownComplete()
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Accept a new connection
|
* Accept a new connection
|
||||||
*/
|
*/
|
||||||
|
@ -169,10 +169,10 @@ private[kafka] class Acceptor(val port: Int, private val processors: Array[Proce
|
||||||
if(logger.isDebugEnabled)
|
if(logger.isDebugEnabled)
|
||||||
logger.info("Accepted connection from " + socketChannel.socket.getInetAddress() + " on " + socketChannel.socket.getLocalSocketAddress)
|
logger.info("Accepted connection from " + socketChannel.socket.getInetAddress() + " on " + socketChannel.socket.getLocalSocketAddress)
|
||||||
socketChannel.configureBlocking(false)
|
socketChannel.configureBlocking(false)
|
||||||
socketChannel.socket().setTcpNoDelay(true)
|
socketChannel.socket().setTcpNoDelay(true)
|
||||||
processor.accept(socketChannel)
|
processor.accept(socketChannel)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -180,10 +180,10 @@ private[kafka] class Acceptor(val port: Int, private val processors: Array[Proce
|
||||||
* each of which has its own selectors
|
* each of which has its own selectors
|
||||||
*/
|
*/
|
||||||
private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
|
private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
|
||||||
val time: Time,
|
val time: Time,
|
||||||
val stats: SocketServerStats,
|
val stats: SocketServerStats,
|
||||||
val maxRequestSize: Int) extends AbstractServerThread {
|
val maxRequestSize: Int) extends AbstractServerThread {
|
||||||
|
|
||||||
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();
|
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();
|
||||||
private val requestLogger = Logger.getLogger("kafka.request.logger")
|
private val requestLogger = Logger.getLogger("kafka.request.logger")
|
||||||
|
|
||||||
|
@ -192,34 +192,34 @@ private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
|
||||||
while(isRunning) {
|
while(isRunning) {
|
||||||
// setup any new connections that have been queued up
|
// setup any new connections that have been queued up
|
||||||
configureNewConnections()
|
configureNewConnections()
|
||||||
|
|
||||||
val ready = selector.select(500)
|
val ready = selector.select(500)
|
||||||
if(ready > 0) {
|
if(ready > 0) {
|
||||||
val keys = selector.selectedKeys()
|
val keys = selector.selectedKeys()
|
||||||
val iter = keys.iterator()
|
val iter = keys.iterator()
|
||||||
while(iter.hasNext && isRunning) {
|
while(iter.hasNext && isRunning) {
|
||||||
var key: SelectionKey = null
|
var key: SelectionKey = null
|
||||||
try {
|
try {
|
||||||
key = iter.next
|
key = iter.next
|
||||||
iter.remove()
|
iter.remove()
|
||||||
|
|
||||||
if(key.isReadable)
|
if(key.isReadable)
|
||||||
read(key)
|
read(key)
|
||||||
else if(key.isWritable)
|
else if(key.isWritable)
|
||||||
write(key)
|
write(key)
|
||||||
else if(!key.isValid)
|
else if(!key.isValid)
|
||||||
close(key)
|
close(key)
|
||||||
else
|
else
|
||||||
throw new IllegalStateException("Unrecognized key state for processor thread.")
|
throw new IllegalStateException("Unrecognized key state for processor thread.")
|
||||||
} catch {
|
} catch {
|
||||||
case e: EOFException => {
|
case e: EOFException => {
|
||||||
logger.info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
|
logger.info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
|
||||||
close(key)
|
close(key)
|
||||||
}
|
}
|
||||||
case e: InvalidRequestException => {
|
case e: InvalidRequestException => {
|
||||||
logger.info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
|
logger.info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
|
||||||
close(key)
|
close(key)
|
||||||
} case e: Throwable => {
|
} case e: Throwable => {
|
||||||
logger.error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
|
logger.error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
|
||||||
close(key)
|
close(key)
|
||||||
}
|
}
|
||||||
|
@ -231,7 +231,7 @@ private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
|
||||||
Utils.swallow(logger.info, selector.close())
|
Utils.swallow(logger.info, selector.close())
|
||||||
shutdownComplete()
|
shutdownComplete()
|
||||||
}
|
}
|
||||||
|
|
||||||
private def close(key: SelectionKey) {
|
private def close(key: SelectionKey) {
|
||||||
val channel = key.channel.asInstanceOf[SocketChannel]
|
val channel = key.channel.asInstanceOf[SocketChannel]
|
||||||
if(logger.isDebugEnabled)
|
if(logger.isDebugEnabled)
|
||||||
|
@ -241,7 +241,7 @@ private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
|
||||||
key.attach(null)
|
key.attach(null)
|
||||||
Utils.swallow(logger.info, key.cancel())
|
Utils.swallow(logger.info, key.cancel())
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Queue up a new connection for reading
|
* Queue up a new connection for reading
|
||||||
*/
|
*/
|
||||||
|
@ -249,7 +249,7 @@ private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
|
||||||
newConnections.add(socketChannel)
|
newConnections.add(socketChannel)
|
||||||
selector.wakeup()
|
selector.wakeup()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register any new connections that have been queued up
|
* Register any new connections that have been queued up
|
||||||
*/
|
*/
|
||||||
|
@ -261,7 +261,7 @@ private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
|
||||||
channel.register(selector, SelectionKey.OP_READ)
|
channel.register(selector, SelectionKey.OP_READ)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handle a completed request producing an optional response
|
* Handle a completed request producing an optional response
|
||||||
*/
|
*/
|
||||||
|
@ -290,7 +290,7 @@ private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
|
||||||
stats.recordRequest(requestTypeId, time.nanoseconds - start)
|
stats.recordRequest(requestTypeId, time.nanoseconds - start)
|
||||||
maybeSend
|
maybeSend
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Process reads from ready sockets
|
* Process reads from ready sockets
|
||||||
*/
|
*/
|
||||||
|
@ -322,7 +322,7 @@ private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
|
||||||
selector.wakeup()
|
selector.wakeup()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Process writes to ready sockets
|
* Process writes to ready sockets
|
||||||
*/
|
*/
|
||||||
|
@ -341,7 +341,7 @@ private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
|
||||||
selector.wakeup()
|
selector.wakeup()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def channelFor(key: SelectionKey) = key.channel().asInstanceOf[SocketChannel]
|
private def channelFor(key: SelectionKey) = key.channel().asInstanceOf[SocketChannel]
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,7 +91,7 @@ abstract class MultiSend[S <: Send](val sends: List[S]) extends Send {
|
||||||
var totalWritten = 0
|
var totalWritten = 0
|
||||||
|
|
||||||
def writeTo(channel: WritableByteChannel): Int = {
|
def writeTo(channel: WritableByteChannel): Int = {
|
||||||
expectIncomplete
|
expectIncomplete
|
||||||
val written = current.head.writeTo(channel)
|
val written = current.head.writeTo(channel)
|
||||||
totalWritten += written
|
totalWritten += written
|
||||||
if(current.head.complete)
|
if(current.head.complete)
|
||||||
|
|
|
@ -25,7 +25,7 @@ object FAILED extends State
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transliteration of the iterator template in google collections. To implement an iterator
|
* Transliteration of the iterator template in google collections. To implement an iterator
|
||||||
* override makeNext and call allDone() when there is no more items
|
* override makeNext and call allDone() when there is no more items
|
||||||
*/
|
*/
|
||||||
abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T] {
|
abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T] {
|
||||||
|
|
||||||
|
|
|
@ -104,7 +104,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
|
||||||
Thread.sleep(200)
|
Thread.sleep(200)
|
||||||
val response = consumer.multifetch(fetches: _*)
|
val response = consumer.multifetch(fetches: _*)
|
||||||
for((topic, resp) <- topics.zip(response.toList))
|
for((topic, resp) <- topics.zip(response.toList))
|
||||||
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -117,7 +117,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
|
||||||
try {
|
try {
|
||||||
val responses = consumer.multifetch(fetches: _*)
|
val responses = consumer.multifetch(fetches: _*)
|
||||||
for(resp <- responses)
|
for(resp <- responses)
|
||||||
resp.iterator
|
resp.iterator
|
||||||
}
|
}
|
||||||
catch {
|
catch {
|
||||||
case e: OffsetOutOfRangeException => exceptionThrown = true
|
case e: OffsetOutOfRangeException => exceptionThrown = true
|
||||||
|
@ -149,7 +149,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
|
||||||
Thread.sleep(200)
|
Thread.sleep(200)
|
||||||
val response = consumer.multifetch(fetches: _*)
|
val response = consumer.multifetch(fetches: _*)
|
||||||
for((topic, resp) <- topics.zip(response.toList))
|
for((topic, resp) <- topics.zip(response.toList))
|
||||||
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
||||||
}
|
}
|
||||||
|
|
||||||
def testMultiProduceResend() {
|
def testMultiProduceResend() {
|
||||||
|
@ -180,6 +180,6 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
|
||||||
TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).map(m => m.message).iterator,
|
TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).map(m => m.message).iterator,
|
||||||
messages(topic).map(m => m.message).iterator),
|
messages(topic).map(m => m.message).iterator),
|
||||||
resp.map(m => m.message).iterator)
|
resp.map(m => m.message).iterator)
|
||||||
// TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator, messages(topic).iterator), resp.iterator)
|
// TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator, messages(topic).iterator), resp.iterator)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -103,7 +103,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
Thread.sleep(700)
|
Thread.sleep(700)
|
||||||
val response = consumer.multifetch(fetches: _*)
|
val response = consumer.multifetch(fetches: _*)
|
||||||
for((topic, resp) <- topics.zip(response.toList))
|
for((topic, resp) <- topics.zip(response.toList))
|
||||||
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
||||||
}
|
}
|
||||||
|
|
||||||
// temporarily set request handler logger to a higher level
|
// temporarily set request handler logger to a higher level
|
||||||
|
@ -118,7 +118,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
try {
|
try {
|
||||||
val responses = consumer.multifetch(fetches: _*)
|
val responses = consumer.multifetch(fetches: _*)
|
||||||
for(resp <- responses)
|
for(resp <- responses)
|
||||||
resp.iterator
|
resp.iterator
|
||||||
fail("expect exception")
|
fail("expect exception")
|
||||||
}
|
}
|
||||||
catch {
|
catch {
|
||||||
|
@ -135,7 +135,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
try {
|
try {
|
||||||
val responses = consumer.multifetch(fetches: _*)
|
val responses = consumer.multifetch(fetches: _*)
|
||||||
for(resp <- responses)
|
for(resp <- responses)
|
||||||
resp.iterator
|
resp.iterator
|
||||||
fail("expect exception")
|
fail("expect exception")
|
||||||
}
|
}
|
||||||
catch {
|
catch {
|
||||||
|
@ -166,7 +166,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
Thread.sleep(200)
|
Thread.sleep(200)
|
||||||
val response = consumer.multifetch(fetches: _*)
|
val response = consumer.multifetch(fetches: _*)
|
||||||
for((topic, resp) <- topics.zip(response.toList))
|
for((topic, resp) <- topics.zip(response.toList))
|
||||||
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
||||||
}
|
}
|
||||||
|
|
||||||
// temporarily set request handler logger to a higher level
|
// temporarily set request handler logger to a higher level
|
||||||
|
@ -181,7 +181,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
try {
|
try {
|
||||||
val responses = consumer.multifetch(fetches: _*)
|
val responses = consumer.multifetch(fetches: _*)
|
||||||
for(resp <- responses)
|
for(resp <- responses)
|
||||||
resp.iterator
|
resp.iterator
|
||||||
fail("expect exception")
|
fail("expect exception")
|
||||||
}
|
}
|
||||||
catch {
|
catch {
|
||||||
|
@ -198,7 +198,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
try {
|
try {
|
||||||
val responses = consumer.multifetch(fetches: _*)
|
val responses = consumer.multifetch(fetches: _*)
|
||||||
for(resp <- responses)
|
for(resp <- responses)
|
||||||
resp.iterator
|
resp.iterator
|
||||||
fail("expect exception")
|
fail("expect exception")
|
||||||
}
|
}
|
||||||
catch {
|
catch {
|
||||||
|
@ -232,7 +232,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
Thread.sleep(200)
|
Thread.sleep(200)
|
||||||
val response = consumer.multifetch(fetches: _*)
|
val response = consumer.multifetch(fetches: _*)
|
||||||
for((topic, resp) <- topics.zip(response.toList))
|
for((topic, resp) <- topics.zip(response.toList))
|
||||||
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
||||||
}
|
}
|
||||||
|
|
||||||
def testMultiProduceWithCompression() {
|
def testMultiProduceWithCompression() {
|
||||||
|
@ -257,6 +257,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
Thread.sleep(200)
|
Thread.sleep(200)
|
||||||
val response = consumer.multifetch(fetches: _*)
|
val response = consumer.multifetch(fetches: _*)
|
||||||
for((topic, resp) <- topics.zip(response.toList))
|
for((topic, resp) <- topics.zip(response.toList))
|
||||||
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -152,7 +152,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
for(topic <- topics) {
|
for(topic <- topics) {
|
||||||
if (iter.hasNext) {
|
if (iter.hasNext) {
|
||||||
val resp = iter.next
|
val resp = iter.next
|
||||||
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
fail("fewer responses than expected")
|
fail("fewer responses than expected")
|
||||||
|
@ -172,7 +172,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
|
val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
|
||||||
val iter = responses.iterator
|
val iter = responses.iterator
|
||||||
while (iter.hasNext)
|
while (iter.hasNext)
|
||||||
iter.next.iterator
|
iter.next.iterator
|
||||||
fail("expect exception")
|
fail("expect exception")
|
||||||
}
|
}
|
||||||
catch {
|
catch {
|
||||||
|
@ -190,7 +190,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
|
val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
|
||||||
val iter = responses.iterator
|
val iter = responses.iterator
|
||||||
while (iter.hasNext)
|
while (iter.hasNext)
|
||||||
iter.next.iterator
|
iter.next.iterator
|
||||||
fail("expect exception")
|
fail("expect exception")
|
||||||
}
|
}
|
||||||
catch {
|
catch {
|
||||||
|
@ -225,7 +225,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
for(topic <- topics) {
|
for(topic <- topics) {
|
||||||
if (iter.hasNext) {
|
if (iter.hasNext) {
|
||||||
val resp = iter.next
|
val resp = iter.next
|
||||||
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
fail("fewer responses than expected")
|
fail("fewer responses than expected")
|
||||||
|
@ -245,7 +245,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
|
val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
|
||||||
val iter = responses.iterator
|
val iter = responses.iterator
|
||||||
while (iter.hasNext)
|
while (iter.hasNext)
|
||||||
iter.next.iterator
|
iter.next.iterator
|
||||||
fail("expect exception")
|
fail("expect exception")
|
||||||
}
|
}
|
||||||
catch {
|
catch {
|
||||||
|
@ -263,7 +263,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
|
val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
|
||||||
val iter = responses.iterator
|
val iter = responses.iterator
|
||||||
while (iter.hasNext)
|
while (iter.hasNext)
|
||||||
iter.next.iterator
|
iter.next.iterator
|
||||||
fail("expect exception")
|
fail("expect exception")
|
||||||
}
|
}
|
||||||
catch {
|
catch {
|
||||||
|
@ -298,7 +298,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
for(topic <- topics) {
|
for(topic <- topics) {
|
||||||
if (iter.hasNext) {
|
if (iter.hasNext) {
|
||||||
val resp = iter.next
|
val resp = iter.next
|
||||||
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
fail("fewer responses than expected")
|
fail("fewer responses than expected")
|
||||||
|
@ -329,7 +329,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
|
||||||
for(topic <- topics) {
|
for(topic <- topics) {
|
||||||
if (iter.hasNext) {
|
if (iter.hasNext) {
|
||||||
val resp = iter.next
|
val resp = iter.next
|
||||||
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
fail("fewer responses than expected")
|
fail("fewer responses than expected")
|
||||||
|
|
|
@ -43,7 +43,7 @@ class LogTest extends JUnitSuite {
|
||||||
|
|
||||||
def createEmptyLogs(dir: File, offsets: Int*) = {
|
def createEmptyLogs(dir: File, offsets: Int*) = {
|
||||||
for(offset <- offsets)
|
for(offset <- offsets)
|
||||||
new File(dir, Integer.toString(offset) + Log.FileSuffix).createNewFile()
|
new File(dir, Integer.toString(offset) + Log.FileSuffix).createNewFile()
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -46,7 +46,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
|
||||||
@Test
|
@Test
|
||||||
def testIterationOverPartialAndTruncation() {
|
def testIterationOverPartialAndTruncation() {
|
||||||
testPartialWrite(0, messageSet)
|
testPartialWrite(0, messageSet)
|
||||||
testPartialWrite(2, messageSet)
|
testPartialWrite(2, messageSet)
|
||||||
testPartialWrite(4, messageSet)
|
testPartialWrite(4, messageSet)
|
||||||
testPartialWrite(5, messageSet)
|
testPartialWrite(5, messageSet)
|
||||||
testPartialWrite(6, messageSet)
|
testPartialWrite(6, messageSet)
|
||||||
|
@ -56,7 +56,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
|
||||||
val buffer = ByteBuffer.allocate(size)
|
val buffer = ByteBuffer.allocate(size)
|
||||||
val originalPosition = messageSet.channel.position
|
val originalPosition = messageSet.channel.position
|
||||||
for(i <- 0 until size)
|
for(i <- 0 until size)
|
||||||
buffer.put(0.asInstanceOf[Byte])
|
buffer.put(0.asInstanceOf[Byte])
|
||||||
buffer.rewind()
|
buffer.rewind()
|
||||||
messageSet.channel.write(buffer)
|
messageSet.channel.write(buffer)
|
||||||
// appending those bytes should not change the contents
|
// appending those bytes should not change the contents
|
||||||
|
|
Loading…
Reference in New Issue