Compare commits

...

4 Commits

Author SHA1 Message Date
Chang-Yu Huang dc8bc1b27c
Merge 86dda289c9 into 28e7803037 2025-10-03 09:35:56 +02:00
Chang-Yu Huang 86dda289c9
Merge branch 'apache:trunk' into KAFKA-16768 2025-08-14 08:07:13 -04:00
ChangYu Huang 790a19d7e8 Fix KAFKA-16768 2025-08-04 20:34:35 +08:00
ChangYu Huang ed9acf45b6 Add a test for channel leak. 2025-08-04 19:54:01 +08:00
2 changed files with 41 additions and 5 deletions

View File

@ -656,6 +656,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
// all processors, block until the last one is able to accept a connection.
var retriesLeft = synchronized(processors.length)
var processor: Processor = null
var accepted: Boolean = false
do {
retriesLeft -= 1
processor = synchronized {
@ -665,7 +666,13 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
processors(currentProcessorIndex)
}
currentProcessorIndex += 1
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
accepted = assignNewConnection(socketChannel, processor, retriesLeft == 0)
} while (!accepted && shouldRun.get())
if (!accepted) {
debug(s"Refused connection from ${socketChannel.socket.getRemoteSocketAddress} on" +
s" ${socketChannel.socket.getLocalSocketAddress} because the server is shutting down.")
socketChannel.close()
}
}
} else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
@ -725,7 +732,8 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
}
}
private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
// `protected` for test usage
protected[kafka] def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
debug(s"Accepted connection from ${socketChannel.socket.getRemoteSocketAddress} on" +
s" ${socketChannel.socket.getLocalSocketAddress} and assigned it to processor ${processor.id}," +
@ -1152,9 +1160,11 @@ private[kafka] class Processor(
*/
def accept(socketChannel: SocketChannel,
mayBlock: Boolean,
acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Boolean = {
acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Boolean = synchronized {
val accepted = {
if (newConnections.offer(socketChannel))
if (!shouldRun.get())
false
else if (newConnections.offer(socketChannel))
true
else if (mayBlock) {
val startNs = time.nanoseconds
@ -1261,7 +1271,7 @@ private[kafka] class Processor(
*/
def wakeup(): Unit = selector.wakeup()
def beginShutdown(): Unit = {
def beginShutdown(): Unit = synchronized {
if (shouldRun.getAndSet(false)) {
wakeup()
}

View File

@ -1991,6 +1991,17 @@ class SocketServerTest {
}
}
@Test
def testProcessorShutdownCausesSocketChannelLeak(): Unit = {
withTestableServer(testWithServer = { testableServer =>
testableServer.closeTestableProcessor()
connect(testableServer)
testableServer.shutdown()
val clientChannels = testableServer.clientChannels()
assertFalse(clientChannels.exists(_.isOpen))
})
}
class TestableAcceptor(socketServer: SocketServer,
endPoint: Endpoint,
cfg: KafkaConfig,
@ -2025,7 +2036,15 @@ class SocketServerTest {
new TestableProcessor(id, time, requestChannel, listenerName, securityProtocol, cfg, connectionQuotas, connectionQueueSize, isPrivilegedListener, socketServer.connectionDisconnectListeners)
}
override protected[kafka] def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
if (clientChannels.isEmpty || clientChannels(clientChannels.length - 1) != socketChannel) {
clientChannels.append(socketChannel)
}
super.assignNewConnection(socketChannel, processor, mayBlock)
}
def isOpen: Boolean = serverChannel.isOpen
val clientChannels: ArrayBuffer[SocketChannel] = ArrayBuffer.empty[SocketChannel]
}
class TestableProcessor(id: Int,
@ -2131,6 +2150,13 @@ class SocketServerTest {
assertNull(selector.channel(connectionId), "Channel not removed")
assertNull(selector.closingChannel(connectionId), "Closing channel not removed")
}
def closeTestableProcessor(): Unit = {
testableProcessor.beginShutdown()
testableProcessor.thread.join()
}
def clientChannels(): ArrayBuffer[SocketChannel] = testableAcceptor.clientChannels
}
// a X509TrustManager to trust self-signed certs for unit tests.