KAFKA-16369: Broker may not shut down when SocketServer fails to bind as Address already in use (#15530)

Add a Wait in in KafkaServer (ZK mode) for all the SocketServer ports
to be open, and the Acceptors to be started

The BrokerServer (KRaft mode) had such a wait,
which was missing from the KafkaServer (ZK mode).

Add unit test.
This commit is contained in:
Edoardo Comar 2024-03-18 10:14:43 +00:00
parent 689c8e0706
commit f103cb61c5
2 changed files with 35 additions and 2 deletions

View File

@ -584,7 +584,7 @@ class KafkaServer(
}
}
}
socketServer.enableRequestProcessing(authorizerFutures)
val enableRequestProcessingFuture = socketServer.enableRequestProcessing(authorizerFutures)
// Block here until all the authorizer futures are complete
try {
CompletableFuture.allOf(authorizerFutures.values.toSeq: _*).join()
@ -592,6 +592,13 @@ class KafkaServer(
case t: Throwable => throw new RuntimeException("Received a fatal error while " +
"waiting for all of the authorizer futures to be completed.", t)
}
// Wait for all the SocketServer ports to be open, and the Acceptors to be started.
try {
enableRequestProcessingFuture.join()
} catch {
case t: Throwable => throw new RuntimeException("Received a fatal error while " +
"waiting for the SocketServer Acceptors to be started.", t)
}
_brokerState = BrokerState.RUNNING
shutdownLatch = new CountDownLatch(1)

View File

@ -17,12 +17,13 @@
package kafka.server
import kafka.utils.TestUtils
import kafka.utils.{CoreUtils, TestUtils}
import org.apache.kafka.common.security.JaasUtils
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue, fail}
import org.junit.jupiter.api.Test
import java.util.Properties
import java.net.{InetAddress, ServerSocket}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.util.MockTime
@ -47,6 +48,24 @@ class KafkaServerTest extends QuorumTestHarness {
TestUtils.shutdownServers(Seq(server1, server2))
}
@Test
def testListenerPortAlreadyInUse(): Unit = {
val serverSocket = new ServerSocket(0, 0, InetAddress.getLoopbackAddress)
var kafkaServer : Option[KafkaServer] = None
try {
TestUtils.waitUntilTrue(() => serverSocket.isBound, "Server socket failed to bind.")
// start a server with listener on the port already bound
assertThrows(classOf[RuntimeException],
() => kafkaServer = Option(createServerWithListenerOnPort(serverSocket.getLocalPort)),
"Expected RuntimeException due to address already in use during KafkaServer startup"
)
} finally {
CoreUtils.swallow(serverSocket.close(), this);
TestUtils.shutdownServers(kafkaServer.toList)
}
}
@Test
def testCreatesProperZkConfigWhenSaslDisabled(): Unit = {
val props = new Properties
@ -185,4 +204,11 @@ class KafkaServerTest extends QuorumTestHarness {
TestUtils.createServer(kafkaConfig)
}
def createServerWithListenerOnPort(port: Int): KafkaServer = {
val props = TestUtils.createBrokerConfig(0, zkConnect)
props.put(KafkaConfig.ListenersProp, s"PLAINTEXT://localhost:$port")
val kafkaConfig = KafkaConfig.fromProps(props)
TestUtils.createServer(kafkaConfig)
}
}