KAFKA-19239 Rewrite IntegrationTestUtils by java (#19776)

This PR rewrites IntegrationTestUtils from Scala to Java, moving it from the kafka.server package to org.apache.kafka.server.

## Changes:

- Converted the existing Scala code in IntegrationTestUtils.scala into Java in IntegrationTestUtils.java.
- Preserved the original request/response logic and behavior; the utility's signatures changed from taking a SocketServer and ListenerName to taking a resolved port (see the sketch below).
- Updated imports and call sites in the affected tests accordingly.
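
For illustration, the shape of that call-site change, taken from the ProducerIntegrationTest hunk further down:

```scala
// Before: the Scala helper resolved the port from the server + listener name.
response = IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request,
  destination = broker,
  listenerName = listener)

// After: the caller resolves the bound port and passes a plain int.
val port = broker.boundPort(listener)
response = IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request, port)
```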

## Motivation:

The rewrite standardizes this test utility on Java, which aligns it with the rest of the project and eases maintenance for a contributor base that works primarily in Java.

Reviewers: TaiJuWu <tjwu1217@gmail.com>, Ken Huang <s7133700@gmail.com>, PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
Author: Chang-Chi Hsu (committed by GitHub, 2025-06-20 01:46:29 +08:00)
Parent: dbbc45f7a0
Commit: 46b474a9de
13 changed files with 133 additions and 178 deletions

View File: ProducerIntegrationTest.scala

@@ -18,7 +18,7 @@
 package kafka.coordinator.transaction
 
 import kafka.network.SocketServer
-import kafka.server.IntegrationTestUtils
+import org.apache.kafka.server.IntegrationTestUtils
 import org.apache.kafka.clients.admin.{Admin, NewTopic, TransactionState}
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecords, OffsetAndMetadata}
 import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord}
@@ -206,11 +206,8 @@ class ProducerIntegrationTest {
         .setTransactionalId(null)
         .setTransactionTimeoutMs(10)
       val request = new InitProducerIdRequest.Builder(data).build()
-      response = IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request,
-        destination = broker,
-        listenerName = listener)
+      val port = broker.boundPort(listener)
+      response = IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request, port)
       shouldRetry = response.data.errorCode == Errors.COORDINATOR_LOAD_IN_PROGRESS.code
     }
     assertTrue(deadline.hasTimeLeft())

View File: IntegrationTestUtils.scala (deleted)

@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import kafka.network.SocketServer
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.{ApiKeys, ByteBufferAccessor}
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, ResponseHeader}
-import org.apache.kafka.common.utils.Utils
-
-import java.io.{DataInputStream, DataOutputStream}
-import java.net.Socket
-import java.nio.ByteBuffer
-import scala.reflect.ClassTag
-
-object IntegrationTestUtils {
-
-  def sendRequest(socket: Socket, request: Array[Byte]): Unit = {
-    val outgoing = new DataOutputStream(socket.getOutputStream)
-    outgoing.writeInt(request.length)
-    outgoing.write(request)
-    outgoing.flush()
-  }
-
-  private def sendWithHeader(request: AbstractRequest, header: RequestHeader, socket: Socket): Unit = {
-    val serializedBytes = Utils.toArray(request.serializeWithHeader(header))
-    sendRequest(socket, serializedBytes)
-  }
-
-  def nextRequestHeader[T <: AbstractResponse](apiKey: ApiKeys,
-                                               apiVersion: Short,
-                                               clientId: String = "client-id",
-                                               correlationIdOpt: Option[Int] = None): RequestHeader = {
-    val correlationId = correlationIdOpt.getOrElse {
-      this.correlationId += 1
-      this.correlationId
-    }
-    new RequestHeader(apiKey, apiVersion, clientId, correlationId)
-  }
-
-  def send(request: AbstractRequest,
-           socket: Socket,
-           clientId: String = "client-id",
-           correlationId: Option[Int] = None): Unit = {
-    val header = nextRequestHeader(request.apiKey, request.version, clientId, correlationId)
-    sendWithHeader(request, header, socket)
-  }
-
-  def receive[T <: AbstractResponse](socket: Socket, apiKey: ApiKeys, version: Short)
-                                    (implicit classTag: ClassTag[T]): T = {
-    val incoming = new DataInputStream(socket.getInputStream)
-    val len = incoming.readInt()
-
-    val responseBytes = new Array[Byte](len)
-    incoming.readFully(responseBytes)
-
-    val responseBuffer = ByteBuffer.wrap(responseBytes)
-    ResponseHeader.parse(responseBuffer, apiKey.responseHeaderVersion(version))
-
-    AbstractResponse.parseResponse(apiKey, new ByteBufferAccessor(responseBuffer), version) match {
-      case response: T => response
-      case response =>
-        throw new ClassCastException(s"Expected response with type ${classTag.runtimeClass}, but found ${response.getClass}")
-    }
-  }
-
-  def sendAndReceive[T <: AbstractResponse](request: AbstractRequest,
-                                            socket: Socket,
-                                            clientId: String = "client-id",
-                                            correlationId: Option[Int] = None)
-                                           (implicit classTag: ClassTag[T]): T = {
-    send(request, socket, clientId, correlationId)
-    receive[T](socket, request.apiKey, request.version)
-  }
-
-  def connectAndReceive[T <: AbstractResponse](request: AbstractRequest,
-                                               destination: SocketServer,
-                                               listenerName: ListenerName)
-                                              (implicit classTag: ClassTag[T]): T = {
-    val socket = connect(destination, listenerName)
-    try sendAndReceive[T](request, socket)
-    finally socket.close()
-  }
-
-  private var correlationId = 0
-
-  def connect(socketServer: SocketServer,
-              listenerName: ListenerName): Socket = {
-    new Socket("localhost", socketServer.boundPort(listenerName))
-  }
-}

View File: KRaftClusterTest.scala

@@ -18,8 +18,8 @@
 package kafka.server
 
 import kafka.network.SocketServer
-import kafka.server.IntegrationTestUtils.connectAndReceive
 import kafka.utils.TestUtils
+import org.apache.kafka.server.IntegrationTestUtils.connectAndReceive
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
@@ -518,12 +518,10 @@ class KRaftClusterTest {
   }
 
   private def sendDescribeClusterRequestToBoundPort(destination: SocketServer,
-                                                    listenerName: ListenerName): DescribeClusterResponse =
-    connectAndReceive[DescribeClusterResponse](
-      request = new DescribeClusterRequest.Builder(new DescribeClusterRequestData()).build(),
-      destination = destination,
-      listenerName = listenerName
-    )
+                                                    listenerName: ListenerName): DescribeClusterResponse = {
+    connectAndReceive[DescribeClusterResponse](new DescribeClusterRequest.Builder(new DescribeClusterRequestData()).build(),
+      destination.boundPort(listenerName))
+  }
 
   @Test
   def testCreateClusterAndPerformReassignment(): Unit = {

View File: AbstractApiVersionsRequestTest.scala

@@ -25,6 +25,7 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.IntegrationTestUtils
 import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, ShareVersion, StreamsVersion, TransactionVersion}
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
@@ -41,12 +42,12 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
     } else {
       cluster.brokerSocketServers().asScala.head
     }
-    IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, socket, listenerName)
+    IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, socket.boundPort(listenerName))
   }
 
   def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): ApiVersionsResponse = {
     val overrideHeader = IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
-    val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
+    val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0))
     try {
       val serializedBytes = Utils.toArray(
         RequestUtils.serialize(overrideHeader.data, overrideHeader.headerVersion, request.data, request.version))

View File: AllocateProducerIdsRequestTest.scala

@@ -17,12 +17,13 @@
 package unit.kafka.server
 
 import kafka.network.SocketServer
-import kafka.server.{BrokerServer, ControllerServer, IntegrationTestUtils}
+import kafka.server.{BrokerServer, ControllerServer}
 import org.apache.kafka.common.test.api.{ClusterTest, ClusterTestDefaults, Type}
 import org.apache.kafka.common.message.AllocateProducerIdsRequestData
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.server.IntegrationTestUtils
 import org.apache.kafka.server.common.ProducerIdsBlock
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
@@ -81,9 +82,7 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {
   ): AllocateProducerIdsResponse = {
     IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse](
       request,
-      controllerSocketServer,
-      cluster.controllerListenerName
+      controllerSocketServer.boundPort(cluster.controllerListenerName())
     )
   }
 }

View File: ClientQuotasRequestTest.scala

@@ -28,6 +28,7 @@ import org.apache.kafka.common.internals.KafkaFutureImpl
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
 import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
 import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.server.IntegrationTestUtils
 import org.apache.kafka.server.config.QuotaConfig
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Disabled
@@ -556,9 +557,9 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
   private def sendDescribeClientQuotasRequest(filter: ClientQuotaFilter): DescribeClientQuotasResponse = {
     val request = new DescribeClientQuotasRequest.Builder(filter).build()
-    IntegrationTestUtils.connectAndReceive[DescribeClientQuotasResponse](request,
-      destination = cluster.anyBrokerSocketServer(),
-      listenerName = cluster.clientListener())
+    IntegrationTestUtils.connectAndReceive[DescribeClientQuotasResponse](
+      request,
+      cluster.boundPorts().get(0))
   }
 
   private def alterEntityQuotas(entity: ClientQuotaEntity, alter: Map[String, Option[Double]], validateOnly: Boolean) =
@@ -584,9 +585,8 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
   private def sendAlterClientQuotasRequest(entries: Iterable[ClientQuotaAlteration], validateOnly: Boolean): AlterClientQuotasResponse = {
     val request = new AlterClientQuotasRequest.Builder(entries.asJavaCollection, validateOnly).build()
-    IntegrationTestUtils.connectAndReceive[AlterClientQuotasResponse](request,
-      destination = cluster.anyBrokerSocketServer(),
-      listenerName = cluster.clientListener())
+    IntegrationTestUtils.connectAndReceive[AlterClientQuotasResponse](
+      request,
+      cluster.boundPorts().get(0))
   }
 }

View File: DescribeQuorumRequestTest.scala

@@ -19,12 +19,12 @@ package kafka.server
 import org.apache.kafka.common.test.api.{ClusterTest, ClusterTestDefaults, Type}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, DescribeQuorumRequest, DescribeQuorumResponse}
+import org.apache.kafka.common.requests.{DescribeQuorumRequest, DescribeQuorumResponse}
 import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.server.IntegrationTestUtils
 import org.junit.jupiter.api.Assertions._
 
 import scala.jdk.CollectionConverters._
-import scala.reflect.ClassTag
 
 @ClusterTestDefaults(types = Array(Type.KRAFT))
 class DescribeQuorumRequestTest(cluster: ClusterInstance) {
@@ -35,7 +35,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
     val request = new DescribeQuorumRequest.Builder(
       singletonRequest(KafkaRaftServer.MetadataPartition)
     ).build(version.toShort)
-    val response = connectAndReceive[DescribeQuorumResponse](request)
+    val response = IntegrationTestUtils.connectAndReceive[DescribeQuorumResponse](request, cluster.boundPorts().get(0))
 
     assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
     assertEquals("", response.data.errorMessage)
@@ -85,17 +85,4 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
       }
     }
   }
-
-  private def connectAndReceive[T <: AbstractResponse](
-    request: AbstractRequest
-  )(
-    implicit classTag: ClassTag[T]
-  ): T = {
-    IntegrationTestUtils.connectAndReceive(
-      request,
-      cluster.brokerSocketServers().asScala.head,
-      cluster.clientListener()
-    )
-  }
 }

View File: GroupCoordinatorBaseRequestTest.scala

@@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.common.utils.ProducerIdAndEpoch
 import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
+import org.apache.kafka.server.IntegrationTestUtils
 import org.junit.jupiter.api.Assertions.{assertEquals, fail}
 
 import java.net.Socket
@@ -39,7 +40,7 @@ import java.util.stream.Collectors
 import scala.collection.Seq
 import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
-import scala.reflect.ClassTag
 
 class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
   private def brokers(): Seq[KafkaBroker] = cluster.brokers.values().stream().collect(Collectors.toList[KafkaBroker]).asScala.toSeq
@@ -900,42 +901,30 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
   }
 
   protected def connectAny(): Socket = {
-    val socket: Socket = IntegrationTestUtils.connect(
-      cluster.anyBrokerSocketServer(),
-      cluster.clientListener()
-    )
+    val socket: Socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0))
     openSockets += socket
     socket
   }
 
   protected def connect(destination: Int): Socket = {
-    val socket: Socket = IntegrationTestUtils.connect(
-      brokerSocketServer(destination),
-      cluster.clientListener()
-    )
+    val socket = IntegrationTestUtils.connect(brokerSocketServer(destination).boundPort(cluster.clientListener()))
     openSockets += socket
     socket
   }
 
   protected def connectAndReceive[T <: AbstractResponse](
     request: AbstractRequest
-  )(implicit classTag: ClassTag[T]): T = {
-    IntegrationTestUtils.connectAndReceive[T](
-      request,
-      cluster.anyBrokerSocketServer(),
-      cluster.clientListener()
-    )
+  ): T = {
+    IntegrationTestUtils.connectAndReceive[T](request, cluster.boundPorts().get(0))
   }
 
   protected def connectAndReceive[T <: AbstractResponse](
     request: AbstractRequest,
     destination: Int
-  )(implicit classTag: ClassTag[T]): T = {
-    IntegrationTestUtils.connectAndReceive[T](
-      request,
-      brokerSocketServer(destination),
-      cluster.clientListener()
-    )
+  ): T = {
+    val socketServer = brokerSocketServer(destination)
+    val listenerName = cluster.clientListener()
+    IntegrationTestUtils.connectAndReceive[T](request, socketServer.boundPort(listenerName))
   }
 
   private def brokerSocketServer(brokerId: Int): SocketServer = {

View File: SaslApiVersionsRequestTest.scala

@@ -22,11 +22,11 @@ import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.test.api.{ClusterTest, Type}
 import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.server.IntegrationTestUtils
 import org.junit.jupiter.api.Assertions._
 
 import java.net.Socket
 import java.util.Collections
-import scala.jdk.CollectionConverters._
 
 class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
@@ -35,7 +35,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
     controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
   )
   def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = {
-    val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
+    val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0))
     try {
       val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
         new ApiVersionsRequest.Builder().build(0), socket)
@@ -56,7 +56,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
     controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
   )
   def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = {
-    val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
+    val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0))
     try {
       sendSaslHandshakeRequestValidateResponse(socket)
       val response = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
@@ -72,7 +72,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
     controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
   )
   def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
-    val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
+    val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0))
     try {
       val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0)
       val apiVersionsResponse = sendUnsupportedApiVersionRequest(apiVersionsRequest)

View File: ShareFetchAcknowledgeRequestTest.scala

@@ -26,6 +26,7 @@ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse, ShareAcknowledgeRequest, ShareAcknowledgeResponse, ShareFetchRequest, ShareFetchResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, ShareRequestMetadata}
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.server.common.Feature
+import org.apache.kafka.server.IntegrationTestUtils
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, Timeout}
View File: ShareGroupHeartbeatRequestTest.scala

@@ -25,9 +25,11 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse}
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.server.common.Feature
+import org.apache.kafka.server.IntegrationTestUtils;
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull, assertNull, assertTrue}
 import org.junit.jupiter.api.Timeout
 
 import java.util
 import scala.jdk.CollectionConverters._
@@ -931,11 +933,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
   }
 
   private def connectAndReceive(request: ShareGroupHeartbeatRequest): ShareGroupHeartbeatResponse = {
-    IntegrationTestUtils.connectAndReceive[ShareGroupHeartbeatResponse](
-      request,
-      cluster.anyBrokerSocketServer(),
-      cluster.clientListener()
-    )
+    IntegrationTestUtils.connectAndReceive[ShareGroupHeartbeatResponse](request, cluster.boundPorts().get(0))
   }
 
   private def increasePartitions[B <: KafkaBroker](admin: Admin,

View File: IntegrationTestUtils.java (new)

@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class IntegrationTestUtils {
+
+    private static final AtomicInteger CORRELATION_ID = new AtomicInteger(0);
+
+    public static void sendRequest(Socket socket, byte[] request) throws IOException {
+        DataOutputStream outgoing = new DataOutputStream(socket.getOutputStream());
+        outgoing.writeInt(request.length);
+        outgoing.write(request);
+        outgoing.flush();
+    }
+
+    public static RequestHeader nextRequestHeader(ApiKeys apiKey, short apiVersion) {
+        return new RequestHeader(apiKey, apiVersion, "client-id", CORRELATION_ID.getAndIncrement());
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T extends AbstractResponse> T receive(Socket socket, ApiKeys apiKey, short version) throws IOException, ClassCastException {
+        var incoming = new DataInputStream(socket.getInputStream());
+        int len = incoming.readInt();
+
+        var responseBytes = new byte[len];
+        incoming.readFully(responseBytes);
+
+        var responseBuffer = ByteBuffer.wrap(responseBytes);
+        ResponseHeader.parse(responseBuffer, apiKey.responseHeaderVersion(version));
+
+        return (T) AbstractResponse.parseResponse(apiKey, new ByteBufferAccessor(responseBuffer), version);
+    }
+
+    public static <T extends AbstractResponse> T sendAndReceive(
+        AbstractRequest request,
+        Socket socket
+    ) throws IOException {
+        var header = nextRequestHeader(request.apiKey(), request.version());
+        sendRequest(socket, Utils.toArray(request.serializeWithHeader(header)));
+        return receive(socket, request.apiKey(), request.version());
+    }
+
+    public static <T extends AbstractResponse> T connectAndReceive(
+        AbstractRequest request,
+        int port
+    ) throws IOException {
+        try (Socket socket = connect(port)) {
+            return sendAndReceive(request, socket);
+        }
+    }
+
+    public static Socket connect(int port) throws IOException {
+        return new Socket("localhost", port);
+    }
+}

View File: ClusterInstance.java

@@ -406,4 +406,11 @@
             .orElseThrow(() -> new RuntimeException("Leader not found for tp " + topicPartition));
         }
     }
+
+    default List<Integer> boundPorts() {
+        return brokers().values().stream()
+                .map(KafkaBroker::socketServer)
+                .map(s -> s.boundPort(clientListener()))
+                .toList();
+    }
 }
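
For reviewers, a minimal sketch of the new end-to-end flow, combining the rewritten Java helper with the ClusterInstance.boundPorts() accessor added above. This is illustrative only: it assumes a ClusterInstance named `cluster` in scope and the imports already shown in the test files above:

```scala
// Send an ApiVersionsRequest to the first broker's client-listener port
// and check the top-level error code, mirroring the updated call sites.
val request = new ApiVersionsRequest.Builder().build(0)
val response = IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](
  request,
  cluster.boundPorts().get(0))
assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
```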