mirror of https://github.com/apache/kafka.git
KAFKA-19268 Missing mocks for SharePartitionManagerTest tests (#19786)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
Added missing mocks for SharePartitionManagerTests. Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
6e380fbbcc
commit
77aff85b3e
|
@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.AcknowledgeType;
|
||||||
import org.apache.kafka.common.TopicIdPartition;
|
import org.apache.kafka.common.TopicIdPartition;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.common.Uuid;
|
import org.apache.kafka.common.Uuid;
|
||||||
|
import org.apache.kafka.common.compress.Compression;
|
||||||
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
|
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
|
||||||
import org.apache.kafka.common.errors.FencedStateEpochException;
|
import org.apache.kafka.common.errors.FencedStateEpochException;
|
||||||
import org.apache.kafka.common.errors.InvalidRecordStateException;
|
import org.apache.kafka.common.errors.InvalidRecordStateException;
|
||||||
|
@ -43,6 +44,7 @@ import org.apache.kafka.common.protocol.Errors;
|
||||||
import org.apache.kafka.common.protocol.ObjectSerializationCache;
|
import org.apache.kafka.common.protocol.ObjectSerializationCache;
|
||||||
import org.apache.kafka.common.record.FileRecords;
|
import org.apache.kafka.common.record.FileRecords;
|
||||||
import org.apache.kafka.common.record.MemoryRecords;
|
import org.apache.kafka.common.record.MemoryRecords;
|
||||||
|
import org.apache.kafka.common.record.SimpleRecord;
|
||||||
import org.apache.kafka.common.requests.FetchRequest;
|
import org.apache.kafka.common.requests.FetchRequest;
|
||||||
import org.apache.kafka.common.requests.ShareFetchResponse;
|
import org.apache.kafka.common.requests.ShareFetchResponse;
|
||||||
import org.apache.kafka.common.requests.ShareRequestMetadata;
|
import org.apache.kafka.common.requests.ShareRequestMetadata;
|
||||||
|
@ -778,9 +780,6 @@ public class SharePartitionManagerTest {
|
||||||
@Test
|
@Test
|
||||||
public void testCachedTopicPartitionsForValidShareSessions() {
|
public void testCachedTopicPartitionsForValidShareSessions() {
|
||||||
ShareSessionCache cache = new ShareSessionCache(10);
|
ShareSessionCache cache = new ShareSessionCache(10);
|
||||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
|
||||||
.withCache(cache)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
Uuid tpId0 = Uuid.randomUuid();
|
Uuid tpId0 = Uuid.randomUuid();
|
||||||
Uuid tpId1 = Uuid.randomUuid();
|
Uuid tpId1 = Uuid.randomUuid();
|
||||||
|
@ -791,6 +790,24 @@ public class SharePartitionManagerTest {
|
||||||
String groupId = "grp";
|
String groupId = "grp";
|
||||||
Uuid memberId1 = Uuid.randomUuid();
|
Uuid memberId1 = Uuid.randomUuid();
|
||||||
Uuid memberId2 = Uuid.randomUuid();
|
Uuid memberId2 = Uuid.randomUuid();
|
||||||
|
SharePartition sp0 = mock(SharePartition.class);
|
||||||
|
SharePartition sp1 = mock(SharePartition.class);
|
||||||
|
SharePartition sp2 = mock(SharePartition.class);
|
||||||
|
|
||||||
|
when(sp0.releaseAcquiredRecords(ArgumentMatchers.eq(String.valueOf(memberId1)))).thenReturn(CompletableFuture.completedFuture(null));
|
||||||
|
when(sp1.releaseAcquiredRecords(ArgumentMatchers.eq(String.valueOf(memberId1)))).thenReturn(CompletableFuture.completedFuture(null));
|
||||||
|
when(sp2.releaseAcquiredRecords(ArgumentMatchers.eq(String.valueOf(memberId1)))).thenReturn(CompletableFuture.completedFuture(null));
|
||||||
|
|
||||||
|
SharePartitionCache partitionCache = new SharePartitionCache();
|
||||||
|
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
|
||||||
|
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
|
||||||
|
partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
|
||||||
|
|
||||||
|
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||||
|
.withCache(cache)
|
||||||
|
.withPartitionCache(partitionCache)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
|
||||||
// Create a new share session with an initial share fetch request.
|
// Create a new share session with an initial share fetch request.
|
||||||
List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
|
List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
|
||||||
|
@ -1724,7 +1741,7 @@ public class SharePartitionManagerTest {
|
||||||
// Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys.
|
// Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys.
|
||||||
assertEquals(2, delayedShareFetchPurgatory.watched());
|
assertEquals(2, delayedShareFetchPurgatory.watched());
|
||||||
|
|
||||||
doAnswer(invocation -> buildLogReadResult(topicIdPartitions)).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
|
doAnswer(invocation -> buildLogReadResult(List.of(tp1))).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
|
||||||
|
|
||||||
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();
|
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();
|
||||||
acknowledgeTopics.put(tp1, List.of(
|
acknowledgeTopics.put(tp1, List.of(
|
||||||
|
@ -1868,6 +1885,7 @@ public class SharePartitionManagerTest {
|
||||||
|
|
||||||
SharePartition sp1 = mock(SharePartition.class);
|
SharePartition sp1 = mock(SharePartition.class);
|
||||||
SharePartition sp2 = mock(SharePartition.class);
|
SharePartition sp2 = mock(SharePartition.class);
|
||||||
|
SharePartition sp3 = mock(SharePartition.class);
|
||||||
|
|
||||||
ShareSessionCache cache = mock(ShareSessionCache.class);
|
ShareSessionCache cache = mock(ShareSessionCache.class);
|
||||||
ShareSession shareSession = mock(ShareSession.class);
|
ShareSession shareSession = mock(ShareSession.class);
|
||||||
|
@ -1882,10 +1900,12 @@ public class SharePartitionManagerTest {
|
||||||
when(sp2.canAcquireRecords()).thenReturn(true);
|
when(sp2.canAcquireRecords()).thenReturn(true);
|
||||||
return CompletableFuture.completedFuture(Optional.empty());
|
return CompletableFuture.completedFuture(Optional.empty());
|
||||||
}).when(sp2).releaseAcquiredRecords(ArgumentMatchers.eq(memberId));
|
}).when(sp2).releaseAcquiredRecords(ArgumentMatchers.eq(memberId));
|
||||||
|
when(sp3.releaseAcquiredRecords(ArgumentMatchers.eq(memberId))).thenReturn(CompletableFuture.completedFuture(null));
|
||||||
|
|
||||||
SharePartitionCache partitionCache = new SharePartitionCache();
|
SharePartitionCache partitionCache = new SharePartitionCache();
|
||||||
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
|
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
|
||||||
partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
|
partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
|
||||||
|
partitionCache.put(new SharePartitionKey(groupId, tp3), sp3);
|
||||||
|
|
||||||
ShareFetch shareFetch = new ShareFetch(
|
ShareFetch shareFetch = new ShareFetch(
|
||||||
FETCH_PARAMS,
|
FETCH_PARAMS,
|
||||||
|
@ -3146,7 +3166,8 @@ public class SharePartitionManagerTest {
|
||||||
static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(List<TopicIdPartition> topicIdPartitions) {
|
static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(List<TopicIdPartition> topicIdPartitions) {
|
||||||
List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>();
|
List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>();
|
||||||
topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
|
topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
|
||||||
new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), MemoryRecords.EMPTY),
|
new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), MemoryRecords.withRecords(
|
||||||
|
Compression.NONE, new SimpleRecord("test-key".getBytes(), "test-value".getBytes()))),
|
||||||
Option.empty(),
|
Option.empty(),
|
||||||
-1L,
|
-1L,
|
||||||
-1L,
|
-1L,
|
||||||
|
|
Loading…
Reference in New Issue