KAFKA-19328: SharePartitionManagerTest testMultipleConcurrentShareFetches doAnswer chaining needs verification (#19872)

Hi, I've created a pull request.

Jira: [KAFKA-19328](https://issues.apache.org/jira/browse/KAFKA-19328)

Problem:

1. doAnswer chaining works as intended only when calls are made
sequentially. In a multithreaded environment, its behavior is
unpredictable (see the sketch below).
2. Errors thrown inside a worker thread can be swallowed and never
surface in the main test thread.
3. A chain of five doAnswer stubs is not enough for 100 threads; the
last answer in the chain is returned for most calls.
4. nextFetchOffset appears to be called before the doAnswer chain runs,
so the last stubbed values (25, 5, 26, 16) were always the ones observed
in the doAnswer chain.
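
To make points 1 and 3 concrete, here is a minimal standalone Mockito sketch. It is illustrative only, not code from this patch; the `Supplier` mock simply stands in for the mocked `replicaManager.readFromLog`. Each invocation consumes the next answer in the chain, and once the chain is exhausted every further call keeps getting the last answer, with nothing guaranteeing which concurrent caller observes which answer.

```java
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.List;
import java.util.function.Supplier;

public class DoAnswerChainSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // Hypothetical stand-in for the mocked replicaManager call in the test.
        Supplier<String> reader = mock(Supplier.class);
        doAnswer(invocation -> "first")
            .doAnswer(invocation -> "second")
            .doAnswer(invocation -> "third")
            .when(reader).get();

        // Sequential calls consume the chain in order: prints [first, second, third].
        System.out.println(List.of(reader.get(), reader.get(), reader.get()));

        // Every call after the chain is exhausted returns the last answer,
        // which is why most of the 100 concurrent fetches in the deleted test
        // landed on the fifth answer (the 25, 5, 26, 16 assertions).
        System.out.println(reader.get()); // third
        System.out.println(reader.get()); // third
    }
}
```

Mockito's consecutive-stubbing behavior is that the last stubbing determines all further calls, which is exactly what point 3 describes.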

Solution:

Delete the doAnswer chain (and with it the testMultipleConcurrentShareFetches test) so that the four problems above disappear.
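
On point 2, dropping the chain also removes assertions that could only fail silently: a Throwable thrown inside an ExecutorService task is captured by the Future returned from submit(), and since the test never calls get() on those futures, the main thread (and JUnit) never sees the failure. A minimal sketch of that behavior, with illustrative class and variable names of my own:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SwallowedAssertionSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        Runnable failingTask = () -> {
            // This failure is stored in the Future returned by submit() and,
            // because nobody calls get(), it never propagates anywhere.
            throw new AssertionError("assertion failed inside a worker thread");
        };
        for (int i = 0; i < 4; i++) {
            executorService.submit(failingTask);
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.SECONDS);
        // The main thread completes normally, so a test written this way still passes.
        System.out.println("main thread finished without observing any failure");
    }
}
```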

Reviewers: Abhinav Dixit <adixit@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
Ji-Seung Ryu 2025-06-05 01:15:18 +09:00 committed by GitHub
parent 2a2626b3d8
commit cfd18132e8
1 changed file with 0 additions and 143 deletions


@@ -75,7 +75,6 @@ import org.apache.kafka.server.share.session.ShareSessionCache;
import org.apache.kafka.server.share.session.ShareSessionKey;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.MockTimer;
@@ -110,9 +109,6 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import scala.Tuple2;
@@ -120,7 +116,6 @@ import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;
import static kafka.server.share.DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.validateRotatedListEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -135,8 +130,6 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -1187,142 +1180,6 @@ public class SharePartitionManagerTest {
        );
    }

    @Test
    public void testMultipleConcurrentShareFetches() throws InterruptedException {
        String groupId = "grp";
        Uuid memberId1 = Uuid.randomUuid();
        Uuid fooId = Uuid.randomUuid();
        Uuid barId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(barId, new TopicPartition("bar", 0));
        TopicIdPartition tp3 = new TopicIdPartition(barId, new TopicPartition("bar", 1));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0, tp1, tp2, tp3);
        mockFetchOffsetForTimestamp(mockReplicaManager);
        Timer mockTimer = systemTimerReaper();
        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
            "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
        mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
        mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp0, 1);
        mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp1, 1);
        mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp2, 1);
        mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp3, 1);
        SharePartition sp0 = mock(SharePartition.class);
        SharePartition sp1 = mock(SharePartition.class);
        SharePartition sp2 = mock(SharePartition.class);
        SharePartition sp3 = mock(SharePartition.class);
        // Mock the share partitions corresponding to the topic partitions.
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
        partitionCache.put(new SharePartitionKey(groupId, tp3), sp3);
        // Mock the share partitions to get initialized instantaneously without any error.
        when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        when(sp2.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        when(sp3.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        // Required mocks so that the share partitions can acquire record.
        when(sp0.maybeAcquireFetchLock(any())).thenReturn(true);
        when(sp1.maybeAcquireFetchLock(any())).thenReturn(true);
        when(sp2.maybeAcquireFetchLock(any())).thenReturn(true);
        when(sp3.maybeAcquireFetchLock(any())).thenReturn(true);
        when(sp0.canAcquireRecords()).thenReturn(true);
        when(sp1.canAcquireRecords()).thenReturn(true);
        when(sp2.canAcquireRecords()).thenReturn(true);
        when(sp3.canAcquireRecords()).thenReturn(true);
        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
        when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
        when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
        // Mocks to have fetch offset metadata match for share partitions to avoid any extra calls to replicaManager.readFromLog.
        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
        when(sp3.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
        // Mock nextFetchOffset() functionality for share partitions to reflect the moving fetch of share partitions.
        when(sp0.nextFetchOffset()).thenReturn((long) 1, (long) 15, (long) 6, (long) 30, (long) 25);
        when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 1, (long) 18, (long) 5);
        when(sp2.nextFetchOffset()).thenReturn((long) 10, (long) 25, (long) 26);
        when(sp3.nextFetchOffset()).thenReturn((long) 20, (long) 15, (long) 23, (long) 16);
        sharePartitionManager = SharePartitionManagerBuilder.builder()
            .withReplicaManager(mockReplicaManager)
            .withTimer(mockTimer)
            .withBrokerTopicStats(brokerTopicStats)
            .withPartitionCache(partitionCache)
            .build();
        doAnswer(invocation -> {
            assertEquals(1, sp0.nextFetchOffset());
            assertEquals(4, sp1.nextFetchOffset());
            assertEquals(10, sp2.nextFetchOffset());
            assertEquals(20, sp3.nextFetchOffset());
            return buildLogReadResult(topicIdPartitions);
        }).doAnswer(invocation -> {
            assertEquals(15, sp0.nextFetchOffset());
            assertEquals(1, sp1.nextFetchOffset());
            assertEquals(25, sp2.nextFetchOffset());
            assertEquals(15, sp3.nextFetchOffset());
            return buildLogReadResult(topicIdPartitions);
        }).doAnswer(invocation -> {
            assertEquals(6, sp0.nextFetchOffset());
            assertEquals(18, sp1.nextFetchOffset());
            assertEquals(26, sp2.nextFetchOffset());
            assertEquals(23, sp3.nextFetchOffset());
            return buildLogReadResult(topicIdPartitions);
        }).doAnswer(invocation -> {
            assertEquals(30, sp0.nextFetchOffset());
            assertEquals(5, sp1.nextFetchOffset());
            assertEquals(26, sp2.nextFetchOffset());
            assertEquals(16, sp3.nextFetchOffset());
            return buildLogReadResult(topicIdPartitions);
        }).doAnswer(invocation -> {
            assertEquals(25, sp0.nextFetchOffset());
            assertEquals(5, sp1.nextFetchOffset());
            assertEquals(26, sp2.nextFetchOffset());
            assertEquals(16, sp3.nextFetchOffset());
            return buildLogReadResult(topicIdPartitions);
        }).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
        int threadCount = 100;
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        FetchParams fetchParams = new FetchParams(
            FetchRequest.ORDINARY_CONSUMER_ID, -1, 200,
            1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true);
        try {
            for (int i = 0; i != threadCount; ++i) {
                executorService.submit(() -> {
                    sharePartitionManager.fetchMessages(groupId, memberId1.toString(), fetchParams, 0,
                        MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
                });
                // We are blocking the main thread at an interval of 10 threads so that the currently running executorService threads can complete.
                if (i % 10 == 0)
                    executorService.awaitTermination(50, TimeUnit.MILLISECONDS);
            }
        } finally {
            if (!executorService.awaitTermination(50, TimeUnit.MILLISECONDS))
                executorService.shutdown();
        }
        // We are checking the number of replicaManager readFromLog() calls
        Mockito.verify(mockReplicaManager, atMost(100)).readFromLog(
            any(), any(), any(ReplicaQuota.class), anyBoolean());
        Mockito.verify(mockReplicaManager, atLeast(10)).readFromLog(
            any(), any(), any(ReplicaQuota.class), anyBoolean());
    }

    @Test
    public void testReplicaManagerFetchShouldNotProceed() {
        String groupId = "grp";