KAFKA-19241: Updated tests in ShareFetchAcknowledgeRequestTest to reuse the socket for subsequent requests (#19640)

Currently in the tests in ShareFetchAcknowledgeRequestTest, subsequent
share fetch / share acknowledge requests creates a new socket everytime,
even when the requests are sent by the same member. In reality, a single
share consumer clisnet will reuse the same socket for all the share
related requests in its lifetime. This PR changes the behaviour in the
tests to align with reality and reuse the same socket for all requests
by the same share group member.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Chirag Wadhwa 2025-05-07 18:30:41 +05:30 committed by GitHub
parent d034268312
commit f3a4a1b185
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 173 additions and 108 deletions

View File

@ -913,6 +913,24 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
assertEquals(expectedResponseData.results.asScala.toSet, deleteGroupsResponse.data.results.asScala.toSet) assertEquals(expectedResponseData.results.asScala.toSet, deleteGroupsResponse.data.results.asScala.toSet)
} }
protected def connectAny(): Socket = {
val socket: Socket = IntegrationTestUtils.connect(
cluster.anyBrokerSocketServer(),
cluster.clientListener()
)
openSockets += socket
socket
}
protected def connect(destination: Int): Socket = {
val socket: Socket = IntegrationTestUtils.connect(
brokerSocketServer(destination),
cluster.clientListener()
)
openSockets += socket
socket
}
protected def connectAndReceive[T <: AbstractResponse]( protected def connectAndReceive[T <: AbstractResponse](
request: AbstractRequest request: AbstractRequest
)(implicit classTag: ClassTag[T]): T = { )(implicit classTag: ClassTag[T]): T = {
@ -934,23 +952,6 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
) )
} }
protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
request: AbstractRequest,
destination: Int
)(implicit classTag: ClassTag[T]): T = {
val socket = IntegrationTestUtils.connect(brokerSocketServer(destination), cluster.clientListener())
openSockets += socket
IntegrationTestUtils.sendAndReceive[T](request, socket)
}
protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
request: AbstractRequest
)(implicit classTag: ClassTag[T]): T = {
val socket = IntegrationTestUtils.connect(cluster.anyBrokerSocketServer(), cluster.clientListener())
openSockets += socket
IntegrationTestUtils.sendAndReceive[T](request, socket)
}
private def brokerSocketServer(brokerId: Int): SocketServer = { private def brokerSocketServer(brokerId: Int): SocketServer = {
getBrokers.find { broker => getBrokers.find { broker =>
broker.config.brokerId == brokerId broker.config.brokerId == brokerId

View File

@ -28,6 +28,7 @@ import org.apache.kafka.server.common.Feature
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, Timeout} import org.junit.jupiter.api.{AfterEach, Timeout}
import java.net.Socket
import java.util import java.util
import java.util.Collections import java.util.Collections
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
@ -59,8 +60,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1)) new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1))
) )
val socket: Socket = connectAny()
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
assertEquals(Errors.UNSUPPORTED_VERSION.code, shareFetchResponse.data.errorCode) assertEquals(Errors.UNSUPPORTED_VERSION.code, shareFetchResponse.data.errorCode)
assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs) assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs)
@ -75,8 +78,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val groupId: String = "group" val groupId: String = "group"
val metadata: ShareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH) val metadata: ShareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH)
val socket: Socket = connectAny()
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, Map.empty) val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, Map.empty)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket)
assertEquals(Errors.UNSUPPORTED_VERSION.code, shareAcknowledgeResponse.data.errorCode) assertEquals(Errors.UNSUPPORTED_VERSION.code, shareAcknowledgeResponse.data.errorCode)
} }
@ -122,9 +127,11 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connect(nonReplicaId)
// Send the share fetch request to the non-replica and verify the error code // Send the share fetch request to the non-replica and verify the error code
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest, nonReplicaId) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
assertEquals(30000, shareFetchResponse.data.acquisitionLockTimeoutMs) assertEquals(30000, shareFetchResponse.data.acquisitionLockTimeoutMs)
val partitionData = shareFetchResponse.responseData(topicNames).get(topicIdPartition) val partitionData = shareFetchResponse.responseData(topicNames).get(topicIdPartition)
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionData.errorCode) assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionData.errorCode)
@ -163,8 +170,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partition // Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -174,7 +183,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
val shareFetchResponseData = shareFetchResponse.data() val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -227,8 +236,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2, topicIdPartition3) val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2, topicIdPartition3)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partitions // Send the first share fetch request to initialize the share partitions
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic partitions created above // Producing 10 records to the topic partitions created above
@ -245,7 +256,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// as the share partitions might not be initialized yet. So, we retry until we get the response. // as the share partitions might not be initialized yet. So, we retry until we get the response.
var responses = Seq[ShareFetchResponseData.PartitionData]() var responses = Seq[ShareFetchResponseData.PartitionData]()
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
val shareFetchResponseData = shareFetchResponse.data() val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs) assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
@ -334,15 +345,19 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val socket1: Socket = connect(leader1)
val socket2: Socket = connect(leader2)
val socket3: Socket = connect(leader3)
// Send the first share fetch request to initialize the share partitions // Send the first share fetch request to initialize the share partitions
// Create different share fetch requests for different partitions as they may have leaders on separate brokers // Create different share fetch requests for different partitions as they may have leaders on separate brokers
var shareFetchRequest1 = createShareFetchRequest(groupId, metadata, send1, Seq.empty, acknowledgementsMap) var shareFetchRequest1 = createShareFetchRequest(groupId, metadata, send1, Seq.empty, acknowledgementsMap)
var shareFetchRequest2 = createShareFetchRequest(groupId, metadata, send2, Seq.empty, acknowledgementsMap) var shareFetchRequest2 = createShareFetchRequest(groupId, metadata, send2, Seq.empty, acknowledgementsMap)
var shareFetchRequest3 = createShareFetchRequest(groupId, metadata, send3, Seq.empty, acknowledgementsMap) var shareFetchRequest3 = createShareFetchRequest(groupId, metadata, send3, Seq.empty, acknowledgementsMap)
var shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1, destination = leader1) var shareFetchResponse1 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, socket1)
var shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2, destination = leader2) var shareFetchResponse2 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, socket2)
var shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3, destination = leader3) var shareFetchResponse3 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, socket3)
initProducer() initProducer()
// Producing 10 records to the topic partitions created above // Producing 10 records to the topic partitions created above
@ -356,9 +371,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareFetchRequest2 = createShareFetchRequest(groupId, metadata, send2, Seq.empty, acknowledgementsMap) shareFetchRequest2 = createShareFetchRequest(groupId, metadata, send2, Seq.empty, acknowledgementsMap)
shareFetchRequest3 = createShareFetchRequest(groupId, metadata, send3, Seq.empty, acknowledgementsMap) shareFetchRequest3 = createShareFetchRequest(groupId, metadata, send3, Seq.empty, acknowledgementsMap)
shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1, destination = leader1) shareFetchResponse1 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, socket1)
shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2, destination = leader2) shareFetchResponse2 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, socket2)
shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3, destination = leader3) shareFetchResponse3 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, socket3)
val shareFetchResponseData1 = shareFetchResponse1.data() val shareFetchResponseData1 = shareFetchResponse1.data()
assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
@ -439,8 +454,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize share partitions // Send the first share fetch request to initialize share partitions
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -451,7 +468,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
var shareFetchResponseData = shareFetchResponse.data() var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -478,7 +495,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9) .setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@ -500,7 +517,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -554,8 +571,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partition // Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send, 15000) sendFirstShareFetchRequest(memberId, groupId, send, socket, 15000)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -566,7 +585,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
var shareFetchResponseData = shareFetchResponse.data() var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -595,7 +614,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9) .setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -620,7 +639,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -672,8 +691,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partiion // Send the first share fetch request to initialize the share partiion
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -684,7 +705,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
var shareFetchResponseData = shareFetchResponse.data() var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -711,7 +732,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9) .setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records .setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@ -730,7 +751,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -782,8 +803,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partition // Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -794,7 +817,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
var shareFetchResponseData = shareFetchResponse.data() var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -839,7 +862,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
releaseAcknowledgementSent = true releaseAcknowledgementSent = true
} }
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -896,8 +919,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partition // Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -908,7 +933,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
var shareFetchResponseData = shareFetchResponse.data() var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -935,7 +960,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9) .setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(3.toByte))).asJava) // Reject the records .setAcknowledgeTypes(Collections.singletonList(3.toByte))).asJava) // Reject the records
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@ -957,7 +982,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1009,8 +1034,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partition // Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -1021,7 +1048,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
var shareFetchResponseData = shareFetchResponse.data() var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1050,7 +1077,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9) .setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(3.toByte))).asJava) // Reject the records .setAcknowledgeTypes(Collections.singletonList(3.toByte))).asJava) // Reject the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1075,7 +1102,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1129,8 +1156,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the shar partition // Send the first share fetch request to initialize the shar partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -1141,7 +1170,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
var shareFetchResponseData = shareFetchResponse.data() var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1168,7 +1197,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9) .setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records .setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records
var shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) var shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge)
var shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) var shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket)
var shareAcknowledgeResponseData = shareAcknowledgeResponse.data() var shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@ -1187,7 +1216,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1213,7 +1242,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9) .setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records again .setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records again
shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge)
shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket)
shareAcknowledgeResponseData = shareAcknowledgeResponse.data() shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@ -1235,7 +1264,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1276,7 +1305,6 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
def testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers(): Unit = { def testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers(): Unit = {
val groupId: String = "group" val groupId: String = "group"
val memberId = Uuid.randomUuid()
val memberId1 = Uuid.randomUuid() val memberId1 = Uuid.randomUuid()
val memberId2 = Uuid.randomUuid() val memberId2 = Uuid.randomUuid()
val memberId3 = Uuid.randomUuid() val memberId3 = Uuid.randomUuid()
@ -1291,8 +1319,12 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket1: Socket = connectAny()
val socket2: Socket = connectAny()
val socket3: Socket = connectAny()
// Sending a dummy share fetch request to initialize the share partition // Sending a dummy share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId1, groupId, send, socket1)
initProducer() initProducer()
// Producing 10000 records to the topic created above // Producing 10000 records to the topic created above
@ -1312,9 +1344,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, send, Seq.empty, acknowledgementsMap3, minBytes = 100, maxBytes = 1500) val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, send, Seq.empty, acknowledgementsMap3, minBytes = 100, maxBytes = 1500)
val shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1) val shareFetchResponse1 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, socket1)
val shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2) val shareFetchResponse2 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, socket2)
val shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3) val shareFetchResponse3 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, socket3)
val shareFetchResponseData1 = shareFetchResponse1.data() val shareFetchResponseData1 = shareFetchResponse1.data()
@ -1384,10 +1416,14 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket1: Socket = connectAny()
val socket2: Socket = connectAny()
val socket3: Socket = connectAny()
// Sending 3 dummy share Fetch Requests with to inititlaize the share partitions for each share group\ // Sending 3 dummy share Fetch Requests with to inititlaize the share partitions for each share group\
sendFirstShareFetchRequest(memberId1, groupId1, send) sendFirstShareFetchRequest(memberId1, groupId1, send, socket1)
sendFirstShareFetchRequest(memberId2, groupId2, send) sendFirstShareFetchRequest(memberId2, groupId2, send, socket2)
sendFirstShareFetchRequest(memberId3, groupId3, send) sendFirstShareFetchRequest(memberId3, groupId3, send, socket3)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -1407,9 +1443,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest3 = createShareFetchRequest(groupId3, metadata3, send, Seq.empty, acknowledgementsMap3) val shareFetchRequest3 = createShareFetchRequest(groupId3, metadata3, send, Seq.empty, acknowledgementsMap3)
val shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1) val shareFetchResponse1 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, socket1)
val shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2) val shareFetchResponse2 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, socket2)
val shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3) val shareFetchResponse3 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, socket3)
val shareFetchResponseData1 = shareFetchResponse1.data() val shareFetchResponseData1 = shareFetchResponse1.data()
@ -1475,8 +1511,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partition // Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -1487,7 +1525,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
var shareFetchResponseData = shareFetchResponse.data() var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1516,7 +1554,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9) .setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1542,7 +1580,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(19) .setLastOffset(19)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1583,8 +1621,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partition // Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -1595,7 +1635,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
var shareFetchResponseData = shareFetchResponse.data() var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1624,7 +1664,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9) .setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1651,7 +1691,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(19) .setLastOffset(19)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode)
@ -1704,6 +1744,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] =
Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch() Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch()
@ -1711,7 +1753,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9) .setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Acknowledgements in the Initial Fetch Request .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Acknowledgements in the Initial Fetch Request
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
val shareFetchResponseData = shareFetchResponse.data() val shareFetchResponseData = shareFetchResponse.data()
// The response will have a top level error code because this is an Initial Fetch request with acknowledgement data present // The response will have a top level error code because this is an Initial Fetch request with acknowledgement data present
@ -1750,6 +1792,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val topicId = topicIds.get(topic) val topicId = topicIds.get(topic)
val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
val socket: Socket = connectAny()
// Send the share fetch request to fetch the records produced above // Send the share fetch request to fetch the records produced above
val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]] = val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]] =
@ -1759,7 +1803,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9) .setLastOffset(9)
setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava)
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap) val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode) assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode)
@ -1798,8 +1842,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partition // Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -1809,7 +1855,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
var shareFetchResponseData = shareFetchResponse.data() var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1831,7 +1877,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch)) shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch))
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareFetchResponseData.errorCode) assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareFetchResponseData.errorCode)
@ -1870,8 +1916,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partition // Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -1881,7 +1929,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
val shareFetchResponseData = shareFetchResponse.data() val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1908,7 +1956,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9) .setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava)
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap) val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode) assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode)
@ -1948,8 +1996,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partition // Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -1959,7 +2009,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
var shareFetchResponseData = shareFetchResponse.data() var shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -1981,7 +2031,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch) metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareFetchResponseData.errorCode) assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareFetchResponseData.errorCode)
@ -2026,11 +2076,15 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket1: Socket = connectAny()
val socket2: Socket = connectAny()
val socket3: Socket = connectAny()
// member1 sends share fetch request to register it's share session. Note it does not close the socket connection after. // member1 sends share fetch request to register it's share session. Note it does not close the socket connection after.
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val metadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH) val metadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket1)
val shareFetchResponseData = shareFetchResponse.data() val shareFetchResponseData = shareFetchResponse.data()
shareFetchResponseData.errorCode == Errors.NONE.code shareFetchResponseData.errorCode == Errors.NONE.code
}, "Share fetch request failed", 5000) }, "Share fetch request failed", 5000)
@ -2039,7 +2093,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val metadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH) val metadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket2)
val shareFetchResponseData = shareFetchResponse.data() val shareFetchResponseData = shareFetchResponse.data()
shareFetchResponseData.errorCode == Errors.NONE.code shareFetchResponseData.errorCode == Errors.NONE.code
}, "Share fetch request failed", 5000) }, "Share fetch request failed", 5000)
@ -2050,20 +2104,22 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val metadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH) val metadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket3)
val shareFetchResponseData = shareFetchResponse.data() val shareFetchResponseData = shareFetchResponse.data()
shareFetchResponseData.errorCode == Errors.SHARE_SESSION_NOT_FOUND.code shareFetchResponseData.errorCode == Errors.SHARE_SESSION_NOT_FOUND.code
}, "Share fetch request failed", 5000) }, "Share fetch request failed", 5000)
// Now we will close the socket connections for the above three members, mimicking a client disconnection // Now we will close the socket connections for the members, mimicking a client disconnection
closeSockets() closeSockets()
// Since the socket connections were closed before, the corresponding share sessions were dropped from the ShareSessionCache val socket4: Socket = connectAny()
// on the broker. Now, since the cache is empty, new share sessions can be registered
// Since one of the socket connections was closed before, the corresponding share session was dropped from the ShareSessionCache
// on the broker. Now, since the cache is not full, new share sessions can be registered
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val metadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH) val metadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket4)
val shareFetchResponseData = shareFetchResponse.data() val shareFetchResponseData = shareFetchResponse.data()
shareFetchResponseData.errorCode == Errors.NONE.code shareFetchResponseData.errorCode == Errors.NONE.code
}, "Share fetch request failed", 5000) }, "Share fetch request failed", 5000)
@ -2103,8 +2159,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partition // Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -2114,7 +2172,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)
var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
val shareFetchResponseData = shareFetchResponse.data() val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -2141,7 +2199,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setLastOffset(9) .setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava)
val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap) val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap)
val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket)
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareAcknowledgeResponseData.errorCode) assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareAcknowledgeResponseData.errorCode)
@ -2182,8 +2240,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2) val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partition // Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic partitions created above // Producing 10 records to the topic partitions created above
@ -2200,7 +2260,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// as the share partitions might not be initialized yet. So, we retry until we get the response. // as the share partitions might not be initialized yet. So, we retry until we get the response.
var responses = Seq[ShareFetchResponseData.PartitionData]() var responses = Seq[ShareFetchResponseData.PartitionData]()
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
val shareFetchResponseData = shareFetchResponse.data() val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs) assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs)
@ -2226,7 +2286,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1) val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1)
shareFetchRequest = createShareFetchRequest(groupId, metadata, Seq.empty, forget, acknowledgementsMap) shareFetchRequest = createShareFetchRequest(groupId, metadata, Seq.empty, forget, acknowledgementsMap)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
val shareFetchResponseData = shareFetchResponse.data() val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -2277,8 +2337,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partition // Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -2288,7 +2350,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap, maxRecords = 1, batchSize = 1) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap, maxRecords = 1, batchSize = 1)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
val shareFetchResponseData = shareFetchResponse.data val shareFetchResponseData = shareFetchResponse.data
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -2339,8 +2401,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val send: Seq[TopicIdPartition] = Seq(topicIdPartition) val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
val socket: Socket = connectAny()
// Send the first share fetch request to initialize the share partition // Send the first share fetch request to initialize the share partition
sendFirstShareFetchRequest(memberId, groupId, send) sendFirstShareFetchRequest(memberId, groupId, send, socket)
initProducer() initProducer()
// Producing 10 records to the topic created above // Producing 10 records to the topic created above
@ -2350,7 +2414,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))
val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap, maxRecords = 5, batchSize = 1) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap, maxRecords = 5, batchSize = 1)
val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
val shareFetchResponseData = shareFetchResponse.data val shareFetchResponseData = shareFetchResponse.data
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
@ -2371,12 +2435,12 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
// For initial fetch request, the response may not be available in the first attempt when the share // For initial fetch request, the response may not be available in the first attempt when the share
// partition is not initialized yet. Hence, wait for response from all partitions before proceeding. // partition is not initialized yet. Hence, wait for response from all partitions before proceeding.
private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String, topicIdPartitions: Seq[TopicIdPartition], lockTimeout: Int = 30000): Unit = { private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String, topicIdPartitions: Seq[TopicIdPartition], socket: Socket, lockTimeout: Int = 30000): Unit = {
val partitions: util.Set[Integer] = new util.HashSet() val partitions: util.Set[Integer] = new util.HashSet()
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH)
val shareFetchRequest = createShareFetchRequest(groupId, metadata, topicIdPartitions, Seq.empty, Map.empty) val shareFetchRequest = createShareFetchRequest(groupId, metadata, topicIdPartitions, Seq.empty, Map.empty)
val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)
val shareFetchResponseData = shareFetchResponse.data() val shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)