From 888861d803bf819eabffe0c8ce8ed21b0b0304b0 Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Mon, 4 Aug 2025 10:44:12 +0200 Subject: [PATCH] MINOR: Replace boundPort with brokerBoundPort (#20297) ## Changes: - Replaced all references to boundPort with brokerBoundPort. ## Reasons - boundPort and brokerBoundPort share the same definition and behavior. Reviewers: TaiJuWu , Ken Huang , Chia-Ping Tsai --- .../apache/kafka/clients/consumer/ConsumerBounceTest.java | 2 +- .../kafka/server/AbstractApiVersionsRequestTest.scala | 2 +- .../unit/kafka/server/DescribeQuorumRequestTest.scala | 2 +- .../kafka/server/GroupCoordinatorBaseRequestTest.scala | 4 ++-- .../unit/kafka/server/SaslApiVersionsRequestTest.scala | 6 +++--- .../kafka/server/ShareGroupHeartbeatRequestTest.scala | 2 +- .../org/apache/kafka/common/test/ClusterInstance.java | 8 -------- 7 files changed, 9 insertions(+), 17 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java index 75dc155d0a5..2583d099977 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java @@ -380,7 +380,7 @@ public class ConsumerBounceTest { TestUtils.waitForCondition(() -> { FindCoordinatorResponse response = null; try { - response = IntegrationTestUtils.connectAndReceive(request, clusterInstance.boundPorts().get(0)); + response = IntegrationTestUtils.connectAndReceive(request, clusterInstance.brokerBoundPorts().get(0)); } catch (IOException e) { return false; } diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index 43e946c4b22..958c8440c2c 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -38,7 +38,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): ApiVersionsResponse = { val overrideHeader = IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue) - val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0)) + val socket = IntegrationTestUtils.connect(cluster.brokerBoundPorts().get(0)) try { val serializedBytes = Utils.toArray( RequestUtils.serialize(overrideHeader.data, overrideHeader.headerVersion, request.data, request.version)) diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala index ad1782adc65..bc0e768c6f4 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala @@ -35,7 +35,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) { val request = new DescribeQuorumRequest.Builder( singletonRequest(KafkaRaftServer.MetadataPartition) ).build(version.toShort) - val response = IntegrationTestUtils.connectAndReceive[DescribeQuorumResponse](request, cluster.boundPorts().get(0)) + val response = IntegrationTestUtils.connectAndReceive[DescribeQuorumResponse](request, cluster.brokerBoundPorts().get(0)) assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) assertEquals("", response.data.errorMessage) diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index 431e431504f..d5ab1356ac9 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -954,7 +954,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { } protected def connectAny(): Socket = { - val socket: Socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0)) + val socket: Socket = IntegrationTestUtils.connect(cluster.brokerBoundPorts().get(0)) openSockets += socket socket } @@ -968,7 +968,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { protected def connectAndReceive[T <: AbstractResponse]( request: AbstractRequest ): T = { - IntegrationTestUtils.connectAndReceive[T](request, cluster.boundPorts().get(0)) + IntegrationTestUtils.connectAndReceive[T](request, cluster.brokerBoundPorts().get(0)) } protected def connectAndReceive[T <: AbstractResponse]( diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 4f63210f595..580b4a71f09 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -35,7 +35,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT ) def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = { - val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0)) + val socket = IntegrationTestUtils.connect(cluster.brokerBoundPorts().get(0)) try { val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse]( new ApiVersionsRequest.Builder().build(0), socket) @@ -56,7 +56,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT ) def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = { - val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0)) + val socket = IntegrationTestUtils.connect(cluster.brokerBoundPorts().get(0)) try { sendSaslHandshakeRequestValidateResponse(socket) val response = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse]( @@ -72,7 +72,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT ) def testApiVersionsRequestWithUnsupportedVersion(): Unit = { - val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0)) + val socket = IntegrationTestUtils.connect(cluster.brokerBoundPorts().get(0)) try { val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0) val apiVersionsResponse = sendUnsupportedApiVersionRequest(apiVersionsRequest) diff --git a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala index 6ab0d40d432..b05a97fe119 100644 --- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala @@ -933,7 +933,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { } private def connectAndReceive(request: ShareGroupHeartbeatRequest): ShareGroupHeartbeatResponse = { - IntegrationTestUtils.connectAndReceive[ShareGroupHeartbeatResponse](request, cluster.boundPorts().get(0)) + IntegrationTestUtils.connectAndReceive[ShareGroupHeartbeatResponse](request, cluster.brokerBoundPorts().get(0)) } private def increasePartitions[B <: KafkaBroker](admin: Admin, diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index b7f9d4c289a..7662eeda7a3 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -447,12 +447,4 @@ public interface ClusterInstance { throw new AssertionError("Timing out after " + timeoutMs + " ms since a leader was not elected for partition " + topicPartition); } - - default List boundPorts() { - return brokers().values().stream() - .map(KafkaBroker::socketServer) - .map(s -> s.boundPort(clientListener())) - .toList(); - - } }