KAFKA-14908: Set setReuseAddress on the kafka server socket (#13572)

Changes SocketServer to set the setReuseAddress(true) socket option.

This aids use-cases where kafka is started/stopped on the same port in rapid succession. Examples are: where a kafka cluster is embedded in an integration test suite that starts/stops a cluster before/after each test.

Reviewers: Luke Chen <showuon@gmail.com>, Tom Bentley <tbentley@redhat.com>, Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Keith Wall 2023-04-19 03:58:29 +01:00 committed by GitHub
parent f905a5a45d
commit d04c3e56c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 1 deletions

View File

@ -723,6 +723,10 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
new InetSocketAddress(host, port)
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
// Configure the socket with setReuseAddress(true). This is done to aid use-cases where the kafka
// server is rapidly shut down and started up on the same port (e.g. application integration test suites
// that embed a kafka cluster).
serverChannel.socket().setReuseAddress(true);
if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
serverChannel.socket().setReceiveBufferSize(recvBufferSize)

View File

@ -20,7 +20,7 @@ package kafka.network
import java.io._
import java.net._
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, SocketChannel}
import java.nio.channels.{SelectionKey, ServerSocketChannel, SocketChannel}
import java.nio.charset.StandardCharsets
import java.util
import java.util.concurrent.{CompletableFuture, ConcurrentLinkedQueue, ExecutionException, Executors, TimeUnit}
@ -1893,6 +1893,42 @@ class SocketServerTest {
}, false)
}
@Test
def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = {
val acceptor = server.dataPlaneAcceptor(listener)
val channel = acceptor.get.serverChannel
verifySocketUsesReuseAddress(channel)
}
@Test
def testControlAndDataPlaneAcceptingSocketsUseReuseAddress(): Unit = {
shutdownServerAndMetrics(server)
val testProps = new Properties
testProps ++= props
// Use port 0 so that the system will assign an available port from the ephemeral range
testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
testProps.put("control.plane.listener.name", "CONTROL_PLANE")
val config = KafkaConfig.fromProps(testProps)
val testServer = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
try {
val dataPlaneAcceptor = testServer.dataPlaneAcceptor(listener)
val dataPlaneChannel = dataPlaneAcceptor.get.serverChannel
verifySocketUsesReuseAddress(dataPlaneChannel)
val controlPlaneAcceptor = testServer.controlPlaneAcceptorOpt.get
val acceptingChannel = controlPlaneAcceptor.serverChannel
verifySocketUsesReuseAddress(acceptingChannel)
} finally {
shutdownServerAndMetrics(testServer)
}
}
private def verifySocketUsesReuseAddress(channel: ServerSocketChannel): Unit = {
assertTrue(channel.socket().isBound, "Listening channel not bound")
assertTrue(channel.socket().getReuseAddress, "Listening socket reuseAddress in unexpected state")
}
/**
* Test to ensure "Selector.poll()" does not block at "select(timeout)" when there is no data in the socket but there
* is data in the buffer. This only happens when SSL protocol is used.