KAFKA-19270: Remove Optional from ClusterInstance#controllerListenerName() return type (#19718)
CI / build (push) Waiting to run Details

In KRaft mode, controllerListenerName must always be specified, so we don't need an `Optional` to wrap it.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <frankvicky@apache.org>
This commit is contained in:
Ming-Yen Chung 2025-05-15 10:47:47 +08:00 committed by GitHub
parent 7f02c263a6
commit 7d4acedc27
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 23 additions and 22 deletions

View File

@ -31,13 +31,12 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Tag
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
@Tag("integration") @Tag("integration")
abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = { def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = {
val socket = if (cluster.controllerListenerName().toScala.contains(listenerName)) { val socket = if (cluster.controllerListenerName() == listenerName) {
cluster.controllerSocketServers().asScala.head cluster.controllerSocketServers().asScala.head
} else { } else {
cluster.brokerSocketServers().asScala.head cluster.brokerSocketServers().asScala.head
@ -92,7 +91,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).minVersion()) assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).minVersion())
assertEquals(StreamsVersion.SV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).maxVersion()) assertEquals(StreamsVersion.SV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).maxVersion())
} }
val expectedApis = if (cluster.controllerListenerName().toScala.contains(listenerName)) { val expectedApis = if (cluster.controllerListenerName() == listenerName) {
ApiVersionsResponse.collectApis( ApiVersionsResponse.collectApis(
ApiMessageType.ListenerType.CONTROLLER, ApiMessageType.ListenerType.CONTROLLER,
ApiKeys.apisForListener(ApiMessageType.ListenerType.CONTROLLER), ApiKeys.apisForListener(ApiMessageType.ListenerType.CONTROLLER),
@ -110,7 +109,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
assertEquals(expectedApis.size, apiVersionsResponse.data.apiKeys.size, assertEquals(expectedApis.size, apiVersionsResponse.data.apiKeys.size,
"API keys in ApiVersionsResponse must match API keys supported by broker.") "API keys in ApiVersionsResponse must match API keys supported by broker.")
val defaultApiVersionsResponse = if (cluster.controllerListenerName().toScala.contains(listenerName)) { val defaultApiVersionsResponse = if (cluster.controllerListenerName() == listenerName) {
TestUtils.defaultApiVersionsResponse(0, ListenerType.CONTROLLER, enableUnstableLastVersion) TestUtils.defaultApiVersionsResponse(0, ListenerType.CONTROLLER, enableUnstableLastVersion)
} else { } else {
TestUtils.createApiVersionsResponse(0, expectedApis) TestUtils.createApiVersionsResponse(0, expectedApis)

View File

@ -82,7 +82,7 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {
IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse]( IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse](
request, request,
controllerSocketServer, controllerSocketServer,
cluster.controllerListenerName.get cluster.controllerListenerName
) )
} }

View File

@ -50,8 +50,8 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
@ClusterTest(types = Array(Type.KRAFT)) @ClusterTest(types = Array(Type.KRAFT))
def testApiVersionsRequestThroughControllerListener(): Unit = { def testApiVersionsRequestThroughControllerListener(): Unit = {
val request = new ApiVersionsRequest.Builder().build() val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request, cluster.controllerListenerName.get()) val apiVersionsResponse = sendApiVersionsRequest(request, cluster.controllerListenerName())
validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName.get(), enableUnstableLastVersion = true) validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName(), enableUnstableLastVersion = true)
} }
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT)) @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT))
@ -82,8 +82,8 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
@ClusterTest(types = Array(Type.KRAFT)) @ClusterTest(types = Array(Type.KRAFT))
def testApiVersionsRequestValidationV0ThroughControllerListener(): Unit = { def testApiVersionsRequestValidationV0ThroughControllerListener(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short]) val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.controllerListenerName.get()) val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.controllerListenerName())
validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName.get(), apiVersion = 0, enableUnstableLastVersion = true) validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName(), apiVersion = 0, enableUnstableLastVersion = true)
} }
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT)) @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT))

View File

@ -47,10 +47,10 @@ class BrokerRegistrationRequestTest {
def node: Option[Node] = Some(new Node( def node: Option[Node] = Some(new Node(
clusterInstance.anyControllerSocketServer().config.nodeId, clusterInstance.anyControllerSocketServer().config.nodeId,
"127.0.0.1", "127.0.0.1",
clusterInstance.anyControllerSocketServer().boundPort(clusterInstance.controllerListenerName().get()), clusterInstance.anyControllerSocketServer().boundPort(clusterInstance.controllerListenerName()),
)) ))
def listenerName: ListenerName = clusterInstance.controllerListenerName().get() def listenerName: ListenerName = clusterInstance.controllerListenerName()
val securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT val securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT

View File

@ -81,7 +81,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
val nodes = response.data.nodes().asScala val nodes = response.data.nodes().asScala
assertEquals(cluster.controllerIds().asScala, nodes.map(_.nodeId()).toSet) assertEquals(cluster.controllerIds().asScala, nodes.map(_.nodeId()).toSet)
val node = nodes.find(_.nodeId() == cluster.controllers().keySet().asScala.head) val node = nodes.find(_.nodeId() == cluster.controllers().keySet().asScala.head)
assertEquals(cluster.controllerListenerName().get().value(), node.get.listeners().asScala.head.name()) assertEquals(cluster.controllerListenerName().value(), node.get.listeners().asScala.head.name())
} }
} }
} }

View File

@ -115,9 +115,7 @@ public interface ClusterInstance {
/** /**
* The listener for the kraft cluster controller configured by controller.listener.names. * The listener for the kraft cluster controller configured by controller.listener.names.
*/ */
default Optional<ListenerName> controllerListenerName() { ListenerName controllerListenerName();
return Optional.empty();
}
/** /**
* The broker connect string which can be used by clients for bootstrapping * The broker connect string which can be used by clients for bootstrapping

View File

@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import scala.jdk.javaapi.OptionConverters;
/** /**
@ -133,11 +132,16 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
} }
@Override @Override
public Optional<ListenerName> controllerListenerName() { public ListenerName controllerListenerName() {
return controllers().values().stream() return new ListenerName(
.findAny() controllers()
.flatMap(s -> OptionConverters.toJava(s.config().controllerListenerNames().headOption())) .values()
.map(ListenerName::new); .iterator()
.next()
.config()
.controllerListenerNames()
.head()
);
} }
@Override @Override

View File

@ -345,7 +345,7 @@ public class ClusterTestExtensionsTest {
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, controllerListener = "FOO") @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, controllerListener = "FOO")
public void testControllerListenerName(ClusterInstance cluster) throws ExecutionException, InterruptedException { public void testControllerListenerName(ClusterInstance cluster) throws ExecutionException, InterruptedException {
assertEquals("FOO", cluster.controllerListenerName().get().value()); assertEquals("FOO", cluster.controllerListenerName().value());
try (Admin admin = cluster.admin(Map.of(), true)) { try (Admin admin = cluster.admin(Map.of(), true)) {
assertEquals(1, admin.describeMetadataQuorum().quorumInfo().get().nodes().size()); assertEquals(1, admin.describeMetadataQuorum().quorumInfo().get().nodes().size());
} }