MINOR: changed the test testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers to remove ambiguity (#19997)
CI / build (push) Waiting to run Details

The test testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers
was recently found to be flaky. Making the following small change that
could potentially resolve the issue. Earlier, 1000 records were being
produced and then 3 consecutive share fetch requests were being sent. At
the end, assertions were done to make sure each share consumer receives
some records, and that none of them consume the same record. Since the
motive for the test is to see if multiple consumers can share the same
subscription and not consume the same record, a better way would be to
produce a record, consume that and repeat it 3 times with the 3
consumers. This ensures that every consumer consume a record, and a
previously consume record is not consumed again by the subsequent share
fetches.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield
 <aschofield@confluent.io>
This commit is contained in:
Chirag Wadhwa 2025-06-21 13:06:31 +05:30 committed by GitHub
parent 815dd93e2f
commit 7c77519f59
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 28 additions and 33 deletions

View File

@ -1368,55 +1368,50 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
sendFirstShareFetchRequest(memberId1, groupId, send, socket1) sendFirstShareFetchRequest(memberId1, groupId, send, socket1)
initProducer() initProducer()
// Producing 10000 records to the topic created above // Producing 1 record to the topic created above
produceData(topicIdPartition, 10000) produceData(topicIdPartition, 1)
// Sending 3 share Fetch Requests with same groupId to the same topicPartition but with different memberIds, // Sending a share Fetch Request
// mocking the behaviour of multiple share consumers from the same share group
val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH) val metadata1: ShareRequestMetadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap1: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val acknowledgementsMap1: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest1 = createShareFetchRequest(groupId, metadata1, send, Seq.empty, acknowledgementsMap1, minBytes = 100, maxBytes = 1500) val shareFetchRequest1 = createShareFetchRequest(groupId, metadata1, send, Seq.empty, acknowledgementsMap1, minBytes = 100, maxBytes = 1500)
val shareFetchResponse1 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, socket1)
val shareFetchResponseData1 = shareFetchResponse1.data()
val partitionData1 = shareFetchResponseData1.responses().stream().findFirst().get().partitions().get(0)
// Producing 1 record to the topic created above
produceData(topicIdPartition, 1)
// Sending another share Fetch Request with same groupId to the same topicPartition but with different memberId,
// mocking the behaviour of multiple share consumers from the same share group
val metadata2: ShareRequestMetadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH) val metadata2: ShareRequestMetadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap2: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val acknowledgementsMap2: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest2 = createShareFetchRequest(groupId, metadata2, send, Seq.empty, acknowledgementsMap2, minBytes = 100, maxBytes = 1500) val shareFetchRequest2 = createShareFetchRequest(groupId, metadata2, send, Seq.empty, acknowledgementsMap2, minBytes = 100, maxBytes = 1500)
val shareFetchResponse2 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, socket2)
val shareFetchResponseData2 = shareFetchResponse2.data()
val partitionData2 = shareFetchResponseData2.responses().stream().findFirst().get().partitions().get(0)
// Producing 1 record to the topic created above
produceData(topicIdPartition, 1)
// Sending another share Fetch Request with same groupId to the same topicPartition but with different memberId,
// mocking the behaviour of multiple share consumers from the same share group
val metadata3: ShareRequestMetadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH) val metadata3: ShareRequestMetadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH)
val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty
val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, send, Seq.empty, acknowledgementsMap3, minBytes = 100, maxBytes = 1500) val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, send, Seq.empty, acknowledgementsMap3, minBytes = 100, maxBytes = 1500)
val shareFetchResponse1 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, socket1)
val shareFetchResponse2 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, socket2)
val shareFetchResponse3 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, socket3) val shareFetchResponse3 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, socket3)
val shareFetchResponseData1 = shareFetchResponse1.data()
assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode)
assertEquals(1, shareFetchResponseData1.responses().size())
assertEquals(topicId, shareFetchResponseData1.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData1.responses().stream().findFirst().get().partitions().size())
val partitionData1 = shareFetchResponseData1.responses().stream().findFirst().get().partitions().get(0)
val shareFetchResponseData2 = shareFetchResponse2.data()
assertEquals(Errors.NONE.code, shareFetchResponseData2.errorCode)
assertEquals(1, shareFetchResponseData2.responses().size())
assertEquals(topicId, shareFetchResponseData2.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData2.responses().stream().findFirst().get().partitions().size())
val partitionData2 = shareFetchResponseData2.responses().stream().findFirst().get().partitions().get(0)
val shareFetchResponseData3 = shareFetchResponse3.data() val shareFetchResponseData3 = shareFetchResponse3.data()
assertEquals(Errors.NONE.code, shareFetchResponseData3.errorCode)
assertEquals(1, shareFetchResponseData3.responses().size())
assertEquals(topicId, shareFetchResponseData3.responses().stream().findFirst().get().topicId())
assertEquals(1, shareFetchResponseData3.responses().stream().findFirst().get().partitions().size())
val partitionData3 = shareFetchResponseData3.responses().stream().findFirst().get().partitions().get(0) val partitionData3 = shareFetchResponseData3.responses().stream().findFirst().get().partitions().get(0)
// There should be no common records between the 3 consumers as they are part of the same group // Each consumer should have received 1 record and any record should only be consumed by 1 consumer
assertTrue(partitionData1.acquiredRecords().get(0).lastOffset() < partitionData2.acquiredRecords().get(0).firstOffset()) assertEquals(partitionData1.acquiredRecords().get(0).firstOffset(), partitionData1.acquiredRecords().get(0).lastOffset())
assertTrue(partitionData2.acquiredRecords().get(0).lastOffset() < partitionData3.acquiredRecords().get(0).firstOffset()) assertEquals(partitionData1.acquiredRecords().get(0).firstOffset(), 0)
assertEquals(partitionData2.acquiredRecords().get(0).firstOffset(), partitionData2.acquiredRecords().get(0).lastOffset())
assertEquals(partitionData2.acquiredRecords().get(0).firstOffset(), 1)
assertEquals(partitionData3.acquiredRecords().get(0).firstOffset(), partitionData3.acquiredRecords().get(0).lastOffset())
assertEquals(partitionData3.acquiredRecords().get(0).firstOffset(), 2)
} }
@ClusterTests( @ClusterTests(