MINOR: Replace boundPort with brokerBoundPort (#20297)

## Changes:
- Replaced all references to boundPort with brokerBoundPort.

## Reasons
- boundPort and brokerBoundPort share the same definition and behavior.

Reviewers: TaiJuWu <tjwu1217@gmail.com>, Ken Huang <s7133700@gmail.com>,
 Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Chang-Chi Hsu 2025-08-04 10:44:12 +02:00 committed by GitHub
parent e67c042f7f
commit 888861d803
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 9 additions and 17 deletions

View File

@ -380,7 +380,7 @@ public class ConsumerBounceTest {
TestUtils.waitForCondition(() -> { TestUtils.waitForCondition(() -> {
FindCoordinatorResponse response = null; FindCoordinatorResponse response = null;
try { try {
response = IntegrationTestUtils.connectAndReceive(request, clusterInstance.boundPorts().get(0)); response = IntegrationTestUtils.connectAndReceive(request, clusterInstance.brokerBoundPorts().get(0));
} catch (IOException e) { } catch (IOException e) {
return false; return false;
} }

View File

@ -38,7 +38,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): ApiVersionsResponse = { def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): ApiVersionsResponse = {
val overrideHeader = IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue) val overrideHeader = IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0)) val socket = IntegrationTestUtils.connect(cluster.brokerBoundPorts().get(0))
try { try {
val serializedBytes = Utils.toArray( val serializedBytes = Utils.toArray(
RequestUtils.serialize(overrideHeader.data, overrideHeader.headerVersion, request.data, request.version)) RequestUtils.serialize(overrideHeader.data, overrideHeader.headerVersion, request.data, request.version))

View File

@ -35,7 +35,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
val request = new DescribeQuorumRequest.Builder( val request = new DescribeQuorumRequest.Builder(
singletonRequest(KafkaRaftServer.MetadataPartition) singletonRequest(KafkaRaftServer.MetadataPartition)
).build(version.toShort) ).build(version.toShort)
val response = IntegrationTestUtils.connectAndReceive[DescribeQuorumResponse](request, cluster.boundPorts().get(0)) val response = IntegrationTestUtils.connectAndReceive[DescribeQuorumResponse](request, cluster.brokerBoundPorts().get(0))
assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode)) assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
assertEquals("", response.data.errorMessage) assertEquals("", response.data.errorMessage)

View File

@ -954,7 +954,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
} }
protected def connectAny(): Socket = { protected def connectAny(): Socket = {
val socket: Socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0)) val socket: Socket = IntegrationTestUtils.connect(cluster.brokerBoundPorts().get(0))
openSockets += socket openSockets += socket
socket socket
} }
@ -968,7 +968,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
protected def connectAndReceive[T <: AbstractResponse]( protected def connectAndReceive[T <: AbstractResponse](
request: AbstractRequest request: AbstractRequest
): T = { ): T = {
IntegrationTestUtils.connectAndReceive[T](request, cluster.boundPorts().get(0)) IntegrationTestUtils.connectAndReceive[T](request, cluster.brokerBoundPorts().get(0))
} }
protected def connectAndReceive[T <: AbstractResponse]( protected def connectAndReceive[T <: AbstractResponse](

View File

@ -35,7 +35,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
) )
def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = { def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = {
val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0)) val socket = IntegrationTestUtils.connect(cluster.brokerBoundPorts().get(0))
try { try {
val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse]( val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket) new ApiVersionsRequest.Builder().build(0), socket)
@ -56,7 +56,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
) )
def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = { def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = {
val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0)) val socket = IntegrationTestUtils.connect(cluster.brokerBoundPorts().get(0))
try { try {
sendSaslHandshakeRequestValidateResponse(socket) sendSaslHandshakeRequestValidateResponse(socket)
val response = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse]( val response = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
@ -72,7 +72,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
) )
def testApiVersionsRequestWithUnsupportedVersion(): Unit = { def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0)) val socket = IntegrationTestUtils.connect(cluster.brokerBoundPorts().get(0))
try { try {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0) val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0)
val apiVersionsResponse = sendUnsupportedApiVersionRequest(apiVersionsRequest) val apiVersionsResponse = sendUnsupportedApiVersionRequest(apiVersionsRequest)

View File

@ -933,7 +933,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
} }
private def connectAndReceive(request: ShareGroupHeartbeatRequest): ShareGroupHeartbeatResponse = { private def connectAndReceive(request: ShareGroupHeartbeatRequest): ShareGroupHeartbeatResponse = {
IntegrationTestUtils.connectAndReceive[ShareGroupHeartbeatResponse](request, cluster.boundPorts().get(0)) IntegrationTestUtils.connectAndReceive[ShareGroupHeartbeatResponse](request, cluster.brokerBoundPorts().get(0))
} }
private def increasePartitions[B <: KafkaBroker](admin: Admin, private def increasePartitions[B <: KafkaBroker](admin: Admin,

View File

@ -447,12 +447,4 @@ public interface ClusterInstance {
throw new AssertionError("Timing out after " + timeoutMs + throw new AssertionError("Timing out after " + timeoutMs +
" ms since a leader was not elected for partition " + topicPartition); " ms since a leader was not elected for partition " + topicPartition);
} }
default List<Integer> boundPorts() {
return brokers().values().stream()
.map(KafkaBroker::socketServer)
.map(s -> s.boundPort(clientListener()))
.toList();
}
} }