diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index 811e1d92c9b..704d810137f 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -31,13 +31,12 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Tag import scala.jdk.CollectionConverters._ -import scala.jdk.OptionConverters.RichOptional @Tag("integration") abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = { - val socket = if (cluster.controllerListenerName().toScala.contains(listenerName)) { + val socket = if (cluster.controllerListenerName() == listenerName) { cluster.controllerSocketServers().asScala.head } else { cluster.brokerSocketServers().asScala.head @@ -92,7 +91,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).minVersion()) assertEquals(StreamsVersion.SV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).maxVersion()) } - val expectedApis = if (cluster.controllerListenerName().toScala.contains(listenerName)) { + val expectedApis = if (cluster.controllerListenerName() == listenerName) { ApiVersionsResponse.collectApis( ApiMessageType.ListenerType.CONTROLLER, ApiKeys.apisForListener(ApiMessageType.ListenerType.CONTROLLER), @@ -110,7 +109,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { assertEquals(expectedApis.size, apiVersionsResponse.data.apiKeys.size, "API keys in ApiVersionsResponse must match API keys supported by broker.") - val defaultApiVersionsResponse = if (cluster.controllerListenerName().toScala.contains(listenerName)) { + val defaultApiVersionsResponse = if (cluster.controllerListenerName() == listenerName) { TestUtils.defaultApiVersionsResponse(0, ListenerType.CONTROLLER, enableUnstableLastVersion) } else { TestUtils.createApiVersionsResponse(0, expectedApis) diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala index d54e3227f80..cb44719fad7 100644 --- a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala @@ -82,7 +82,7 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) { IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse]( request, controllerSocketServer, - cluster.controllerListenerName.get + cluster.controllerListenerName ) } diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala index 6e32cfc01f8..30443d81106 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala @@ -50,8 +50,8 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio @ClusterTest(types = Array(Type.KRAFT)) def testApiVersionsRequestThroughControllerListener(): Unit = { val request = new ApiVersionsRequest.Builder().build() - val apiVersionsResponse = sendApiVersionsRequest(request, cluster.controllerListenerName.get()) - validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName.get(), enableUnstableLastVersion = true) + val apiVersionsResponse = sendApiVersionsRequest(request, cluster.controllerListenerName()) + validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName(), enableUnstableLastVersion = true) } @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT)) @@ -82,8 +82,8 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio @ClusterTest(types = Array(Type.KRAFT)) def testApiVersionsRequestValidationV0ThroughControllerListener(): Unit = { val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short]) - val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.controllerListenerName.get()) - validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName.get(), apiVersion = 0, enableUnstableLastVersion = true) + val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.controllerListenerName()) + validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName(), apiVersion = 0, enableUnstableLastVersion = true) } @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT)) diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala index c7a4bd45f78..7e08f996305 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala @@ -47,10 +47,10 @@ class BrokerRegistrationRequestTest { def node: Option[Node] = Some(new Node( clusterInstance.anyControllerSocketServer().config.nodeId, "127.0.0.1", - clusterInstance.anyControllerSocketServer().boundPort(clusterInstance.controllerListenerName().get()), + clusterInstance.anyControllerSocketServer().boundPort(clusterInstance.controllerListenerName()), )) - def listenerName: ListenerName = clusterInstance.controllerListenerName().get() + def listenerName: ListenerName = clusterInstance.controllerListenerName() val securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala index 2aa8f5a9e2c..1e6d27320ca 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala @@ -81,7 +81,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) { val nodes = response.data.nodes().asScala assertEquals(cluster.controllerIds().asScala, nodes.map(_.nodeId()).toSet) val node = nodes.find(_.nodeId() == cluster.controllers().keySet().asScala.head) - assertEquals(cluster.controllerListenerName().get().value(), node.get.listeners().asScala.head.name()) + assertEquals(cluster.controllerListenerName().value(), node.get.listeners().asScala.head.name()) } } } diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index d07e3bd2ea2..2fafc4139de 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -115,9 +115,7 @@ public interface ClusterInstance { /** * The listener for the kraft cluster controller configured by controller.listener.names. */ - default Optional controllerListenerName() { - return Optional.empty(); - } + ListenerName controllerListenerName(); /** * The broker connect string which can be used by clients for bootstrapping diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java index 956013e5d13..0b291159858 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java @@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.Stream; -import scala.jdk.javaapi.OptionConverters; /** @@ -133,11 +132,16 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte } @Override - public Optional controllerListenerName() { - return controllers().values().stream() - .findAny() - .flatMap(s -> OptionConverters.toJava(s.config().controllerListenerNames().headOption())) - .map(ListenerName::new); + public ListenerName controllerListenerName() { + return new ListenerName( + controllers() + .values() + .iterator() + .next() + .config() + .controllerListenerNames() + .head() + ); } @Override diff --git a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java index c1e379fe700..4ad3ff0dfc1 100644 --- a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java +++ b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java @@ -345,7 +345,7 @@ public class ClusterTestExtensionsTest { @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, controllerListener = "FOO") public void testControllerListenerName(ClusterInstance cluster) throws ExecutionException, InterruptedException { - assertEquals("FOO", cluster.controllerListenerName().get().value()); + assertEquals("FOO", cluster.controllerListenerName().value()); try (Admin admin = cluster.admin(Map.of(), true)) { assertEquals(1, admin.describeMetadataQuorum().quorumInfo().get().nodes().size()); }