KAFKA-19310: (MINOR) Missing mocks for DelayedShareFetchTest tests related to Memory Records slicing (#19823)

### About
Added test memory records to avoid the silent exception thrown during
slicing.

### Testing
Ran the tests of `DelayedShareFetchTest` to make sure that there is no
silent exception in any test.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Abhinav Dixit 2025-05-28 18:31:00 +05:30 committed by GitHub
parent d6ee83a893
commit 14ed1162a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 35 additions and 20 deletions

View File

@ -59,6 +59,7 @@ import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import java.util.ArrayList;
@ -1405,10 +1406,11 @@ public class DelayedShareFetchTest {
when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
MockedStatic<ShareFetchUtils> mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionDataMap = new LinkedHashMap<>();
partitionDataMap.put(tp0, mock(ShareFetchResponseData.PartitionData.class));
partitionDataMap.put(tp1, mock(ShareFetchResponseData.PartitionData.class));
mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any())).thenReturn(partitionDataMap);
// Mocking local log read result for tp1 and remote storage read result for tp2 on first replicaManager readFromLog call(from tryComplete).
// Mocking local log read result for tp0 and tp1 on second replicaManager readFromLog call(from onComplete).
@ -1466,6 +1468,7 @@ public class DelayedShareFetchTest {
assertEquals(Set.of(tp0, tp1), future.join().keySet());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
mockedShareFetchUtils.close();
}
@Test
@ -1592,8 +1595,10 @@ public class DelayedShareFetchTest {
// sp0 is acquirable.
when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
MockedStatic<ShareFetchUtils> mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionDataMap = new LinkedHashMap<>();
partitionDataMap.put(tp0, mock(ShareFetchResponseData.PartitionData.class));
mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any())).thenReturn(partitionDataMap);
assertFalse(delayedShareFetch.isCompleted());
assertTrue(delayedShareFetch.tryComplete());
@ -1608,6 +1613,7 @@ public class DelayedShareFetchTest {
assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
mockedShareFetchUtils.close();
}
@Test
@ -1644,12 +1650,12 @@ public class DelayedShareFetchTest {
when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp2.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
MockedStatic<ShareFetchUtils> mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionDataMap = new LinkedHashMap<>();
partitionDataMap.put(tp0, mock(ShareFetchResponseData.PartitionData.class));
partitionDataMap.put(tp1, mock(ShareFetchResponseData.PartitionData.class));
partitionDataMap.put(tp2, mock(ShareFetchResponseData.PartitionData.class));
mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any())).thenReturn(partitionDataMap);
// Mocking local log read result for tp0, tp1 and remote storage read result for tp2 on first replicaManager readFromLog call(from tryComplete).
// Mocking local log read result for tp0 and tp1 on second replicaManager readFromLog call(from onComplete).
@ -1701,6 +1707,7 @@ public class DelayedShareFetchTest {
assertEquals(Errors.NONE.code(), future.join().get(tp2).errorCode());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
mockedShareFetchUtils.close();
}
@Test
@ -1764,10 +1771,11 @@ public class DelayedShareFetchTest {
when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(true);
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
MockedStatic<ShareFetchUtils> mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionDataMap = new LinkedHashMap<>();
partitionDataMap.put(tp0, mock(ShareFetchResponseData.PartitionData.class));
partitionDataMap.put(tp1, mock(ShareFetchResponseData.PartitionData.class));
mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any())).thenReturn(partitionDataMap);
assertFalse(delayedShareFetch.isCompleted());
assertTrue(delayedShareFetch.tryComplete());
@ -1784,6 +1792,7 @@ public class DelayedShareFetchTest {
assertEquals(Errors.NONE.code(), future.join().get(tp1).errorCode());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
mockedShareFetchUtils.close();
}
@Test
@ -1848,8 +1857,10 @@ public class DelayedShareFetchTest {
return null;
}).when(pendingRemoteFetches).invokeCallbackOnCompletion(any());
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
MockedStatic<ShareFetchUtils> mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionDataMap = new LinkedHashMap<>();
partitionDataMap.put(tp0, mock(ShareFetchResponseData.PartitionData.class));
mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any())).thenReturn(partitionDataMap);
assertFalse(delayedShareFetch.isCompleted());
delayedShareFetch.forceComplete();
@ -1862,6 +1873,7 @@ public class DelayedShareFetchTest {
assertTrue(delayedShareFetch.outsidePurgatoryCallbackLock());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
mockedShareFetchUtils.close();
}
@Test
@ -1929,8 +1941,10 @@ public class DelayedShareFetchTest {
return null;
}).when(replicaManager).addShareFetchTimerRequest(any());
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
MockedStatic<ShareFetchUtils> mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionDataMap = new LinkedHashMap<>();
partitionDataMap.put(tp0, mock(ShareFetchResponseData.PartitionData.class));
mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any())).thenReturn(partitionDataMap);
assertFalse(delayedShareFetch.isCompleted());
delayedShareFetch.forceComplete();
@ -1943,6 +1957,7 @@ public class DelayedShareFetchTest {
assertTrue(delayedShareFetch.outsidePurgatoryCallbackLock());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
mockedShareFetchUtils.close();
}
static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) {