KAFKA-19420 Don't export SocketServer from ClusterInstance (#20002)

Refactor the code related to SocketServer. SocketServer is an internal
class, and integration tests should normally not use it directly.
[KAFKA-19239](https://issues.apache.org/jira/browse/KAFKA-19239) will
add a new helper to expose the bound ports, so tests that need to send
raw requests can leverage it without accessing the SocketServer.
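At the call sites the migration looks like this. A minimal before/after
sketch: the `ApiVersionsRequest` round trip is taken from the test changes
below, while the wrapper class and method name are illustrative, and the
exact checked exceptions of `IntegrationTestUtils.connectAndReceive` may
differ.

```java
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.server.IntegrationTestUtils;

class BoundPortExample {
    // Before: tests reached into the internal SocketServer for a port:
    //   cluster.anyBrokerSocketServer().boundPort(cluster.clientListener())
    // After: ClusterInstance hands out the bound ports directly, so
    // kafka.network.SocketServer never leaks into test code.
    static ApiVersionsResponse firstBrokerApiVersions(ClusterInstance cluster) throws Exception {
        int port = cluster.brokerBoundPorts().get(0);
        return IntegrationTestUtils.connectAndReceive(
            new ApiVersionsRequest.Builder().build(), port);
    }
}
```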

Reviewers: PoAn Yang <payang@apache.org>, Ken Huang <s7133700@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Xuan-Zhang Gong 2025-06-27 21:12:57 +08:00 committed by GitHub
parent b919836551
commit 05b6e81688
9 changed files with 37 additions and 70 deletions

View File

@@ -17,7 +17,6 @@
package kafka.coordinator.transaction
import kafka.network.SocketServer
import org.apache.kafka.server.IntegrationTestUtils
import org.apache.kafka.clients.admin.{Admin, NewTopic, TransactionState}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecords, OffsetAndMetadata}
@@ -27,7 +26,6 @@ import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
import org.apache.kafka.common.message.InitProducerIdRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse}
@@ -183,9 +181,9 @@ class ProducerIntegrationTest {
private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
// Request enough PIDs from each broker to ensure each broker generates two blocks
val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker => {
IntStream.range(0, 1001).parallel().mapToObj( _ =>
nextProducerId(broker, clusterInstance.clientListener())
val ids = clusterInstance.brokers().values().stream().flatMap(broker => {
IntStream.range(0, 1001).parallel().mapToObj(_ =>
nextProducerId(broker.boundPort(clusterInstance.clientListener()))
)}).collect(Collectors.toList[Long]).asScala.toSeq
val brokerCount = clusterInstance.brokerIds.size
@@ -194,7 +192,7 @@
assertEquals(expectedTotalCount, ids.distinct.size, "Found duplicate producer IDs")
}
private def nextProducerId(broker: SocketServer, listener: ListenerName): Long = {
private def nextProducerId(port: Int): Long = {
// Generating producer ids may fail while waiting for the initial block and also
// when the current block is full and waiting for the prefetched block.
val deadline = 5.seconds.fromNow
@@ -207,7 +205,6 @@
.setTransactionalId(null)
.setTransactionTimeoutMs(10)
val request = new InitProducerIdRequest.Builder(data).build()
val port = broker.boundPort(listener)
response = IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request, port)
shouldRetry = response.data.errorCode == Errors.COORDINATOR_LOAD_IN_PROGRESS.code
}

View File

@@ -36,15 +36,6 @@ import scala.jdk.CollectionConverters._
@Tag("integration")
abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = {
val socket = if (cluster.controllerListenerName() == listenerName) {
cluster.controllerSocketServers().asScala.head
} else {
cluster.brokerSocketServers().asScala.head
}
IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, socket.boundPort(listenerName))
}
def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): ApiVersionsResponse = {
val overrideHeader = IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0))

View File

@@ -19,9 +19,10 @@ package kafka.server
import org.apache.kafka.common.message.ApiVersionsRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.ApiVersionsRequest
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, Type}
import org.apache.kafka.server.IntegrationTestUtils
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
@@ -33,7 +34,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
))
def testApiVersionsRequest(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request, cluster.clientListener())
val apiVersionsResponse = IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, cluster.brokerBoundPorts().get(0))
validateApiVersionsResponse(apiVersionsResponse)
}
@@ -43,14 +44,14 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
))
def testApiVersionsRequestIncludesUnreleasedApis(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request, cluster.clientListener())
val apiVersionsResponse = IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, cluster.brokerBoundPorts().get(0))
validateApiVersionsResponse(apiVersionsResponse, enableUnstableLastVersion = true)
}
@ClusterTest(types = Array(Type.KRAFT))
def testApiVersionsRequestThroughControllerListener(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request, cluster.controllerListenerName())
val apiVersionsResponse = IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, cluster.controllerBoundPorts().get(0))
validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName(), enableUnstableLastVersion = true)
}
@@ -73,7 +74,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
))
def testApiVersionsRequestValidationV0(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener())
val apiVersionsResponse = IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](apiVersionsRequest, cluster.brokerBoundPorts().get(0))
validateApiVersionsResponse(apiVersionsResponse, apiVersion = 0,
enableUnstableLastVersion = !"false".equals(
cluster.config().serverProperties().get("unstable.api.versions.enable")))
@@ -82,7 +83,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
@ClusterTest(types = Array(Type.KRAFT))
def testApiVersionsRequestValidationV0ThroughControllerListener(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.controllerListenerName())
val apiVersionsResponse = IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](apiVersionsRequest, cluster.controllerBoundPorts().get(0))
validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName(), apiVersion = 0, enableUnstableLastVersion = true)
}
@@ -90,7 +91,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
def testApiVersionsRequestValidationV3(): Unit = {
// Invalid request because Name and Version are empty by default
val apiVersionsRequest = new ApiVersionsRequest(new ApiVersionsRequestData(), 3.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener())
val apiVersionsResponse = IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](apiVersionsRequest, cluster.brokerBoundPorts().get(0))
assertEquals(Errors.INVALID_REQUEST.code(), apiVersionsResponse.data.errorCode())
}
}

View File

@@ -42,12 +42,13 @@ import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
class BrokerRegistrationRequestTest {
def brokerToControllerChannelManager(clusterInstance: ClusterInstance): NodeToControllerChannelManager = {
val controllerSocketServer = clusterInstance.controllers().values().stream().map(_.socketServer).findFirst().get()
new NodeToControllerChannelManagerImpl(
new ControllerNodeProvider() {
def node: Option[Node] = Some(new Node(
clusterInstance.anyControllerSocketServer().config.nodeId,
controllerSocketServer.config.nodeId,
"127.0.0.1",
clusterInstance.anyControllerSocketServer().boundPort(clusterInstance.controllerListenerName()),
controllerSocketServer.boundPort(clusterInstance.controllerListenerName()),
))
def listenerName: ListenerName = clusterInstance.controllerListenerName()
@@ -61,7 +62,7 @@ class BrokerRegistrationRequestTest {
},
Time.SYSTEM,
new Metrics(),
clusterInstance.anyControllerSocketServer().config,
controllerSocketServer.config,
"heartbeat",
"test-heartbeat-",
10000

View File

@@ -217,7 +217,7 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
else
InetAddress.getByName(entityName)
var currentServerQuota = 0
currentServerQuota = cluster.brokerSocketServers().asScala.head.connectionQuotas.connectionRateForIp(entityIp)
currentServerQuota = cluster.brokers().values().asScala.head.socketServer.connectionQuotas.connectionRateForIp(entityIp)
assertTrue(Math.abs(expectedMatches(entity) - currentServerQuota) < 0.01,
s"Connection quota of $entity is not ${expectedMatches(entity)} but $currentServerQuota")
}

View File

@@ -17,7 +17,6 @@
package org.apache.kafka.common.test;
import kafka.network.SocketServer;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
@@ -130,36 +129,18 @@ public interface ClusterInstance {
*/
String bootstrapControllers();
/**
* A collection of all brokers in the cluster.
*/
default Collection<SocketServer> brokerSocketServers() {
default List<Integer> controllerBoundPorts() {
return controllers().values().stream()
.map(ControllerServer::socketServer)
.map(ss -> ss.boundPort(controllerListenerName()))
.toList();
}
default List<Integer> brokerBoundPorts() {
return brokers().values().stream()
.map(KafkaBroker::socketServer)
.collect(Collectors.toList());
}
/**
* A collection of all controllers in the cluster.
*/
Collection<SocketServer> controllerSocketServers();
/**
* Return any one of the broker servers. Throw an error if none are found
*/
default SocketServer anyBrokerSocketServer() {
return brokerSocketServers().stream()
.findFirst()
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
}
/**
* Return any one of the controller servers. Throw an error if none are found
*/
default SocketServer anyControllerSocketServer() {
return controllerSocketServers().stream()
.findFirst()
.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
.map(ss -> ss.boundPort(clientListener()))
.toList();
}
String clusterId();
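
The controller-side helper replaces the removed `controllerSocketServers()`/`anyControllerSocketServer()` pair in the same way. A minimal sketch (the wrapper class and method name are hypothetical; the round trip mirrors the `ApiVersionsRequestTest` changes in this commit):

```java
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.server.IntegrationTestUtils;

class ControllerPortExample {
    // Replaces the old pattern:
    //   cluster.anyControllerSocketServer().boundPort(cluster.controllerListenerName())
    static ApiVersionsResponse firstControllerApiVersions(ClusterInstance cluster) throws Exception {
        // controllerBoundPorts() lists the controller-listener ports, one per controller.
        int port = cluster.controllerBoundPorts().get(0);
        return IntegrationTestUtils.connectAndReceive(
            new ApiVersionsRequest.Builder().build(), port);
    }
}
```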

View File

@@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.test.junit;
import kafka.network.SocketServer;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
@@ -41,7 +40,6 @@ import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -170,13 +168,6 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
);
}
@Override
public Collection<SocketServer> controllerSocketServers() {
return controllers().values().stream()
.map(ControllerServer::socketServer)
.collect(Collectors.toList());
}
@Override
public String clusterId() {
return Stream.concat(controllers().values().stream().map(ControllerServer::clusterId),

View File

@@ -18,6 +18,7 @@
package org.apache.kafka.common.test.junit;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -83,6 +84,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterTestDefaults(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = "default.key", value = "default.value"),
@@ -188,9 +190,9 @@ public class ClusterTestExtensionsTest {
@ClusterTest(autoStart = AutoStart.NO)
public void testNoAutoStart() {
Assertions.assertThrows(RuntimeException.class, clusterInstance::anyBrokerSocketServer);
Assertions.assertThrows(RuntimeException.class, () -> clusterInstance.brokers().values().stream().map(KafkaBroker::socketServer).findFirst());
clusterInstance.start();
assertNotNull(clusterInstance.anyBrokerSocketServer());
assertTrue(clusterInstance.brokers().values().stream().map(KafkaBroker::socketServer).findFirst().isPresent());
}
@ClusterTest

View File

@@ -17,6 +17,7 @@
package org.apache.kafka.tools;
import kafka.admin.ConfigCommand;
import kafka.server.KafkaBroker;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
@@ -444,7 +445,7 @@ public class ConfigCommandIntegrationTest {
@ClusterTest
public void testUpdateInvalidBrokerConfigs() {
updateAndCheckInvalidBrokerConfig(Optional.empty());
updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId() + ""));
updateAndCheckInvalidBrokerConfig(Optional.of(String.valueOf((cluster.brokers().entrySet().iterator().next().getKey()))));
}
private void updateAndCheckInvalidBrokerConfig(Optional<String> brokerIdOrDefault) {
@@ -506,7 +507,9 @@ public class ConfigCommandIntegrationTest {
"--entity-type", "brokers",
"--entity-default"))));
kafka.utils.TestUtils.waitUntilTrue(
() -> cluster.brokerSocketServers().stream().allMatch(broker -> broker.config().getInt("log.cleaner.threads") == 2),
() -> cluster.brokers().values().stream()
.map(KafkaBroker::config)
.allMatch(config -> config.getInt("log.cleaner.threads") == 2),
() -> "Timeout waiting for topic config propagating to broker",
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
100L);