Fix KAFKA-16768

This commit is contained in:
ChangYu Huang 2025-08-04 11:13:46 +08:00
parent ed9acf45b6
commit 790a19d7e8
1 changed files with 13 additions and 4 deletions

View File

@ -656,6 +656,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
// all processors, block until the last one is able to accept a connection. // all processors, block until the last one is able to accept a connection.
var retriesLeft = synchronized(processors.length) var retriesLeft = synchronized(processors.length)
var processor: Processor = null var processor: Processor = null
var accepted: Boolean = false
do { do {
retriesLeft -= 1 retriesLeft -= 1
processor = synchronized { processor = synchronized {
@ -665,7 +666,13 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
processors(currentProcessorIndex) processors(currentProcessorIndex)
} }
currentProcessorIndex += 1 currentProcessorIndex += 1
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0)) accepted = assignNewConnection(socketChannel, processor, retriesLeft == 0)
} while (!accepted && shouldRun.get())
if (!accepted) {
debug(s"Refused connection from ${socketChannel.socket.getRemoteSocketAddress} on" +
s" ${socketChannel.socket.getLocalSocketAddress} because the server is shutting down.")
socketChannel.close()
}
} }
} else } else
throw new IllegalStateException("Unrecognized key state for acceptor thread.") throw new IllegalStateException("Unrecognized key state for acceptor thread.")
@ -1153,9 +1160,11 @@ private[kafka] class Processor(
*/ */
def accept(socketChannel: SocketChannel, def accept(socketChannel: SocketChannel,
mayBlock: Boolean, mayBlock: Boolean,
acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Boolean = { acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Boolean = synchronized {
val accepted = { val accepted = {
if (newConnections.offer(socketChannel)) if (!shouldRun.get())
false
else if (newConnections.offer(socketChannel))
true true
else if (mayBlock) { else if (mayBlock) {
val startNs = time.nanoseconds val startNs = time.nanoseconds
@ -1262,7 +1271,7 @@ private[kafka] class Processor(
*/ */
def wakeup(): Unit = selector.wakeup() def wakeup(): Unit = selector.wakeup()
def beginShutdown(): Unit = { def beginShutdown(): Unit = synchronized {
if (shouldRun.getAndSet(false)) { if (shouldRun.getAndSet(false)) {
wakeup() wakeup()
} }