mirror of https://github.com/apache/kafka.git
Compare commits
4 Commits
f997dad6fe
...
dc8bc1b27c
| Author | SHA1 | Date |
|---|---|---|
|
|
dc8bc1b27c | |
|
|
86dda289c9 | |
|
|
790a19d7e8 | |
|
|
ed9acf45b6 |
|
|
@ -656,6 +656,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
|
|||
// all processors, block until the last one is able to accept a connection.
|
||||
var retriesLeft = synchronized(processors.length)
|
||||
var processor: Processor = null
|
||||
var accepted: Boolean = false
|
||||
do {
|
||||
retriesLeft -= 1
|
||||
processor = synchronized {
|
||||
|
|
@ -665,7 +666,13 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
|
|||
processors(currentProcessorIndex)
|
||||
}
|
||||
currentProcessorIndex += 1
|
||||
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
|
||||
accepted = assignNewConnection(socketChannel, processor, retriesLeft == 0)
|
||||
} while (!accepted && shouldRun.get())
|
||||
if (!accepted) {
|
||||
debug(s"Refused connection from ${socketChannel.socket.getRemoteSocketAddress} on" +
|
||||
s" ${socketChannel.socket.getLocalSocketAddress} because the server is shutting down.")
|
||||
socketChannel.close()
|
||||
}
|
||||
}
|
||||
} else
|
||||
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
|
||||
|
|
@ -725,7 +732,8 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
|
|||
}
|
||||
}
|
||||
|
||||
private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
|
||||
// `protected` for test usage
|
||||
protected[kafka] def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
|
||||
if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
|
||||
debug(s"Accepted connection from ${socketChannel.socket.getRemoteSocketAddress} on" +
|
||||
s" ${socketChannel.socket.getLocalSocketAddress} and assigned it to processor ${processor.id}," +
|
||||
|
|
@ -1152,9 +1160,11 @@ private[kafka] class Processor(
|
|||
*/
|
||||
def accept(socketChannel: SocketChannel,
|
||||
mayBlock: Boolean,
|
||||
acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Boolean = {
|
||||
acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Boolean = synchronized {
|
||||
val accepted = {
|
||||
if (newConnections.offer(socketChannel))
|
||||
if (!shouldRun.get())
|
||||
false
|
||||
else if (newConnections.offer(socketChannel))
|
||||
true
|
||||
else if (mayBlock) {
|
||||
val startNs = time.nanoseconds
|
||||
|
|
@ -1261,7 +1271,7 @@ private[kafka] class Processor(
|
|||
*/
|
||||
def wakeup(): Unit = selector.wakeup()
|
||||
|
||||
def beginShutdown(): Unit = {
|
||||
def beginShutdown(): Unit = synchronized {
|
||||
if (shouldRun.getAndSet(false)) {
|
||||
wakeup()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1991,6 +1991,17 @@ class SocketServerTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testProcessorShutdownCausesSocketChannelLeak(): Unit = {
|
||||
withTestableServer(testWithServer = { testableServer =>
|
||||
testableServer.closeTestableProcessor()
|
||||
connect(testableServer)
|
||||
testableServer.shutdown()
|
||||
val clientChannels = testableServer.clientChannels()
|
||||
assertFalse(clientChannels.exists(_.isOpen))
|
||||
})
|
||||
}
|
||||
|
||||
class TestableAcceptor(socketServer: SocketServer,
|
||||
endPoint: Endpoint,
|
||||
cfg: KafkaConfig,
|
||||
|
|
@ -2025,7 +2036,15 @@ class SocketServerTest {
|
|||
new TestableProcessor(id, time, requestChannel, listenerName, securityProtocol, cfg, connectionQuotas, connectionQueueSize, isPrivilegedListener, socketServer.connectionDisconnectListeners)
|
||||
}
|
||||
|
||||
override protected[kafka] def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
|
||||
if (clientChannels.isEmpty || clientChannels(clientChannels.length - 1) != socketChannel) {
|
||||
clientChannels.append(socketChannel)
|
||||
}
|
||||
super.assignNewConnection(socketChannel, processor, mayBlock)
|
||||
}
|
||||
|
||||
def isOpen: Boolean = serverChannel.isOpen
|
||||
val clientChannels: ArrayBuffer[SocketChannel] = ArrayBuffer.empty[SocketChannel]
|
||||
}
|
||||
|
||||
class TestableProcessor(id: Int,
|
||||
|
|
@ -2131,6 +2150,13 @@ class SocketServerTest {
|
|||
assertNull(selector.channel(connectionId), "Channel not removed")
|
||||
assertNull(selector.closingChannel(connectionId), "Closing channel not removed")
|
||||
}
|
||||
|
||||
def closeTestableProcessor(): Unit = {
|
||||
testableProcessor.beginShutdown()
|
||||
testableProcessor.thread.join()
|
||||
}
|
||||
|
||||
def clientChannels(): ArrayBuffer[SocketChannel] = testableAcceptor.clientChannels
|
||||
}
|
||||
|
||||
// a X509TrustManager to trust self-signed certs for unit tests.
|
||||
|
|
|
|||
Loading…
Reference in New Issue