From f3a4a1b185a6146d1851f6d9c6a59cea9fee8830 Mon Sep 17 00:00:00 2001 From: Chirag Wadhwa Date: Wed, 7 May 2025 18:30:41 +0530 Subject: [PATCH] KAFKA-19241: Updated tests in ShareFetchAcknowledgeRequestTest to reuse the socket for subsequent requests (#19640) Currently in the tests in ShareFetchAcknowledgeRequestTest, subsequent share fetch / share acknowledge requests creates a new socket everytime, even when the requests are sent by the same member. In reality, a single share consumer clisnet will reuse the same socket for all the share related requests in its lifetime. This PR changes the behaviour in the tests to align with reality and reuse the same socket for all requests by the same share group member. Reviewers: Apoorv Mittal --- .../GroupCoordinatorBaseRequestTest.scala | 35 +-- .../ShareFetchAcknowledgeRequestTest.scala | 246 +++++++++++------- 2 files changed, 173 insertions(+), 108 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index e7926df3e36..f7675651cd6 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -913,6 +913,24 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { assertEquals(expectedResponseData.results.asScala.toSet, deleteGroupsResponse.data.results.asScala.toSet) } + protected def connectAny(): Socket = { + val socket: Socket = IntegrationTestUtils.connect( + cluster.anyBrokerSocketServer(), + cluster.clientListener() + ) + openSockets += socket + socket + } + + protected def connect(destination: Int): Socket = { + val socket: Socket = IntegrationTestUtils.connect( + brokerSocketServer(destination), + cluster.clientListener() + ) + openSockets += socket + socket + } + protected def connectAndReceive[T <: AbstractResponse]( request: AbstractRequest )(implicit classTag: ClassTag[T]): T = { @@ -934,23 +952,6 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { ) } - protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse]( - request: AbstractRequest, - destination: Int - )(implicit classTag: ClassTag[T]): T = { - val socket = IntegrationTestUtils.connect(brokerSocketServer(destination), cluster.clientListener()) - openSockets += socket - IntegrationTestUtils.sendAndReceive[T](request, socket) - } - - protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse]( - request: AbstractRequest - )(implicit classTag: ClassTag[T]): T = { - val socket = IntegrationTestUtils.connect(cluster.anyBrokerSocketServer(), cluster.clientListener()) - openSockets += socket - IntegrationTestUtils.sendAndReceive[T](request, socket) - } - private def brokerSocketServer(brokerId: Int): SocketServer = { getBrokers.find { broker => broker.config.brokerId == brokerId diff --git a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala index 7aba491536f..aef8d49aa09 100644 --- a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala @@ -28,6 +28,7 @@ import org.apache.kafka.server.common.Feature import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, Timeout} +import java.net.Socket import java.util import java.util.Collections import scala.jdk.CollectionConverters._ @@ -59,8 +60,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1)) ) + val socket: Socket = connectAny() + val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) assertEquals(Errors.UNSUPPORTED_VERSION.code, shareFetchResponse.data.errorCode) assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs) @@ -75,8 +78,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val groupId: String = "group" val metadata: ShareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH) + val socket: Socket = connectAny() + val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, Map.empty) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) assertEquals(Errors.UNSUPPORTED_VERSION.code, shareAcknowledgeResponse.data.errorCode) } @@ -122,9 +127,11 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connect(nonReplicaId) + // Send the share fetch request to the non-replica and verify the error code val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest, nonReplicaId) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) assertEquals(30000, shareFetchResponse.data.acquisitionLockTimeoutMs) val partitionData = shareFetchResponse.responseData(topicNames).get(topicIdPartition) assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionData.errorCode) @@ -163,8 +170,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -174,7 +183,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -227,8 +236,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2, topicIdPartition3) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partitions - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic partitions created above @@ -245,7 +256,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // as the share partitions might not be initialized yet. So, we retry until we get the response. var responses = Seq[ShareFetchResponseData.PartitionData]() TestUtils.waitUntilTrue(() => { - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs) @@ -334,15 +345,19 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty + val socket1: Socket = connect(leader1) + val socket2: Socket = connect(leader2) + val socket3: Socket = connect(leader3) + // Send the first share fetch request to initialize the share partitions // Create different share fetch requests for different partitions as they may have leaders on separate brokers var shareFetchRequest1 = createShareFetchRequest(groupId, metadata, send1, Seq.empty, acknowledgementsMap) var shareFetchRequest2 = createShareFetchRequest(groupId, metadata, send2, Seq.empty, acknowledgementsMap) var shareFetchRequest3 = createShareFetchRequest(groupId, metadata, send3, Seq.empty, acknowledgementsMap) - var shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1, destination = leader1) - var shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2, destination = leader2) - var shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3, destination = leader3) + var shareFetchResponse1 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, socket1) + var shareFetchResponse2 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, socket2) + var shareFetchResponse3 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, socket3) initProducer() // Producing 10 records to the topic partitions created above @@ -356,9 +371,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareFetchRequest2 = createShareFetchRequest(groupId, metadata, send2, Seq.empty, acknowledgementsMap) shareFetchRequest3 = createShareFetchRequest(groupId, metadata, send3, Seq.empty, acknowledgementsMap) - shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1, destination = leader1) - shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2, destination = leader2) - shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3, destination = leader3) + shareFetchResponse1 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, socket1) + shareFetchResponse2 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, socket2) + shareFetchResponse3 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, socket3) val shareFetchResponseData1 = shareFetchResponse1.data() assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode) @@ -439,8 +454,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize share partitions - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -451,7 +468,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -478,7 +495,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) @@ -500,7 +517,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -554,8 +571,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send, 15000) + sendFirstShareFetchRequest(memberId, groupId, send, socket, 15000) initProducer() // Producing 10 records to the topic created above @@ -566,7 +585,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -595,7 +614,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -620,7 +639,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -672,8 +691,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partiion - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -684,7 +705,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -711,7 +732,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) @@ -730,7 +751,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -782,8 +803,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -794,7 +817,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -839,7 +862,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo releaseAcknowledgementSent = true } shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -896,8 +919,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -908,7 +933,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -935,7 +960,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(3.toByte))).asJava) // Reject the records val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) @@ -957,7 +982,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1009,8 +1034,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -1021,7 +1048,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1050,7 +1077,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(3.toByte))).asJava) // Reject the records shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1075,7 +1102,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1129,8 +1156,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the shar partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -1141,7 +1170,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1168,7 +1197,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records var shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) - var shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + var shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) var shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) @@ -1187,7 +1216,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1213,7 +1242,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records again shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) - shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) @@ -1235,7 +1264,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1276,7 +1305,6 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo def testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers(): Unit = { val groupId: String = "group" - val memberId = Uuid.randomUuid() val memberId1 = Uuid.randomUuid() val memberId2 = Uuid.randomUuid() val memberId3 = Uuid.randomUuid() @@ -1291,8 +1319,12 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket1: Socket = connectAny() + val socket2: Socket = connectAny() + val socket3: Socket = connectAny() + // Sending a dummy share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId1, groupId, send, socket1) initProducer() // Producing 10000 records to the topic created above @@ -1312,9 +1344,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, send, Seq.empty, acknowledgementsMap3, minBytes = 100, maxBytes = 1500) - val shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1) - val shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2) - val shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3) + val shareFetchResponse1 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, socket1) + val shareFetchResponse2 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, socket2) + val shareFetchResponse3 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, socket3) val shareFetchResponseData1 = shareFetchResponse1.data() @@ -1384,10 +1416,14 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket1: Socket = connectAny() + val socket2: Socket = connectAny() + val socket3: Socket = connectAny() + // Sending 3 dummy share Fetch Requests with to inititlaize the share partitions for each share group\ - sendFirstShareFetchRequest(memberId1, groupId1, send) - sendFirstShareFetchRequest(memberId2, groupId2, send) - sendFirstShareFetchRequest(memberId3, groupId3, send) + sendFirstShareFetchRequest(memberId1, groupId1, send, socket1) + sendFirstShareFetchRequest(memberId2, groupId2, send, socket2) + sendFirstShareFetchRequest(memberId3, groupId3, send, socket3) initProducer() // Producing 10 records to the topic created above @@ -1407,9 +1443,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest3 = createShareFetchRequest(groupId3, metadata3, send, Seq.empty, acknowledgementsMap3) - val shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1) - val shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2) - val shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3) + val shareFetchResponse1 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, socket1) + val shareFetchResponse2 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, socket2) + val shareFetchResponse3 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, socket3) val shareFetchResponseData1 = shareFetchResponse1.data() @@ -1475,8 +1511,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -1487,7 +1525,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1516,7 +1554,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1542,7 +1580,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(19) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1583,8 +1621,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -1595,7 +1635,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1624,7 +1664,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1651,7 +1691,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(19) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) @@ -1704,6 +1744,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch() @@ -1711,7 +1753,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Acknowledgements in the Initial Fetch Request val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() // The response will have a top level error code because this is an Initial Fetch request with acknowledgement data present @@ -1750,6 +1792,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val socket: Socket = connectAny() + // Send the share fetch request to fetch the records produced above val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]] = @@ -1759,7 +1803,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode) @@ -1798,8 +1842,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -1809,7 +1855,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1831,7 +1877,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch)) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareFetchResponseData.errorCode) @@ -1870,8 +1916,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -1881,7 +1929,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1908,7 +1956,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode) @@ -1948,8 +1996,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -1959,7 +2009,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1981,7 +2031,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareFetchResponseData.errorCode) @@ -2026,11 +2076,15 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket1: Socket = connectAny() + val socket2: Socket = connectAny() + val socket3: Socket = connectAny() + // member1 sends share fetch request to register it's share session. Note it does not close the socket connection after. TestUtils.waitUntilTrue(() => { val metadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket1) val shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData.errorCode == Errors.NONE.code }, "Share fetch request failed", 5000) @@ -2039,7 +2093,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo TestUtils.waitUntilTrue(() => { val metadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket2) val shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData.errorCode == Errors.NONE.code }, "Share fetch request failed", 5000) @@ -2050,20 +2104,22 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo TestUtils.waitUntilTrue(() => { val metadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket3) val shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData.errorCode == Errors.SHARE_SESSION_NOT_FOUND.code }, "Share fetch request failed", 5000) - // Now we will close the socket connections for the above three members, mimicking a client disconnection + // Now we will close the socket connections for the members, mimicking a client disconnection closeSockets() - // Since the socket connections were closed before, the corresponding share sessions were dropped from the ShareSessionCache - // on the broker. Now, since the cache is empty, new share sessions can be registered + val socket4: Socket = connectAny() + + // Since one of the socket connections was closed before, the corresponding share session was dropped from the ShareSessionCache + // on the broker. Now, since the cache is not full, new share sessions can be registered TestUtils.waitUntilTrue(() => { val metadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket4) val shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData.errorCode == Errors.NONE.code }, "Share fetch request failed", 5000) @@ -2103,8 +2159,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -2114,7 +2172,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -2141,7 +2199,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareAcknowledgeResponseData.errorCode) @@ -2182,8 +2240,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic partitions created above @@ -2200,7 +2260,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // as the share partitions might not be initialized yet. So, we retry until we get the response. var responses = Seq[ShareFetchResponseData.PartitionData]() TestUtils.waitUntilTrue(() => { - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs) @@ -2226,7 +2286,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1) shareFetchRequest = createShareFetchRequest(groupId, metadata, Seq.empty, forget, acknowledgementsMap) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -2277,8 +2337,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -2288,7 +2350,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap, maxRecords = 1, batchSize = 1) - val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -2339,8 +2401,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -2350,7 +2414,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap, maxRecords = 5, batchSize = 1) - val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -2371,12 +2435,12 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // For initial fetch request, the response may not be available in the first attempt when the share // partition is not initialized yet. Hence, wait for response from all partitions before proceeding. - private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String, topicIdPartitions: Seq[TopicIdPartition], lockTimeout: Int = 30000): Unit = { + private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String, topicIdPartitions: Seq[TopicIdPartition], socket: Socket, lockTimeout: Int = 30000): Unit = { val partitions: util.Set[Integer] = new util.HashSet() TestUtils.waitUntilTrue(() => { val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) val shareFetchRequest = createShareFetchRequest(groupId, metadata, topicIdPartitions, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)