Add a test for channel leak.

This commit is contained in:
ChangYu Huang 2025-08-04 00:26:01 +08:00
parent 904ee87b85
commit ed9acf45b6
2 changed files with 28 additions and 1 deletions

View File

@ -725,7 +725,8 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
} }
} }
private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = { // `protected` for test usage
protected[kafka] def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) { if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
debug(s"Accepted connection from ${socketChannel.socket.getRemoteSocketAddress} on" + debug(s"Accepted connection from ${socketChannel.socket.getRemoteSocketAddress} on" +
s" ${socketChannel.socket.getLocalSocketAddress} and assigned it to processor ${processor.id}," + s" ${socketChannel.socket.getLocalSocketAddress} and assigned it to processor ${processor.id}," +

View File

@ -1991,6 +1991,17 @@ class SocketServerTest {
} }
} }
@Test
def testProcessorShutdownCausesSocketChannelLeak(): Unit = {
withTestableServer(testWithServer = { testableServer =>
testableServer.closeTestableProcessor()
connect(testableServer)
testableServer.shutdown()
val clientChannels = testableServer.clientChannels()
assertFalse(clientChannels.exists(_.isOpen))
})
}
class TestableAcceptor(socketServer: SocketServer, class TestableAcceptor(socketServer: SocketServer,
endPoint: Endpoint, endPoint: Endpoint,
cfg: KafkaConfig, cfg: KafkaConfig,
@ -2025,7 +2036,15 @@ class SocketServerTest {
new TestableProcessor(id, time, requestChannel, listenerName, securityProtocol, cfg, connectionQuotas, connectionQueueSize, isPrivilegedListener, socketServer.connectionDisconnectListeners) new TestableProcessor(id, time, requestChannel, listenerName, securityProtocol, cfg, connectionQuotas, connectionQueueSize, isPrivilegedListener, socketServer.connectionDisconnectListeners)
} }
override protected[kafka] def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
if (clientChannels.isEmpty || clientChannels(clientChannels.length - 1) != socketChannel) {
clientChannels.append(socketChannel)
}
super.assignNewConnection(socketChannel, processor, mayBlock)
}
def isOpen: Boolean = serverChannel.isOpen def isOpen: Boolean = serverChannel.isOpen
val clientChannels: ArrayBuffer[SocketChannel] = ArrayBuffer.empty[SocketChannel]
} }
class TestableProcessor(id: Int, class TestableProcessor(id: Int,
@ -2131,6 +2150,13 @@ class SocketServerTest {
assertNull(selector.channel(connectionId), "Channel not removed") assertNull(selector.channel(connectionId), "Channel not removed")
assertNull(selector.closingChannel(connectionId), "Closing channel not removed") assertNull(selector.closingChannel(connectionId), "Closing channel not removed")
} }
def closeTestableProcessor(): Unit = {
testableProcessor.beginShutdown()
testableProcessor.thread.join()
}
def clientChannels(): ArrayBuffer[SocketChannel] = testableAcceptor.clientChannels
} }
// a X509TrustManager to trust self-signed certs for unit tests. // a X509TrustManager to trust self-signed certs for unit tests.