diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index f04b5186b8b..650de5b4d97 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -127,7 +127,7 @@ public class DelayedShareFetch extends DelayedOperation { shareFetchData.future().complete(result); } catch (Exception e) { log.error("Error processing delayed share fetch request", e); - shareFetchData.future().completeExceptionally(e); + sharePartitionManager.handleFetchException(shareFetchData.groupId(), topicPartitionData.keySet(), shareFetchData.future(), e); } finally { // Releasing the lock to move ahead with the next request in queue. releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet()); diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java index b517f2f9e06..4eddc846a7f 100644 --- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java +++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java @@ -16,9 +16,12 @@ */ package kafka.server.share; +import kafka.cluster.Partition; import kafka.server.ReplicaManager; import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.message.ShareFetchResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.FileRecords; @@ -128,4 +131,13 @@ public class ShareFetchUtils { Optional.empty(), true).timestampAndOffsetOpt(); return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset; } + + static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) { + Partition partition = replicaManager.getPartitionOrException(tp); + if (!partition.isLeader()) { + log.debug("The broker is not the leader for topic partition: {}-{}", tp.topic(), tp.partition()); + throw new NotLeaderOrFollowerException(); + } + return partition.getLeaderEpoch(); + } } diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index a98be380f5d..aa07bc88c4a 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -23,11 +23,13 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.errors.FencedStateEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.InvalidRecordStateException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; @@ -103,7 +105,11 @@ public class SharePartition { /** * The share partition failed to initialize with persisted state. */ - FAILED + FAILED, + /** + * The share partition is fenced and cannot be used. + */ + FENCED } /** @@ -181,6 +187,11 @@ public class SharePartition { */ private final TopicIdPartition topicIdPartition; + /** + * The leader epoch is used to track the partition epoch. + */ + private final int leaderEpoch; + /** * The in-flight record is used to track the state of a record that has been fetched from the * leader. The state of the record is used to determine if the record should be re-fetched or if it @@ -280,6 +291,7 @@ public class SharePartition { SharePartition( String groupId, TopicIdPartition topicIdPartition, + int leaderEpoch, int maxInFlightMessages, int maxDeliveryCount, int defaultRecordLockDurationMs, @@ -288,9 +300,28 @@ public class SharePartition { Persister persister, ReplicaManager replicaManager, GroupConfigManager groupConfigManager + ) { + this(groupId, topicIdPartition, leaderEpoch, maxInFlightMessages, maxDeliveryCount, defaultRecordLockDurationMs, + timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY); + } + + SharePartition( + String groupId, + TopicIdPartition topicIdPartition, + int leaderEpoch, + int maxInFlightMessages, + int maxDeliveryCount, + int defaultRecordLockDurationMs, + Timer timer, + Time time, + Persister persister, + ReplicaManager replicaManager, + GroupConfigManager groupConfigManager, + SharePartitionState sharePartitionState ) { this.groupId = groupId; this.topicIdPartition = topicIdPartition; + this.leaderEpoch = leaderEpoch; this.maxInFlightMessages = maxInFlightMessages; this.maxDeliveryCount = maxDeliveryCount; this.cachedState = new ConcurrentSkipListMap<>(); @@ -301,7 +332,7 @@ public class SharePartition { this.timer = timer; this.time = time; this.persister = persister; - this.partitionState = SharePartitionState.EMPTY; + this.partitionState = sharePartitionState; this.replicaManager = replicaManager; this.groupConfigManager = groupConfigManager; } @@ -341,7 +372,7 @@ public class SharePartition { .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() .setGroupId(this.groupId) .setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(), - Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(), 0))))) + Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(), leaderEpoch))))) .build()) .build() ).whenComplete((result, exception) -> { @@ -520,13 +551,14 @@ public class SharePartition { * @param fetchPartitionData The fetched records for the share partition. * @return The acquired records for the share partition. */ + @SuppressWarnings("cyclomaticcomplexity") // Consider refactoring to avoid suppression public ShareAcquiredRecords acquire( String memberId, int maxFetchRecords, FetchPartitionData fetchPartitionData ) { log.trace("Received acquire request for share partition: {}-{} memberId: {}", groupId, topicIdPartition, memberId); - if (maxFetchRecords <= 0) { + if (stateNotActive() || maxFetchRecords <= 0) { // Nothing to acquire. return ShareAcquiredRecords.empty(); } @@ -1040,7 +1072,7 @@ public class SharePartition { * @return A boolean which indicates whether the fetch lock is acquired. */ boolean maybeAcquireFetchLock() { - if (partitionState() != SharePartitionState.ACTIVE) { + if (stateNotActive()) { return false; } return fetchLock.compareAndSet(false, true); @@ -1053,6 +1085,22 @@ public class SharePartition { fetchLock.set(false); } + /** + * Marks the share partition as fenced. + */ + void markFenced() { + lock.writeLock().lock(); + try { + partitionState = SharePartitionState.FENCED; + } finally { + lock.writeLock().unlock(); + } + } + + private boolean stateNotActive() { + return partitionState() != SharePartitionState.ACTIVE; + } + private void completeInitializationWithException(CompletableFuture future, Throwable exception) { lock.writeLock().lock(); try { @@ -1075,6 +1123,9 @@ public class SharePartition { case INITIALIZING: future.completeExceptionally(new LeaderNotAvailableException(String.format("Share partition is already initializing %s-%s", groupId, topicIdPartition))); return; + case FENCED: + future.completeExceptionally(new FencedStateEpochException(String.format("Share partition is fenced %s-%s", groupId, topicIdPartition))); + return; case EMPTY: // Do not complete the future as the share partition is not yet initialized. break; @@ -1743,7 +1794,7 @@ public class SharePartition { .setGroupId(this.groupId) .setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(), Collections.singletonList(PartitionFactory.newPartitionStateBatchData( - topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches)))) + topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches)))) ).build()).build()) .whenComplete((result, exception) -> { if (exception != null) { @@ -1792,8 +1843,9 @@ public class SharePartition { case COORDINATOR_LOAD_IN_PROGRESS: return new CoordinatorNotAvailableException(errorMessage); case GROUP_ID_NOT_FOUND: + return new GroupIdNotFoundException(errorMessage); case UNKNOWN_TOPIC_OR_PARTITION: - return new InvalidRequestException(errorMessage); + return new UnknownTopicOrPartitionException(errorMessage); case FENCED_STATE_EPOCH: return new FencedStateEpochException(errorMessage); case FENCED_LEADER_EPOCH: diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 19b4143c8d4..ba61ecb7657 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -23,8 +23,10 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.FencedStateEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.message.ShareAcknowledgeResponseData; import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; import org.apache.kafka.common.metrics.Metrics; @@ -69,6 +71,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import scala.jdk.javaapi.CollectionConverters; @@ -271,11 +274,13 @@ public class SharePartitionManager implements AutoCloseable { this.shareGroupMetrics.shareAcknowledgement(); Map> futures = new HashMap<>(); acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> { - SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition)); + SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition); + SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey); if (sharePartition != null) { CompletableFuture future = new CompletableFuture<>(); sharePartition.acknowledge(memberId, acknowledgePartitionBatches).whenComplete((result, throwable) -> { if (throwable != null) { + handleFencedSharePartitionException(sharePartitionKey, throwable); future.complete(Errors.forException(throwable)); return; } @@ -339,7 +344,8 @@ public class SharePartitionManager implements AutoCloseable { Map> futuresMap = new HashMap<>(); topicIdPartitions.forEach(topicIdPartition -> { - SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition)); + SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition); + SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey); if (sharePartition == null) { log.error("No share partition found for groupId {} topicPartition {} while releasing acquired topic partitions", groupId, topicIdPartition); futuresMap.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION)); @@ -347,6 +353,7 @@ public class SharePartitionManager implements AutoCloseable { CompletableFuture future = new CompletableFuture<>(); sharePartition.releaseAcquiredRecords(memberId).whenComplete((result, throwable) -> { if (throwable != null) { + handleFencedSharePartitionException(sharePartitionKey, throwable); future.complete(Errors.forException(throwable)); return; } @@ -490,6 +497,30 @@ public class SharePartitionManager implements AutoCloseable { } } + /** + * The handleFetchException method is used to handle the exception that occurred while reading from log. + * The method will handle the exception for each topic-partition in the request. The share partition + * might get removed from the cache. + *

+ * The replica read request might error out for one share partition + * but as we cannot determine which share partition errored out, we might remove all the share partitions + * in the request. + * + * @param groupId The group id in the share fetch request. + * @param topicIdPartitions The topic-partitions in the replica read request. + * @param future The future to complete with the exception. + * @param throwable The exception that occurred while fetching messages. + */ + public void handleFetchException( + String groupId, + Set topicIdPartitions, + CompletableFuture> future, + Throwable throwable + ) { + topicIdPartitions.forEach(topicIdPartition -> handleFencedSharePartitionException(sharePartitionKey(groupId, topicIdPartition), throwable)); + maybeCompleteShareFetchWithException(future, topicIdPartitions, throwable); + } + /** * The cachedTopicIdPartitionsInShareSession method is used to get the cached topic-partitions in the share session. * @@ -540,57 +571,76 @@ public class SharePartitionManager implements AutoCloseable { return; } - try { - shareFetchData.partitionMaxBytes().keySet().forEach(topicIdPartition -> { - SharePartitionKey sharePartitionKey = sharePartitionKey( - shareFetchData.groupId(), - topicIdPartition - ); - SharePartition sharePartition = getOrCreateSharePartition(sharePartitionKey); + // Initialize lazily, if required. + Map erroneous = null; + Set delayedShareFetchWatchKeys = new HashSet<>(); + for (TopicIdPartition topicIdPartition : shareFetchData.partitionMaxBytes().keySet()) { + SharePartitionKey sharePartitionKey = sharePartitionKey( + shareFetchData.groupId(), + topicIdPartition + ); - // The share partition is initialized asynchronously, so we need to wait for it to be initialized. - // But if the share partition is already initialized, then the future will be completed immediately. - // Hence, it's safe to call the maybeInitialize method and then wait for the future to be completed. - // TopicPartitionData list will be populated only if the share partition is already initialized. - sharePartition.maybeInitialize().whenComplete((result, throwable) -> { - if (throwable != null) { - maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), throwable); - } - }); - }); - - Set delayedShareFetchWatchKeys = new HashSet<>(); - shareFetchData.partitionMaxBytes().keySet().forEach( - topicIdPartition -> { - // We add a key corresponding to each share partition in the request in the group so that when there are - // acknowledgements/acquisition lock timeout etc, we have a way to perform checkAndComplete for all - // such requests which are delayed because of lack of data to acquire for the share partition. - delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())); - // We add a key corresponding to each topic partition in the request so that when the HWM is updated - // for any topic partition, we have a way to perform checkAndComplete for all such requests which are - // delayed because of lack of data to acquire for the topic partition. - delayedShareFetchWatchKeys.add(new DelayedShareFetchPartitionKey(topicIdPartition.topicId(), topicIdPartition.partition())); - }); - - // Add the share fetch to the delayed share fetch purgatory to process the fetch request. - addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, this), - delayedShareFetchWatchKeys); - } catch (Exception e) { - // In case exception occurs then release the locks so queue can be further processed. - log.error("Error processing fetch queue for share partitions", e); - if (!shareFetchData.future().isDone()) { - shareFetchData.future().completeExceptionally(e); + SharePartition sharePartition; + try { + sharePartition = getOrCreateSharePartition(sharePartitionKey); + } catch (Exception e) { + // Complete the whole fetch request with an exception if there is an error processing. + // The exception currently can be thrown only if there is an error while initializing + // the share partition. But skip the processing for other share partitions in the request + // as this situation is not expected. + log.error("Error processing share fetch request", e); + if (erroneous == null) { + erroneous = new HashMap<>(); + } + erroneous.put(topicIdPartition, e); + // Continue iteration for other partitions in the request. + continue; } + + // We add a key corresponding to each share partition in the request in the group so that when there are + // acknowledgements/acquisition lock timeout etc., we have a way to perform checkAndComplete for all + // such requests which are delayed because of lack of data to acquire for the share partition. + delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())); + // We add a key corresponding to each topic partition in the request so that when the HWM is updated + // for any topic partition, we have a way to perform checkAndComplete for all such requests which are + // delayed because of lack of data to acquire for the topic partition. + delayedShareFetchWatchKeys.add(new DelayedShareFetchPartitionKey(topicIdPartition.topicId(), topicIdPartition.partition())); + // The share partition is initialized asynchronously, so we need to wait for it to be initialized. + // But if the share partition is already initialized, then the future will be completed immediately. + // Hence, it's safe to call the maybeInitialize method and then wait for the future to be completed. + // TopicPartitionData list will be populated only if the share partition is already initialized. + sharePartition.maybeInitialize().whenComplete((result, throwable) -> { + if (throwable != null) { + // TODO: Complete error handling for initialization. We have to record the error + // for respective share partition as completing the full request might result in + // some acquired records to not being sent: https://issues.apache.org/jira/browse/KAFKA-17510 + maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), throwable); + } + }); } + + // If all the partitions in the request errored out, then complete the fetch request with an exception. + if (erroneous != null && erroneous.size() == shareFetchData.partitionMaxBytes().size()) { + completeShareFetchWithException(shareFetchData.future(), erroneous); + // Do not proceed with share fetch processing as all the partitions errored out. + return; + } + + // TODO: If there exists some erroneous partitions then they will not be part of response. + + // Add the share fetch to the delayed share fetch purgatory to process the fetch request. + addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, this), delayedShareFetchWatchKeys); } private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) { return partitionCacheMap.computeIfAbsent(sharePartitionKey, k -> { long start = time.hiResClockMs(); + int leaderEpoch = ShareFetchUtils.leaderEpoch(replicaManager, sharePartitionKey.topicIdPartition().topicPartition()); SharePartition partition = new SharePartition( sharePartitionKey.groupId(), sharePartitionKey.topicIdPartition(), + leaderEpoch, maxInFlightMessages, maxDeliveryCount, defaultRecordLockDurationMs, @@ -617,22 +667,47 @@ public class SharePartitionManager implements AutoCloseable { return; } - if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) { + // Remove the partition from the cache as it's failed to initialize. + partitionCacheMap.remove(sharePartitionKey); + // The partition initialization failed, so complete the request with the exception. + // The server should not be in this state, so log the error on broker and surface the same + // to the client. The broker should not be in this state, investigate the root cause of the error. + log.error("Error initializing share partition with key {}", sharePartitionKey, throwable); + maybeCompleteShareFetchWithException(future, Collections.singletonList(sharePartitionKey.topicIdPartition()), throwable); + } + + private void handleFencedSharePartitionException( + SharePartitionKey sharePartitionKey, + Throwable throwable + ) { + if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException || + throwable instanceof GroupIdNotFoundException || throwable instanceof UnknownTopicOrPartitionException) { log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage()); // The share partition is fenced hence remove the partition from map and let the client retry. // But surface the error to the client so client might take some action i.e. re-fetch // the metadata and retry the fetch on new leader. - partitionCacheMap.remove(sharePartitionKey); - future.completeExceptionally(throwable); - return; + SharePartition sharePartition = partitionCacheMap.remove(sharePartitionKey); + if (sharePartition != null) { + sharePartition.markFenced(); + } } + } - // The partition initialization failed, so complete the request with the exception. - // The server should not be in this state, so log the error on broker and surface the same - // to the client. As of now this state is in-recoverable for the broker, and we should - // investigate the root cause of the error. - log.error("Error initializing share partition with key {}", sharePartitionKey, throwable); - future.completeExceptionally(throwable); + private void maybeCompleteShareFetchWithException(CompletableFuture> future, + Collection topicIdPartitions, Throwable throwable) { + if (!future.isDone()) { + future.complete(topicIdPartitions.stream().collect(Collectors.toMap( + tp -> tp, tp -> new PartitionData().setErrorCode(Errors.forException(throwable).code()).setErrorMessage(throwable.getMessage())))); + } + } + + private void completeShareFetchWithException(CompletableFuture> future, + Map erroneous) { + future.complete(erroneous.entrySet().stream().collect(Collectors.toMap( + Map.Entry::getKey, entry -> { + Throwable t = entry.getValue(); + return new PartitionData().setErrorCode(Errors.forException(t).code()).setErrorMessage(t.getMessage()); + }))); } private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition topicIdPartition) { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b4bb9a69198..d288bf43990 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -4249,7 +4249,8 @@ class KafkaApis(val requestChannel: RequestChannel, fetchMinBytes, fetchMaxBytes, FetchIsolation.HIGH_WATERMARK, - clientMetadata + clientMetadata, + true ) // call the share partition manager to fetch messages from the local replica. diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index c20f04232a2..d4fd9dd1079 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -76,9 +76,9 @@ public class DelayedShareFetchTest { private static final int MAX_FETCH_RECORDS = 100; private static final FetchParams FETCH_PARAMS = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, - Optional.empty()); + Optional.empty(), true); - private static Timer mockTimer; + private Timer mockTimer; @BeforeEach public void setUp() { diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java index 13248fb7110..27e3a194090 100644 --- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java +++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java @@ -67,6 +67,10 @@ import static org.mockito.Mockito.when; public class ShareFetchUtilsTest { + private static final FetchParams FETCH_PARAMS = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), + FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, + Optional.empty(), true); + @Test public void testProcessFetchResponse() { String groupId = "grp"; @@ -97,22 +101,20 @@ public class ShareFetchUtilsTest { when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0); when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1); - ShareFetchData shareFetchData = new ShareFetchData( - new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, - 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, memberId, - new CompletableFuture<>(), partitionMaxBytes, 100); + ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, memberId, + new CompletableFuture<>(), partitionMaxBytes, 100); MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, - new SimpleRecord("0".getBytes(), "v".getBytes()), - new SimpleRecord("1".getBytes(), "v".getBytes()), - new SimpleRecord("2".getBytes(), "v".getBytes()), - new SimpleRecord(null, "value".getBytes())); + new SimpleRecord("0".getBytes(), "v".getBytes()), + new SimpleRecord("1".getBytes(), "v".getBytes()), + new SimpleRecord("2".getBytes(), "v".getBytes()), + new SimpleRecord(null, "value".getBytes())); MemoryRecords records1 = MemoryRecords.withRecords(100L, Compression.NONE, - new SimpleRecord("0".getBytes(), "v".getBytes()), - new SimpleRecord("1".getBytes(), "v".getBytes()), - new SimpleRecord("2".getBytes(), "v".getBytes()), - new SimpleRecord(null, "value".getBytes())); + new SimpleRecord("0".getBytes(), "v".getBytes()), + new SimpleRecord("1".getBytes(), "v".getBytes()), + new SimpleRecord("2".getBytes(), "v".getBytes()), + new SimpleRecord(null, "value".getBytes())); Map responseData = new HashMap<>(); responseData.put(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L, @@ -165,20 +167,18 @@ public class ShareFetchUtilsTest { when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0); when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1); - ShareFetchData shareFetchData = new ShareFetchData( - new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, - 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, memberId, - new CompletableFuture<>(), partitionMaxBytes, 100); + ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, memberId, + new CompletableFuture<>(), partitionMaxBytes, 100); Map responseData = new HashMap<>(); responseData.put(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L, - MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), - OptionalInt.empty(), false)); + MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); responseData.put(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L, - MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), - OptionalInt.empty(), false)); + MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); Map resultData = - ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitionManager, mock(ReplicaManager.class)); + ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitionManager, mock(ReplicaManager.class)); assertEquals(2, resultData.size()); assertTrue(resultData.containsKey(tp0)); @@ -209,10 +209,8 @@ public class ShareFetchUtilsTest { when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0); when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1); - ShareFetchData shareFetchData = new ShareFetchData( - new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, - 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), - groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes, 100); + ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + new CompletableFuture<>(), partitionMaxBytes, 100); ReplicaManager replicaManager = mock(ReplicaManager.class); @@ -222,6 +220,7 @@ public class ShareFetchUtilsTest { when(sp0.nextFetchOffset()).thenReturn((long) 0, (long) 5); when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 4); + when(sp0.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn( ShareAcquiredRecords.empty(), ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords() @@ -235,20 +234,20 @@ public class ShareFetchUtilsTest { doNothing().when(sp0).updateCacheAndOffsets(any(Long.class)); MemoryRecords records1 = MemoryRecords.withRecords(Compression.NONE, - new SimpleRecord("0".getBytes(), "v".getBytes()), - new SimpleRecord("1".getBytes(), "v".getBytes()), - new SimpleRecord("2".getBytes(), "v".getBytes()), - new SimpleRecord(null, "value".getBytes())); + new SimpleRecord("0".getBytes(), "v".getBytes()), + new SimpleRecord("1".getBytes(), "v".getBytes()), + new SimpleRecord("2".getBytes(), "v".getBytes()), + new SimpleRecord(null, "value".getBytes())); Map responseData1 = new HashMap<>(); responseData1.put(tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L, - MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), - OptionalInt.empty(), false)); + MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); responseData1.put(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L, - records1, Optional.empty(), OptionalLong.empty(), Optional.empty(), - OptionalInt.empty(), false)); + records1, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); Map resultData1 = - ShareFetchUtils.processFetchResponse(shareFetchData, responseData1, sharePartitionManager, replicaManager); + ShareFetchUtils.processFetchResponse(shareFetchData, responseData1, sharePartitionManager, replicaManager); assertEquals(2, resultData1.size()); assertTrue(resultData1.containsKey(tp0)); @@ -264,20 +263,20 @@ public class ShareFetchUtilsTest { Mockito.verify(sp1, times(0)).updateCacheAndOffsets(any(Long.class)); MemoryRecords records2 = MemoryRecords.withRecords(100L, Compression.NONE, - new SimpleRecord("0".getBytes(), "v".getBytes()), - new SimpleRecord("1".getBytes(), "v".getBytes()), - new SimpleRecord("2".getBytes(), "v".getBytes()), - new SimpleRecord(null, "value".getBytes())); + new SimpleRecord("0".getBytes(), "v".getBytes()), + new SimpleRecord("1".getBytes(), "v".getBytes()), + new SimpleRecord("2".getBytes(), "v".getBytes()), + new SimpleRecord(null, "value".getBytes())); Map responseData2 = new HashMap<>(); responseData2.put(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L, - records2, Optional.empty(), OptionalLong.empty(), Optional.empty(), - OptionalInt.empty(), false)); + records2, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); responseData2.put(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L, - MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), - OptionalInt.empty(), false)); + MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); Map resultData2 = - ShareFetchUtils.processFetchResponse(shareFetchData, responseData2, sharePartitionManager, replicaManager); + ShareFetchUtils.processFetchResponse(shareFetchData, responseData2, sharePartitionManager, replicaManager); assertEquals(2, resultData2.size()); assertTrue(resultData2.containsKey(tp0)); @@ -304,10 +303,8 @@ public class ShareFetchUtilsTest { SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class); when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0); - ShareFetchData shareFetchData = new ShareFetchData( - new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, - 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), - groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes, 100); + ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + new CompletableFuture<>(), partitionMaxBytes, 100); ReplicaManager replicaManager = mock(ReplicaManager.class); diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index b24d0dc6e8b..4e461d3df3b 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -16,6 +16,7 @@ */ package kafka.server.share; +import kafka.cluster.Partition; import kafka.server.DelayedOperationPurgatory; import kafka.server.LogReadResult; import kafka.server.ReplicaManager; @@ -31,11 +32,13 @@ import org.apache.kafka.common.errors.FencedStateEpochException; import org.apache.kafka.common.errors.InvalidRecordStateException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.InvalidShareSessionEpochException; +import org.apache.kafka.common.errors.KafkaStorageException; import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.ShareSessionNotFoundException; import org.apache.kafka.common.message.ShareAcknowledgeResponseData; import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -56,6 +59,7 @@ import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; import org.apache.kafka.server.share.context.FinalContext; import org.apache.kafka.server.share.context.ShareFetchContext; import org.apache.kafka.server.share.context.ShareSessionContext; +import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; import org.apache.kafka.server.share.fetch.ShareFetchData; import org.apache.kafka.server.share.persister.NoOpShareStatePersister; import org.apache.kafka.server.share.persister.Persister; @@ -102,7 +106,6 @@ import scala.Tuple2; import scala.collection.Seq; import scala.jdk.javaapi.CollectionConverters; -import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -113,9 +116,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -133,12 +139,13 @@ public class SharePartitionManagerTest { private static final int DELAYED_SHARE_FETCH_TIMEOUT_MS = 3000; private static final FetchParams FETCH_PARAMS = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS, - 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true); static final int PARTITION_MAX_BYTES = 40000; static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000; - private static Timer mockTimer; + private Timer mockTimer; + private ReplicaManager mockReplicaManager; private static final List EMPTY_PART_LIST = Collections.unmodifiableList(new ArrayList<>()); @@ -146,6 +153,9 @@ public class SharePartitionManagerTest { public void setUp() { mockTimer = new SystemTimerReaper("sharePartitionManagerTestReaper", new SystemTimer("sharePartitionManagerTestTimer")); + mockReplicaManager = mock(ReplicaManager.class); + Partition partition = mockPartition(); + when(mockReplicaManager.getPartitionOrException(Mockito.any())).thenReturn(partition); } @AfterEach @@ -1026,34 +1036,33 @@ public class SharePartitionManagerTest { partitionMaxBytes.put(tp5, PARTITION_MAX_BYTES); partitionMaxBytes.put(tp6, PARTITION_MAX_BYTES); - ReplicaManager replicaManager = mock(ReplicaManager.class); Time time = mock(Time.class); when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L); Metrics metrics = new Metrics(); DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( - "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); - mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory); + mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withReplicaManager(replicaManager) + .withReplicaManager(mockReplicaManager) .withTime(time) .withMetrics(metrics) .withTimer(mockTimer) .build(); - doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, partitionMaxBytes); - Mockito.verify(replicaManager, times(1)).readFromLog( + Mockito.verify(mockReplicaManager, times(1)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, partitionMaxBytes); - Mockito.verify(replicaManager, times(2)).readFromLog( + Mockito.verify(mockReplicaManager, times(2)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, partitionMaxBytes); - Mockito.verify(replicaManager, times(3)).readFromLog( + Mockito.verify(mockReplicaManager, times(3)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); Map> expectedMetrics = new HashMap<>(); @@ -1089,15 +1098,14 @@ public class SharePartitionManagerTest { partitionMaxBytes.put(tp3, PARTITION_MAX_BYTES); final Time time = new MockTime(0, System.currentTimeMillis(), 0); - ReplicaManager replicaManager = mock(ReplicaManager.class); DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( - "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); - mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory); + mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() .withTime(time) - .withReplicaManager(replicaManager) + .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .build(); @@ -1141,7 +1149,7 @@ public class SharePartitionManagerTest { assertEquals(26, sp2.nextFetchOffset()); assertEquals(16, sp3.nextFetchOffset()); return buildLogReadResult(partitionMaxBytes.keySet()); - }).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + }).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); int threadCount = 100; ExecutorService executorService = Executors.newFixedThreadPool(threadCount); @@ -1160,9 +1168,9 @@ public class SharePartitionManagerTest { executorService.shutdown(); } // We are checking the number of replicaManager readFromLog() calls - Mockito.verify(replicaManager, atMost(100)).readFromLog( + Mockito.verify(mockReplicaManager, atMost(100)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); - Mockito.verify(replicaManager, atLeast(10)).readFromLog( + Mockito.verify(mockReplicaManager, atLeast(10)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); } @@ -1175,8 +1183,6 @@ public class SharePartitionManagerTest { Map partitionMaxBytes = new HashMap<>(); partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); - ReplicaManager replicaManager = mock(ReplicaManager.class); - SharePartition sp0 = mock(SharePartition.class); when(sp0.maybeAcquireFetchLock()).thenReturn(true); when(sp0.canAcquireRecords()).thenReturn(false); @@ -1185,19 +1191,19 @@ public class SharePartitionManagerTest { partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( - "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); - mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory); + mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() .withPartitionCacheMap(partitionCacheMap) - .withReplicaManager(replicaManager) + .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .build(); CompletableFuture> future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes); - Mockito.verify(replicaManager, times(0)).readFromLog( + Mockito.verify(mockReplicaManager, times(0)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); Map result = future.join(); assertEquals(0, result.size()); @@ -1209,27 +1215,24 @@ public class SharePartitionManagerTest { Uuid memberId = Uuid.randomUuid(); Uuid fooId = Uuid.randomUuid(); TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); - Map partitionMaxBytes = new HashMap<>(); - partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); - - ReplicaManager replicaManager = mock(ReplicaManager.class); + Map partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES); DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( - "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); - mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory); + mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withReplicaManager(replicaManager) + .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .build(); - doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes); // Since the nextFetchOffset does not point to endOffset + 1, i.e. some of the records in the cachedState are AVAILABLE, // even though the maxInFlightMessages limit is exceeded, replicaManager.readFromLog should be called - Mockito.verify(replicaManager, times(1)).readFromLog( + Mockito.verify(mockReplicaManager, times(1)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); } @@ -1643,12 +1646,11 @@ public class SharePartitionManagerTest { new CompletableFuture<>(), partitionMaxBytes, 100); - ReplicaManager replicaManager = mock(ReplicaManager.class); DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( - "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); - mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory); + mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); // Initially you cannot acquire records for both sp1 and sp2. when(sp1.maybeAcquireFetchLock()).thenReturn(true); @@ -1661,13 +1663,13 @@ public class SharePartitionManagerTest { SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() .withPartitionCacheMap(partitionCacheMap) - .withReplicaManager(replicaManager) + .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .build(); DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetchData) - .withReplicaManager(replicaManager) + .withReplicaManager(mockReplicaManager) .withSharePartitionManager(sharePartitionManager) .build(); @@ -1677,7 +1679,7 @@ public class SharePartitionManagerTest { // Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys. assertEquals(2, delayedShareFetchPurgatory.watched()); - doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); Map> acknowledgeTopics = new HashMap<>(); acknowledgeTopics.put(tp1, Arrays.asList( @@ -1739,12 +1741,11 @@ public class SharePartitionManagerTest { new CompletableFuture<>(), partitionMaxBytes, 100); - ReplicaManager replicaManager = mock(ReplicaManager.class); DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( - "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); - mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory); + mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); // Initially you cannot acquire records for both all 3 share partitions. when(sp1.maybeAcquireFetchLock()).thenReturn(true); @@ -1759,13 +1760,13 @@ public class SharePartitionManagerTest { SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() .withPartitionCacheMap(partitionCacheMap) - .withReplicaManager(replicaManager) + .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .build(); DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetchData) - .withReplicaManager(replicaManager) + .withReplicaManager(mockReplicaManager) .withSharePartitionManager(sharePartitionManager) .build(); @@ -1832,12 +1833,11 @@ public class SharePartitionManagerTest { new CompletableFuture<>(), partitionMaxBytes, 100); - ReplicaManager replicaManager = mock(ReplicaManager.class); DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( - "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); - mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory); + mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); // Initially you cannot acquire records for both sp1 and sp2. when(sp1.maybeAcquireFetchLock()).thenReturn(true); @@ -1851,13 +1851,13 @@ public class SharePartitionManagerTest { SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder() .withPartitionCacheMap(partitionCacheMap) .withCache(cache) - .withReplicaManager(replicaManager) + .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .build()); DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetchData) - .withReplicaManager(replicaManager) + .withReplicaManager(mockReplicaManager) .withSharePartitionManager(sharePartitionManager) .build(); @@ -1867,7 +1867,7 @@ public class SharePartitionManagerTest { // Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys. assertEquals(2, delayedShareFetchPurgatory.watched()); - doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); assertEquals(2, delayedShareFetchPurgatory.watched()); @@ -1932,12 +1932,11 @@ public class SharePartitionManagerTest { new CompletableFuture<>(), partitionMaxBytes, 100); - ReplicaManager replicaManager = mock(ReplicaManager.class); DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( - "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); - mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory); + mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); // Initially you cannot acquire records for both all 3 share partitions. when(sp1.maybeAcquireFetchLock()).thenReturn(true); @@ -1953,13 +1952,13 @@ public class SharePartitionManagerTest { SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder() .withPartitionCacheMap(partitionCacheMap) .withCache(cache) - .withReplicaManager(replicaManager) + .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .build()); DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetchData) - .withReplicaManager(replicaManager) + .withReplicaManager(mockReplicaManager) .withSharePartitionManager(sharePartitionManager) .build(); @@ -2000,16 +1999,13 @@ public class SharePartitionManagerTest { CompletableFuture pendingInitializationFuture = new CompletableFuture<>(); when(sp0.maybeInitialize()).thenReturn(pendingInitializationFuture); - // Mock replica manager to verify no calls are made to fetchMessages. - ReplicaManager replicaManager = mock(ReplicaManager.class); - DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( - "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); - mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory); + mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).withTimer(mockTimer) + .withPartitionCacheMap(partitionCacheMap).withReplicaManager(mockReplicaManager).withTimer(mockTimer) .build(); CompletableFuture> future = @@ -2021,7 +2017,7 @@ public class SharePartitionManagerTest { () -> "Processing in delayed share fetch queue never ended."); assertTrue(future.join().isEmpty()); // Verify that replica manager fetch is not called. - Mockito.verify(replicaManager, times(0)).readFromLog( + Mockito.verify(mockReplicaManager, times(0)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); // Complete the pending initialization future. pendingInitializationFuture.complete(null); @@ -2039,14 +2035,13 @@ public class SharePartitionManagerTest { Map partitionCacheMap = new HashMap<>(); partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); - ReplicaManager replicaManager = mock(ReplicaManager.class); DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( - "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); - mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory); + mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).withTimer(mockTimer) + .withPartitionCacheMap(partitionCacheMap).withReplicaManager(mockReplicaManager).withTimer(mockTimer) .build(); // Return LeaderNotAvailableException to simulate initialization failure. @@ -2061,6 +2056,8 @@ public class SharePartitionManagerTest { // between SharePartitionManager and SharePartition to retry the request as SharePartition is not yet ready. assertFalse(future.isCompletedExceptionally()); assertTrue(future.join().isEmpty()); + // Verify that the share partition is still in the cache on LeaderNotAvailableException. + assertEquals(1, partitionCacheMap.size()); // Return IllegalStateException to simulate initialization failure. when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new IllegalStateException("Illegal state"))); @@ -2069,9 +2066,11 @@ public class SharePartitionManagerTest { future::isDone, DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing in delayed share fetch queue never ended."); - assertTrue(future.isCompletedExceptionally()); - assertFutureThrows(future, IllegalStateException.class); + validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Illegal state"); + assertTrue(partitionCacheMap.isEmpty()); + // The last exception removes the share partition from the cache hence re-add the share partition to cache. + partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); // Return CoordinatorNotAvailableException to simulate initialization failure. when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new CoordinatorNotAvailableException("Coordinator not available"))); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes); @@ -2079,9 +2078,11 @@ public class SharePartitionManagerTest { future::isDone, DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing in delayed share fetch queue never ended."); - assertTrue(future.isCompletedExceptionally()); - assertFutureThrows(future, CoordinatorNotAvailableException.class); + validateShareFetchFutureException(future, tp0, Errors.COORDINATOR_NOT_AVAILABLE, "Coordinator not available"); + assertTrue(partitionCacheMap.isEmpty()); + // The last exception removes the share partition from the cache hence re-add the share partition to cache. + partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); // Return InvalidRequestException to simulate initialization failure. when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new InvalidRequestException("Invalid request"))); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes); @@ -2089,21 +2090,19 @@ public class SharePartitionManagerTest { future::isDone, DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing in delayed share fetch queue never ended."); - assertTrue(future.isCompletedExceptionally()); - assertFutureThrows(future, InvalidRequestException.class); + validateShareFetchFutureException(future, tp0, Errors.INVALID_REQUEST, "Invalid request"); + assertTrue(partitionCacheMap.isEmpty()); + // The last exception removes the share partition from the cache hence re-add the share partition to cache. + partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); // Return FencedStateEpochException to simulate initialization failure. when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new FencedStateEpochException("Fenced state epoch"))); - // Assert that partitionCacheMap contains instance before the fetch request. - assertEquals(1, partitionCacheMap.size()); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes); TestUtils.waitForCondition( future::isDone, DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing in delayed share fetch queue never ended."); - assertTrue(future.isCompletedExceptionally()); - assertFutureThrows(future, FencedStateEpochException.class); - // Verify that the share partition is removed from the cache. + validateShareFetchFutureException(future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced state epoch"); assertTrue(partitionCacheMap.isEmpty()); // The last exception removes the share partition from the cache hence re-add the share partition to cache. @@ -2115,9 +2114,7 @@ public class SharePartitionManagerTest { future::isDone, DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing in delayed share fetch queue never ended."); - assertTrue(future.isCompletedExceptionally()); - assertFutureThrows(future, NotLeaderOrFollowerException.class); - // Verify that the share partition is removed from the cache. + validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Not leader or follower"); assertTrue(partitionCacheMap.isEmpty()); // The last exception removes the share partition from the cache hence re-add the share partition to cache. @@ -2129,10 +2126,11 @@ public class SharePartitionManagerTest { future::isDone, DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing in delayed share fetch queue never ended."); - assertTrue(future.isCompletedExceptionally()); - assertFutureThrows(future, RuntimeException.class); + validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Runtime exception"); + assertTrue(partitionCacheMap.isEmpty()); } + @Test @SuppressWarnings("unchecked") public void testShareFetchProcessingExceptions() throws Exception { @@ -2140,12 +2138,10 @@ public class SharePartitionManagerTest { TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); Map partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES); - SharePartition sp0 = mock(SharePartition.class); Map partitionCacheMap = (Map) mock(Map.class); // Throw the exception for first fetch request. Return share partition for next. when(partitionCacheMap.computeIfAbsent(any(), any())) - .thenThrow(new RuntimeException("Error creating instance")) - .thenReturn(sp0); + .thenThrow(new RuntimeException("Error creating instance")); SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() .withPartitionCacheMap(partitionCacheMap) @@ -2157,19 +2153,204 @@ public class SharePartitionManagerTest { future::isDone, DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing for delayed share fetch request not finished."); - assertTrue(future.isCompletedExceptionally()); - assertFutureThrows(future, RuntimeException.class, "Error creating instance"); + validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Error creating instance"); + } - // Throw exception from share partition for second fetch request. - when(sp0.maybeInitialize()).thenThrow(new RuntimeException("Error initializing instance")); + @Test + public void testSharePartitionInitializationFailure() throws Exception { + String groupId = "grp"; + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + Map partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES); + // Send map to check no share partition is created. + Map partitionCacheMap = new HashMap<>(); + // Validate when partition is not the leader. + Partition partition = mock(Partition.class); + when(partition.isLeader()).thenReturn(false); + + ReplicaManager replicaManager = mock(ReplicaManager.class); + // First check should throw KafkaStorageException, second check should return partition which + // is not leader. + when(replicaManager.getPartitionOrException(any())) + .thenThrow(new KafkaStorageException("Exception")) + .thenReturn(partition); + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withReplicaManager(replicaManager) + .withPartitionCacheMap(partitionCacheMap) + .build(); + + // Validate when exception is thrown. + CompletableFuture> future = + sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes); + TestUtils.waitForCondition( + future::isDone, + DELAYED_SHARE_FETCH_TIMEOUT_MS, + () -> "Processing for delayed share fetch request not finished."); + validateShareFetchFutureException(future, tp0, Errors.KAFKA_STORAGE_ERROR, "Exception"); + assertTrue(partitionCacheMap.isEmpty()); + + // Validate when partition is not leader. future = sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes); TestUtils.waitForCondition( future::isDone, DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing for delayed share fetch request not finished."); - assertTrue(future.isCompletedExceptionally()); - assertFutureThrows(future, RuntimeException.class, "Error initializing instance"); + validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER); + assertTrue(partitionCacheMap.isEmpty()); + } + + @Test + public void testSharePartitionPartialInitializationFailure() throws Exception { + String groupId = "grp"; + Uuid memberId1 = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(memberId1, new TopicPartition("foo", 1)); + Map partitionMaxBytes = Map.of(tp0, PARTITION_MAX_BYTES, tp1, PARTITION_MAX_BYTES); + + // Mark partition1 as not the leader. + Partition partition1 = mock(Partition.class); + when(partition1.isLeader()).thenReturn(false); + + ReplicaManager replicaManager = mock(ReplicaManager.class); + when(replicaManager.getPartitionOrException(any())) + .thenReturn(partition1); + + SharePartition sp1 = mock(SharePartition.class); + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); + + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null)); + when(sp1.acquire(anyString(), anyInt(), any())).thenReturn(new ShareAcquiredRecords(Collections.emptyList(), 0)); + + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); + mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory); + + doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withReplicaManager(replicaManager) + .withPartitionCacheMap(partitionCacheMap) + .build(); + + // Validate when exception is thrown. + CompletableFuture> future = + sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes); + assertTrue(future.isDone()); + assertFalse(future.isCompletedExceptionally()); + + Map partitionDataMap = future.get(); + // For now only 1 successful partition is included, this will be fixed in subsequents PRs. + assertEquals(1, partitionDataMap.size()); + assertTrue(partitionDataMap.containsKey(tp1)); + assertEquals(Errors.NONE.code(), partitionDataMap.get(tp1).errorCode()); + + Mockito.verify(replicaManager, times(1)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); + } + + @Test + public void testReplicaManagerFetchException() { + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + Map partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES); + + SharePartition sp0 = mock(SharePartition.class); + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null)); + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); + mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); + + doThrow(new RuntimeException("Exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap) + .withReplicaManager(mockReplicaManager) + .withTimer(mockTimer) + .build(); + + CompletableFuture> future = + sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes); + validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Exception"); + // Verify that the share partition is still in the cache on exception. + assertEquals(1, partitionCacheMap.size()); + + // Throw NotLeaderOrFollowerException from replica manager fetch which should evict instance from the cache. + doThrow(new NotLeaderOrFollowerException("Leader exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes); + validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception"); + assertTrue(partitionCacheMap.isEmpty()); + } + + @Test + public void testReplicaManagerFetchMultipleSharePartitionsException() { + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + + SharePartition sp0 = mock(SharePartition.class); + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null)); + + SharePartition sp1 = mock(SharePartition.class); + // Do not make the share partition acquirable hence it shouldn't be removed from the cache, + // as it won't be part of replica manger readFromLog request. + when(sp1.maybeAcquireFetchLock()).thenReturn(false); + when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null)); + + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); + + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); + mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); + + // Throw FencedStateEpochException from replica manager fetch which should evict instance from the cache. + doThrow(new FencedStateEpochException("Fenced exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap) + .withReplicaManager(mockReplicaManager) + .withTimer(mockTimer) + .build(); + + CompletableFuture> future = + sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes); + validateShareFetchFutureException(future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced exception"); + // Verify that tp1 is still in the cache on exception. + assertEquals(1, partitionCacheMap.size()); + assertEquals(sp1, partitionCacheMap.get(new SharePartitionKey(groupId, tp1))); + + // Make sp1 acquirable and add sp0 back in partition cache. Both share partitions should be + // removed from the cache. + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + // Throw FencedStateEpochException from replica manager fetch which should evict instance from the cache. + doThrow(new FencedStateEpochException("Fenced exception again")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes); + validateShareFetchFutureException(future, List.of(tp0, tp1), Errors.FENCED_STATE_EPOCH, "Fenced exception again"); + assertTrue(partitionCacheMap.isEmpty()); } private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() { @@ -2220,7 +2401,7 @@ public class SharePartitionManagerTest { private void assertErroneousAndValidTopicIdPartitions( ErroneousAndValidPartitionData erroneousAndValidPartitionData, - List expectedErroneous, List expectedValid) { + List expectedErroneous, List expectedValid) { Set expectedErroneousSet = new HashSet<>(expectedErroneous); Set expectedValidSet = new HashSet<>(expectedValid); Set actualErroneousPartitions = new HashSet<>(); @@ -2233,6 +2414,37 @@ public class SharePartitionManagerTest { assertEquals(expectedValidSet, actualValidPartitions); } + private Partition mockPartition() { + Partition partition = mock(Partition.class); + when(partition.isLeader()).thenReturn(true); + when(partition.getLeaderEpoch()).thenReturn(1); + + return partition; + } + + private void validateShareFetchFutureException(CompletableFuture> future, + TopicIdPartition topicIdPartition, Errors error) { + validateShareFetchFutureException(future, Collections.singletonList(topicIdPartition), error, null); + } + + private void validateShareFetchFutureException(CompletableFuture> future, + TopicIdPartition topicIdPartition, Errors error, String message) { + validateShareFetchFutureException(future, Collections.singletonList(topicIdPartition), error, message); + } + + private void validateShareFetchFutureException(CompletableFuture> future, + List topicIdPartitions, Errors error, String message) { + assertFalse(future.isCompletedExceptionally()); + Map result = future.join(); + assertEquals(topicIdPartitions.size(), result.size()); + topicIdPartitions.forEach(topicIdPartition -> { + assertTrue(result.containsKey(topicIdPartition)); + assertEquals(topicIdPartition.partition(), result.get(topicIdPartition).partitionIndex()); + assertEquals(error.code(), result.get(topicIdPartition).errorCode()); + assertEquals(message, result.get(topicIdPartition).errorMessage()); + }); + } + static Seq> buildLogReadResult(Set topicIdPartitions) { List> logReadResults = new ArrayList<>(); topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult( diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index c008e772956..6538a1f4012 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -26,10 +26,12 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.errors.FencedStateEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.InvalidRecordStateException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; @@ -336,7 +338,7 @@ public class SharePartitionTest { result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, InvalidRequestException.class); + assertFutureThrows(result, GroupIdNotFoundException.class); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); // Mock UNKNOWN_TOPIC_OR_PARTITION error. @@ -350,7 +352,7 @@ public class SharePartitionTest { result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, InvalidRequestException.class); + assertFutureThrows(result, UnknownTopicOrPartitionException.class); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); // Mock FENCED_STATE_EPOCH error. @@ -541,7 +543,7 @@ public class SharePartitionTest { @Test public void testAcquireSingleRecord() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records = memoryRecords(1); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( @@ -564,7 +566,7 @@ public class SharePartitionTest { @Test public void testAcquireMultipleRecords() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records = memoryRecords(5, 10); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( @@ -587,7 +589,7 @@ public class SharePartitionTest { @Test public void testAcquireWithMaxFetchRecords() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); // Less-number of records than max fetch records. MemoryRecords records = memoryRecords(5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( @@ -631,7 +633,7 @@ public class SharePartitionTest { @Test public void testAcquireWithMultipleBatchesAndMaxFetchRecords() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); // Create 3 batches of records. ByteBuffer buffer = ByteBuffer.allocate(4096); @@ -665,7 +667,7 @@ public class SharePartitionTest { @Test public void testAcquireMultipleRecordsWithOverlapAndNewBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records = memoryRecords(5, 0); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( @@ -694,7 +696,7 @@ public class SharePartitionTest { @Test public void testAcquireSameBatchAgain() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records = memoryRecords(5, 10); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( @@ -736,7 +738,7 @@ public class SharePartitionTest { @Test public void testAcquireWithEmptyFetchRecords() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( MEMBER_ID, MAX_FETCH_RECORDS, @@ -750,13 +752,13 @@ public class SharePartitionTest { @Test public void testNextFetchOffsetInitialState() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); assertEquals(0, sharePartition.nextFetchOffset()); } @Test public void testNextFetchOffsetWithCachedStateAcquired() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire( MEMBER_ID, MAX_FETCH_RECORDS, @@ -767,7 +769,7 @@ public class SharePartitionTest { @Test public void testNextFetchOffsetWithFindAndCachedStateEmpty() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.findNextFetchOffset(true); assertTrue(sharePartition.findNextFetchOffset()); assertEquals(0, sharePartition.nextFetchOffset()); @@ -776,7 +778,7 @@ public class SharePartitionTest { @Test public void testNextFetchOffsetWithFindAndCachedState() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.findNextFetchOffset(true); assertTrue(sharePartition.findNextFetchOffset()); sharePartition.acquire( @@ -808,7 +810,10 @@ public class SharePartitionTest { @Test public void testCanAcquireRecordsWithCachedDataAndLimitReached() { - SharePartition sharePartition = SharePartitionBuilder.builder().withMaxInflightMessages(1).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withMaxInflightMessages(1) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire( MEMBER_ID, MAX_FETCH_RECORDS, @@ -833,7 +838,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeSingleRecordBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(1, 0); MemoryRecords records2 = memoryRecords(1, 1); @@ -872,7 +877,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeMultipleRecordBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records = memoryRecords(10, 5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( @@ -896,7 +901,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeMultipleRecordBatchWithGapOffsets() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(2, 5); // Untracked gap of 3 offsets from 7-9. MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(5, 10); @@ -959,7 +964,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeMultipleSubsetRecordBatchWithGapOffsets() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(2, 5); // Untracked gap of 3 offsets from 7-9. MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10); @@ -1031,7 +1036,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeOutOfRangeCachedData() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); // Acknowledge a batch when cache is empty. CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, @@ -1061,7 +1066,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeOutOfRangeCachedDataFirstBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); // Create data for the batch with offsets 0-4. MemoryRecords records = memoryRecords(5, 0); @@ -1114,7 +1119,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeWithAnotherMember() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records = memoryRecords(5, 5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( MEMBER_ID, @@ -1137,7 +1142,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeWhenOffsetNotAcquired() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records = memoryRecords(5, 5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( MEMBER_ID, @@ -1190,7 +1195,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeRollbackWithFullBatchError() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(5, 5); MemoryRecords records2 = memoryRecords(5, 10); MemoryRecords records3 = memoryRecords(5, 15); @@ -1243,7 +1248,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeRollbackWithSubsetError() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(5, 5); MemoryRecords records2 = memoryRecords(5, 10); MemoryRecords records3 = memoryRecords(5, 15); @@ -1298,7 +1303,7 @@ public class SharePartitionTest { @Test public void testAcquireReleasedRecord() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records = memoryRecords(5, 10); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( @@ -1343,7 +1348,7 @@ public class SharePartitionTest { @Test public void testAcquireReleasedRecordMultipleBatches() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); // First fetch request with 5 records starting from offset 10. MemoryRecords records1 = memoryRecords(5, 10); // Second fetch request with 5 records starting from offset 15. @@ -1506,7 +1511,10 @@ public class SharePartitionTest { @Test public void testAcquisitionLockForAcquiringSingleRecord() throws InterruptedException { - SharePartition sharePartition = SharePartitionBuilder.builder().withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(1), Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); @@ -1527,7 +1535,10 @@ public class SharePartitionTest { @Test public void testAcquisitionLockForAcquiringMultipleRecords() throws InterruptedException { - SharePartition sharePartition = SharePartitionBuilder.builder().withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(5, 10), Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); @@ -1548,7 +1559,10 @@ public class SharePartitionTest { @Test public void testAcquisitionLockForAcquiringMultipleRecordsWithOverlapAndNewBatch() throws InterruptedException { - SharePartition sharePartition = SharePartitionBuilder.builder().withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(5, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -1580,7 +1594,10 @@ public class SharePartitionTest { @Test public void testAcquisitionLockForAcquiringSameBatchAgain() throws InterruptedException { - SharePartition sharePartition = SharePartitionBuilder.builder().withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(5, 10), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -1609,7 +1626,7 @@ public class SharePartitionTest { @Test public void testAcquisitionLockOnAcknowledgingSingleRecordBatch() throws InterruptedException { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 10, 0, memoryRecords(1, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -1643,7 +1660,7 @@ public class SharePartitionTest { @Test public void testAcquisitionLockOnAcknowledgingMultipleRecordBatch() throws InterruptedException { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(10, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -1674,7 +1691,10 @@ public class SharePartitionTest { @Test public void testAcquisitionLockOnAcknowledgingMultipleRecordBatchWithGapOffsets() throws InterruptedException { - SharePartition sharePartition = SharePartitionBuilder.builder().withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); MemoryRecords records1 = memoryRecords(2, 5); // Untracked gap of 3 offsets from 7-9. MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(5, 10); @@ -1728,7 +1748,10 @@ public class SharePartitionTest { @Test public void testAcquisitionLockForAcquiringSubsetBatchAgain() throws InterruptedException { - SharePartition sharePartition = SharePartitionBuilder.builder().withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(8, 10), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -1794,7 +1817,10 @@ public class SharePartitionTest { @Test public void testAcquisitionLockOnAcknowledgingMultipleSubsetRecordBatchWithGapOffsets() throws InterruptedException { - SharePartition sharePartition = SharePartitionBuilder.builder().withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); MemoryRecords records1 = memoryRecords(2, 5); // Untracked gap of 3 offsets from 7-9. MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10); @@ -1892,9 +1918,10 @@ public class SharePartitionTest { @Test public void testAcquisitionLockTimeoutCauseMaxDeliveryCountExceed() throws InterruptedException { SharePartition sharePartition = SharePartitionBuilder.builder() - .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) - .withMaxDeliveryCount(2) // Only 2 delivery attempts will be made before archiving the records - .build(); + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withMaxDeliveryCount(2) // Only 2 delivery attempts will be made before archiving the records + .withState(SharePartitionState.ACTIVE) + .build(); // Adding memoryRecords(10, 0) in the sharePartition to make sure that SPSO doesn't move forward when delivery count of records2 // exceed the max delivery count. @@ -1944,9 +1971,10 @@ public class SharePartitionTest { @Test public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedException { SharePartition sharePartition = SharePartitionBuilder.builder() - .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) - .withMaxDeliveryCount(2) // Only 2 delivery attempts will be made before archiving the records - .build(); + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withMaxDeliveryCount(2) // Only 2 delivery attempts will be made before archiving the records + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(10, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -2020,9 +2048,10 @@ public class SharePartitionTest { @Test public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState() throws InterruptedException { SharePartition sharePartition = SharePartitionBuilder.builder() - .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) - .withMaxDeliveryCount(2) // Only 2 delivery attempts will be made before archiving the records - .build(); + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withMaxDeliveryCount(2) // Only 2 delivery attempts will be made before archiving the records + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 3, 0, memoryRecords(10, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -2060,7 +2089,10 @@ public class SharePartitionTest { @Test public void testAcknowledgeAfterAcquisitionLockTimeout() throws InterruptedException { - SharePartition sharePartition = SharePartitionBuilder.builder().withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -2097,7 +2129,10 @@ public class SharePartitionTest { @Test public void testAcquisitionLockAfterDifferentAcknowledges() throws InterruptedException { - SharePartition sharePartition = SharePartitionBuilder.builder().withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -2155,8 +2190,9 @@ public class SharePartitionTest { Persister persister = Mockito.mock(Persister.class); mockPersisterReadStateMethod(persister); SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister) - .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) - .build(); + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); // Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns false. WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); @@ -2187,8 +2223,9 @@ public class SharePartitionTest { Persister persister = Mockito.mock(Persister.class); mockPersisterReadStateMethod(persister); SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister) - .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) - .build(); + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); // Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns true for acknowledge to pass. WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); @@ -2237,7 +2274,7 @@ public class SharePartitionTest { @Test public void testReleaseSingleRecordBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 10, 0, memoryRecords(1, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -2256,7 +2293,7 @@ public class SharePartitionTest { @Test public void testReleaseMultipleRecordBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(10, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -2275,7 +2312,7 @@ public class SharePartitionTest { @Test public void testReleaseMultipleAcknowledgedRecordBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records0 = memoryRecords(5, 0); MemoryRecords records1 = memoryRecords(2, 5); // Untracked gap of 3 offsets from 7-9. @@ -2308,7 +2345,7 @@ public class SharePartitionTest { @Test public void testReleaseAcknowledgedMultipleSubsetRecordBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(2, 5); // Untracked gap of 3 offsets from 7-9. @@ -2365,7 +2402,7 @@ public class SharePartitionTest { @Test public void testReleaseAcquiredRecordsWithAnotherMember() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(1, 5); // Untracked gap of 3 offsets from 7-9. MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10); @@ -2438,7 +2475,7 @@ public class SharePartitionTest { @Test public void testReleaseAcquiredRecordsWithAnotherMemberAndSubsetAcknowledged() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(2, 5); // Untracked gap of 3 offsets from 7-9. MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10); @@ -2517,7 +2554,7 @@ public class SharePartitionTest { @Test public void testReleaseAcquiredRecordsForEmptyCachedData() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); // Release a batch when cache is empty. CompletableFuture releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID); assertNull(releaseResult.join()); @@ -2528,7 +2565,7 @@ public class SharePartitionTest { @Test public void testReleaseAcquiredRecordsAfterDifferentAcknowledges() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); @@ -2554,7 +2591,10 @@ public class SharePartitionTest { @Test public void testMaxDeliveryCountLimitExceededForRecordsSubsetAfterReleaseAcquiredRecords() { - SharePartition sharePartition = SharePartitionBuilder.builder().withMaxDeliveryCount(2).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withMaxDeliveryCount(2) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 40, 3, memoryRecords(10, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -2584,7 +2624,10 @@ public class SharePartitionTest { @Test public void testMaxDeliveryCountLimitExceededForRecordsSubsetAfterReleaseAcquiredRecordsSubset() { - SharePartition sharePartition = SharePartitionBuilder.builder().withMaxDeliveryCount(2).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withMaxDeliveryCount(2) + .withState(SharePartitionState.ACTIVE) + .build(); // First fetch request with 5 records starting from offset 10. MemoryRecords records1 = memoryRecords(5, 10); // Second fetch request with 5 records starting from offset 15. @@ -2657,7 +2700,10 @@ public class SharePartitionTest { @Test public void testMaxDeliveryCountLimitExceededForRecordsSubsetCacheCleared() { - SharePartition sharePartition = SharePartitionBuilder.builder().withMaxDeliveryCount(2).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withMaxDeliveryCount(2) + .withState(SharePartitionState.ACTIVE) + .build(); // First fetch request with 5 records starting from offset 10. MemoryRecords records1 = memoryRecords(5, 10); // Second fetch request with 5 records starting from offset 15. @@ -2708,7 +2754,7 @@ public class SharePartitionTest { @Test public void testReleaseAcquiredRecordsSubsetWithAnotherMember() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 30, 0, memoryRecords(7, 5), @@ -2738,7 +2784,10 @@ public class SharePartitionTest { public void testReleaseBatchWithWriteShareGroupStateFailure() { Persister persister = Mockito.mock(Persister.class); mockPersisterReadStateMethod(persister); - SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withPersister(persister) + .withState(SharePartitionState.ACTIVE) + .build(); // Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns false. WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); @@ -2753,7 +2802,7 @@ public class SharePartitionTest { CompletableFuture releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID); assertTrue(releaseResult.isCompletedExceptionally()); - assertFutureThrows(releaseResult, InvalidRequestException.class); + assertFutureThrows(releaseResult, GroupIdNotFoundException.class); // Due to failure in writeShareGroupState, the cached state should not be updated. assertEquals(1, sharePartition.cachedState().size()); @@ -2765,7 +2814,10 @@ public class SharePartitionTest { public void testReleaseOffsetWithWriteShareGroupStateFailure() { Persister persister = Mockito.mock(Persister.class); mockPersisterReadStateMethod(persister); - SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withPersister(persister) + .withState(SharePartitionState.ACTIVE) + .build(); // Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns true for acknowledge to pass. WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); @@ -2789,7 +2841,7 @@ public class SharePartitionTest { CompletableFuture releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID); assertTrue(releaseResult.isCompletedExceptionally()); - assertFutureThrows(releaseResult, InvalidRequestException.class); + assertFutureThrows(releaseResult, GroupIdNotFoundException.class); // Due to failure in writeShareGroupState, the cached state should not be updated. assertEquals(1, sharePartition.cachedState().size()); @@ -2810,7 +2862,7 @@ public class SharePartitionTest { @Test public void testAcquisitionLockOnReleasingMultipleRecordBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(10, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -2832,7 +2884,10 @@ public class SharePartitionTest { @Test public void testAcquisitionLockOnReleasingAcknowledgedMultipleSubsetRecordBatchWithGapOffsets() { - SharePartition sharePartition = SharePartitionBuilder.builder().withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); MemoryRecords records1 = memoryRecords(2, 5); // Untracked gap of 3 offsets from 7-9. MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10); @@ -2908,7 +2963,7 @@ public class SharePartitionTest { @Test public void testLsoMovementOnInitializationSharePartition() { // LSO is at 0. - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.updateCacheAndOffsets(0); assertEquals(0, sharePartition.nextFetchOffset()); assertEquals(0, sharePartition.startOffset()); @@ -2923,7 +2978,7 @@ public class SharePartitionTest { @Test public void testLsoMovementForArchivingBatches() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 2), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -2991,7 +3046,7 @@ public class SharePartitionTest { @Test public void testLsoMovementForArchivingOffsets() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 2), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -3043,7 +3098,7 @@ public class SharePartitionTest { @Test public void testLsoMovementForArchivingOffsetsWithStartAndEndBatchesNotFullMatches() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 2), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -3083,7 +3138,7 @@ public class SharePartitionTest { @Test public void testLsoMovementForArchivingOffsetsWithStartOffsetNotFullMatches() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 2), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -3123,7 +3178,7 @@ public class SharePartitionTest { @Test public void testLsoMovementForArchivingOffsetsWithStartOffsetNotFullMatchesPostAcceptAcknowledgement() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 2), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -3173,7 +3228,7 @@ public class SharePartitionTest { @Test public void testLsoMovementForArchivingOffsetsWithStartOffsetNotFullMatchesPostReleaseAcknowledgement() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 2), Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); @@ -3221,7 +3276,7 @@ public class SharePartitionTest { @Test public void testLsoMovementToEndOffset() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 2), Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); @@ -3255,7 +3310,7 @@ public class SharePartitionTest { @Test public void testLsoMovementToEndOffsetWhereEndOffsetIsAvailable() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 2), Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); @@ -3290,7 +3345,7 @@ public class SharePartitionTest { @Test public void testLsoMovementAheadOfEndOffsetPostAcknowledgment() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 2), Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); @@ -3324,7 +3379,7 @@ public class SharePartitionTest { @Test public void testLsoMovementAheadOfEndOffset() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 2), Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); @@ -3348,7 +3403,7 @@ public class SharePartitionTest { @Test public void testLsoMovementWithGapsInCachedStateMap() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(5, 2); // Gap of 7-9. @@ -3383,7 +3438,7 @@ public class SharePartitionTest { @Test public void testLsoMovementWithGapsInCachedStateMapAndAcknowledgedBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(5, 2); // Gap of 7-9. @@ -3415,7 +3470,7 @@ public class SharePartitionTest { @Test public void testLsoMovementPostGapsInAcknowledgments() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(2, 5); // Untracked gap of 3 offsets from 7-9. @@ -3463,7 +3518,7 @@ public class SharePartitionTest { @Test public void testReleaseAcquiredRecordsBatchesPostStartOffsetMovement() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -3550,7 +3605,7 @@ public class SharePartitionTest { @Test public void testReleaseAcquiredRecordsBatchesPostStartOffsetMovementToStartOfBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -3581,7 +3636,7 @@ public class SharePartitionTest { @Test public void testReleaseAcquiredRecordsBatchesPostStartOffsetMovementToMiddleOfBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -3619,7 +3674,9 @@ public class SharePartitionTest { @Test public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovement() throws InterruptedException { SharePartition sharePartition = SharePartitionBuilder.builder() - .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -3702,7 +3759,9 @@ public class SharePartitionTest { @Test public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovementToStartOfBatch() throws InterruptedException { SharePartition sharePartition = SharePartitionBuilder.builder() - .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -3732,7 +3791,9 @@ public class SharePartitionTest { @Test public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovementToMiddleOfBatch() throws InterruptedException { SharePartition sharePartition = SharePartitionBuilder.builder() - .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -3816,7 +3877,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeBatchAndOffsetPostLsoMovement() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 2), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -3875,7 +3936,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeBatchPostLsoMovement() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 2), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -3935,7 +3996,9 @@ public class SharePartitionTest { @Test public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledge() throws InterruptedException { SharePartition sharePartition = SharePartitionBuilder.builder() - .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 2), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -3985,7 +4048,10 @@ public class SharePartitionTest { @Test public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledgeBatchLastOffsetAheadOfStartOffsetBatch() throws InterruptedException { - SharePartition sharePartition = SharePartitionBuilder.builder().withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(2, 1), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4062,7 +4128,7 @@ public class SharePartitionTest { WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(null); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - CompletableFuture result = sharePartition.writeShareGroupState(Mockito.anyList()); + CompletableFuture result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); assertFutureThrows(result, IllegalStateException.class); } @@ -4077,7 +4143,7 @@ public class SharePartitionTest { // TopicsData is empty. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.emptyList()); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - CompletableFuture writeResult = sharePartition.writeShareGroupState(Mockito.anyList()); + CompletableFuture writeResult = sharePartition.writeShareGroupState(anyList()); assertTrue(writeResult.isCompletedExceptionally()); assertFutureThrows(writeResult, IllegalStateException.class); @@ -4086,7 +4152,7 @@ public class SharePartitionTest { new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.emptyList()), new TopicData<>(Uuid.randomUuid(), Collections.emptyList()))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - writeResult = sharePartition.writeShareGroupState(Mockito.anyList()); + writeResult = sharePartition.writeShareGroupState(anyList()); assertTrue(writeResult.isCompletedExceptionally()); assertFutureThrows(writeResult, IllegalStateException.class); @@ -4094,7 +4160,7 @@ public class SharePartitionTest { Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.emptyList()))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - writeResult = sharePartition.writeShareGroupState(Mockito.anyList()); + writeResult = sharePartition.writeShareGroupState(anyList()); assertTrue(writeResult.isCompletedExceptionally()); assertFutureThrows(writeResult, IllegalStateException.class); @@ -4103,7 +4169,7 @@ public class SharePartitionTest { new TopicData<>(Uuid.randomUuid(), Collections.singletonList( PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - writeResult = sharePartition.writeShareGroupState(Mockito.anyList()); + writeResult = sharePartition.writeShareGroupState(anyList()); assertTrue(writeResult.isCompletedExceptionally()); assertFutureThrows(writeResult, IllegalStateException.class); @@ -4113,7 +4179,7 @@ public class SharePartitionTest { PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()), PartitionFactory.newPartitionErrorData(1, Errors.NONE.code(), Errors.NONE.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - writeResult = sharePartition.writeShareGroupState(Mockito.anyList()); + writeResult = sharePartition.writeShareGroupState(anyList()); assertTrue(writeResult.isCompletedExceptionally()); assertFutureThrows(writeResult, IllegalStateException.class); @@ -4122,7 +4188,7 @@ public class SharePartitionTest { new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( PartitionFactory.newPartitionErrorData(1, Errors.NONE.code(), Errors.NONE.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - writeResult = sharePartition.writeShareGroupState(Mockito.anyList()); + writeResult = sharePartition.writeShareGroupState(anyList()); assertTrue(writeResult.isCompletedExceptionally()); assertFutureThrows(writeResult, IllegalStateException.class); } @@ -4134,7 +4200,7 @@ public class SharePartitionTest { SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); Mockito.when(persister.writeState(Mockito.any())).thenReturn(FutureUtils.failedFuture(new RuntimeException("Write exception"))); - CompletableFuture writeResult = sharePartition.writeShareGroupState(Mockito.anyList()); + CompletableFuture writeResult = sharePartition.writeShareGroupState(anyList()); assertTrue(writeResult.isCompletedExceptionally()); assertFutureThrows(writeResult, IllegalStateException.class); } @@ -4151,7 +4217,7 @@ public class SharePartitionTest { PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - CompletableFuture result = sharePartition.writeShareGroupState(Mockito.anyList()); + CompletableFuture result = sharePartition.writeShareGroupState(anyList()); assertNull(result.join()); assertFalse(result.isCompletedExceptionally()); } @@ -4160,7 +4226,10 @@ public class SharePartitionTest { public void testWriteShareGroupStateFailure() { Persister persister = Mockito.mock(Persister.class); mockPersisterReadStateMethod(persister); - SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withPersister(persister) + .withState(SharePartitionState.ACTIVE) + .build(); // Mock Write state RPC to return error response, NOT_COORDINATOR. WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -4168,7 +4237,7 @@ public class SharePartitionTest { PartitionFactory.newPartitionErrorData(0, Errors.NOT_COORDINATOR.code(), Errors.NOT_COORDINATOR.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - CompletableFuture result = sharePartition.writeShareGroupState(Mockito.anyList()); + CompletableFuture result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); assertFutureThrows(result, CoordinatorNotAvailableException.class); @@ -4178,7 +4247,7 @@ public class SharePartitionTest { PartitionFactory.newPartitionErrorData(0, Errors.COORDINATOR_NOT_AVAILABLE.code(), Errors.COORDINATOR_NOT_AVAILABLE.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - result = sharePartition.writeShareGroupState(Mockito.anyList()); + result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); assertFutureThrows(result, CoordinatorNotAvailableException.class); @@ -4188,7 +4257,7 @@ public class SharePartitionTest { PartitionFactory.newPartitionErrorData(0, Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), Errors.COORDINATOR_LOAD_IN_PROGRESS.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - result = sharePartition.writeShareGroupState(Mockito.anyList()); + result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); assertFutureThrows(result, CoordinatorNotAvailableException.class); @@ -4198,9 +4267,9 @@ public class SharePartitionTest { PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - result = sharePartition.writeShareGroupState(Mockito.anyList()); + result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, InvalidRequestException.class); + assertFutureThrows(result, GroupIdNotFoundException.class); // Mock Write state RPC to return error response, UNKNOWN_TOPIC_OR_PARTITION. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -4208,9 +4277,9 @@ public class SharePartitionTest { PartitionFactory.newPartitionErrorData(0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - result = sharePartition.writeShareGroupState(Mockito.anyList()); + result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, InvalidRequestException.class); + assertFutureThrows(result, UnknownTopicOrPartitionException.class); // Mock Write state RPC to return error response, FENCED_STATE_EPOCH. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -4218,7 +4287,7 @@ public class SharePartitionTest { PartitionFactory.newPartitionErrorData(0, Errors.FENCED_STATE_EPOCH.code(), Errors.FENCED_STATE_EPOCH.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - result = sharePartition.writeShareGroupState(Mockito.anyList()); + result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); assertFutureThrows(result, FencedStateEpochException.class); @@ -4228,7 +4297,7 @@ public class SharePartitionTest { PartitionFactory.newPartitionErrorData(0, Errors.FENCED_LEADER_EPOCH.code(), Errors.FENCED_LEADER_EPOCH.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - result = sharePartition.writeShareGroupState(Mockito.anyList()); + result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); assertFutureThrows(result, NotLeaderOrFollowerException.class); @@ -4238,14 +4307,14 @@ public class SharePartitionTest { PartitionFactory.newPartitionErrorData(0, Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - result = sharePartition.writeShareGroupState(Mockito.anyList()); + result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); assertFutureThrows(result, UnknownServerException.class); } @Test public void testWriteShareGroupStateWithNoOpShareStatePersister() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); List stateBatches = Arrays.asList( new PersisterStateBatch(5L, 10L, RecordState.AVAILABLE.id, (short) 2), new PersisterStateBatch(11L, 15L, RecordState.ARCHIVED.id, (short) 3)); @@ -4257,7 +4326,7 @@ public class SharePartitionTest { @Test public void testMaybeUpdateCachedStateWhenAcknowledgementTypeAccept() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(250, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4279,7 +4348,7 @@ public class SharePartitionTest { @Test public void testMaybeUpdateCachedStateWhenAcknowledgementTypeReject() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(250, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4301,7 +4370,7 @@ public class SharePartitionTest { @Test public void testMaybeUpdateCachedStateWhenAcknowledgementTypeRelease() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(250, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4324,7 +4393,10 @@ public class SharePartitionTest { @Test public void testMaybeUpdateCachedStateWhenAcknowledgementsFromBeginningForBatchSubset() { - SharePartition sharePartition = SharePartitionBuilder.builder().withMaxInflightMessages(20).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withMaxInflightMessages(20) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(15, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4351,7 +4423,10 @@ public class SharePartitionTest { @Test public void testMaybeUpdateCachedStateWhenAcknowledgementsFromBeginningForEntireBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().withMaxInflightMessages(20).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withMaxInflightMessages(20) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(15, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); @@ -4377,7 +4452,10 @@ public class SharePartitionTest { @Test public void testMaybeUpdateCachedStateWhenAcknowledgementsInBetween() { - SharePartition sharePartition = SharePartitionBuilder.builder().withMaxInflightMessages(20).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withMaxInflightMessages(20) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(15, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4407,7 +4485,10 @@ public class SharePartitionTest { @Test public void testMaybeUpdateCachedStateWhenAllRecordsInCachedStateAreAcknowledged() { - SharePartition sharePartition = SharePartitionBuilder.builder().withMaxInflightMessages(20).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withMaxInflightMessages(20) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(15, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4430,7 +4511,10 @@ public class SharePartitionTest { @Test public void testMaybeUpdateCachedStateMultipleAcquisitionsAndAcknowledgements() { - SharePartition sharePartition = SharePartitionBuilder.builder().withMaxInflightMessages(100).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withMaxInflightMessages(100) + .withState(SharePartitionState.ACTIVE) + .build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(20, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4501,7 +4585,7 @@ public class SharePartitionTest { @Test public void testCanAcquireRecordsReturnsTrue() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); assertEquals(0, sharePartition.startOffset()); assertEquals(0, sharePartition.endOffset()); @@ -4517,7 +4601,7 @@ public class SharePartitionTest { @Test public void testCanAcquireRecordsChangeResponsePostAcknowledgement() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); assertEquals(0, sharePartition.startOffset()); assertEquals(0, sharePartition.endOffset()); @@ -4546,7 +4630,7 @@ public class SharePartitionTest { @Test public void testCanAcquireRecordsAfterReleaseAcknowledgement() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(150, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4576,7 +4660,7 @@ public class SharePartitionTest { @Test public void testCanAcquireRecordsAfterArchiveAcknowledgement() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(150, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4605,7 +4689,7 @@ public class SharePartitionTest { @Test public void testCanAcquireRecordsAfterAcceptAcknowledgement() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(150, 0), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4635,7 +4719,10 @@ public class SharePartitionTest { public void testAcknowledgeBatchWithWriteShareGroupStateFailure() { Persister persister = Mockito.mock(Persister.class); mockPersisterReadStateMethod(persister); - SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withPersister(persister) + .withState(SharePartitionState.ACTIVE) + .build(); // Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns false. WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); @@ -4651,7 +4738,7 @@ public class SharePartitionTest { CompletableFuture ackResult = sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(5, 14, Collections.singletonList((byte) 1)))); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, InvalidRequestException.class); + assertFutureThrows(ackResult, UnknownTopicOrPartitionException.class); // Due to failure in writeShareGroupState, the cached state should not be updated. assertEquals(1, sharePartition.cachedState().size()); @@ -4663,7 +4750,10 @@ public class SharePartitionTest { public void testAcknowledgeOffsetWithWriteShareGroupStateFailure() { Persister persister = Mockito.mock(Persister.class); mockPersisterReadStateMethod(persister); - SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withPersister(persister) + .withState(SharePartitionState.ACTIVE) + .build(); // Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns false. WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); @@ -4700,7 +4790,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeSubsetWithAnotherMember() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 30, 0, memoryRecords(7, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4718,7 +4808,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeWithAnotherMemberRollbackBatchError() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4755,7 +4845,7 @@ public class SharePartitionTest { @Test public void testAcknowledgeWithAnotherMemberRollbackSubsetError() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4792,7 +4882,10 @@ public class SharePartitionTest { @Test public void testMaxDeliveryCountLimitExceededForRecordBatch() { - SharePartition sharePartition = SharePartitionBuilder.builder().withMaxDeliveryCount(2).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withMaxDeliveryCount(2) + .withState(SharePartitionState.ACTIVE) + .build(); MemoryRecords records = memoryRecords(10, 5); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, records, @@ -4818,7 +4911,10 @@ public class SharePartitionTest { @Test public void testMaxDeliveryCountLimitExceededForRecordsSubset() { - SharePartition sharePartition = SharePartitionBuilder.builder().withMaxDeliveryCount(2).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withMaxDeliveryCount(2) + .withState(SharePartitionState.ACTIVE) + .build(); // First fetch request with 5 records starting from offset 10. MemoryRecords records1 = memoryRecords(5, 10); // Second fetch request with 5 records starting from offset 15. @@ -4859,7 +4955,10 @@ public class SharePartitionTest { @Test public void testMaxDeliveryCountLimitExceededForRecordsSubsetAndCachedStateNotCleared() { - SharePartition sharePartition = SharePartitionBuilder.builder().withMaxDeliveryCount(2).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withMaxDeliveryCount(2) + .withState(SharePartitionState.ACTIVE) + .build(); // First fetch request with 5 records starting from offset 0. MemoryRecords records1 = memoryRecords(5, 0); @@ -4892,7 +4991,7 @@ public class SharePartitionTest { @Test public void testNextFetchOffsetPostAcquireAndAcknowledgeFunctionality() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(10, 0); String memberId1 = "memberId-1"; String memberId2 = "memberId-2"; @@ -4928,7 +5027,10 @@ public class SharePartitionTest { @Test public void testNextFetchOffsetWithMultipleConsumers() { - SharePartition sharePartition = SharePartitionBuilder.builder().withMaxInflightMessages(100).build(); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withMaxInflightMessages(100) + .withState(SharePartitionState.ACTIVE) + .build(); MemoryRecords records1 = memoryRecords(3, 0); String memberId1 = MEMBER_ID; String memberId2 = "member-2"; @@ -4959,7 +5061,9 @@ public class SharePartitionTest { @Test public void testNumberOfWriteCallsOnUpdates() { - SharePartition sharePartition = Mockito.spy(SharePartitionBuilder.builder().build()); + SharePartition sharePartition = Mockito.spy(SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .build()); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 10, 0, memoryRecords(5, 2), Optional.empty(), OptionalLong.empty(), Optional.empty(), @@ -4978,7 +5082,7 @@ public class SharePartitionTest { @Test public void testReacquireSubsetWithAnotherMember() { - SharePartition sharePartition = SharePartitionBuilder.builder().build(); + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(5, 5); sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 30, 0, records1, @@ -5131,6 +5235,7 @@ public class SharePartitionTest { private Persister persister = new NoOpShareStatePersister(); private final ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); private GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class); + private SharePartitionState state = SharePartitionState.EMPTY; private SharePartitionBuilder withMaxInflightMessages(int maxInflightMessages) { this.maxInflightMessages = maxInflightMessages; @@ -5157,13 +5262,18 @@ public class SharePartitionTest { return this; } + private SharePartitionBuilder withState(SharePartitionState state) { + this.state = state; + return this; + } + public static SharePartitionBuilder builder() { return new SharePartitionBuilder(); } public SharePartition build() { - return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, maxInflightMessages, maxDeliveryCount, - defaultAcquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister, replicaManager, groupConfigManager); + return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0, maxInflightMessages, maxDeliveryCount, + defaultAcquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister, replicaManager, groupConfigManager, state); } } } diff --git a/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java b/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java index 70cf5b2a16d..7980ed52f4a 100644 --- a/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java +++ b/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java @@ -33,6 +33,7 @@ public class FetchParams { public final int maxBytes; public final FetchIsolation isolation; public final Optional clientMetadata; + public final boolean shareFetchRequest; public FetchParams(short requestVersion, int replicaId, @@ -42,6 +43,18 @@ public class FetchParams { int maxBytes, FetchIsolation isolation, Optional clientMetadata) { + this(requestVersion, replicaId, replicaEpoch, maxWaitMs, minBytes, maxBytes, isolation, clientMetadata, false); + } + + public FetchParams(short requestVersion, + int replicaId, + long replicaEpoch, + long maxWaitMs, + int minBytes, + int maxBytes, + FetchIsolation isolation, + Optional clientMetadata, + boolean shareFetchRequest) { Objects.requireNonNull(isolation); Objects.requireNonNull(clientMetadata); this.requestVersion = requestVersion; @@ -52,6 +65,7 @@ public class FetchParams { this.maxBytes = maxBytes; this.isolation = isolation; this.clientMetadata = clientMetadata; + this.shareFetchRequest = shareFetchRequest; } public boolean isFromFollower() { @@ -67,7 +81,7 @@ public class FetchParams { } public boolean fetchOnlyLeader() { - return isFromFollower() || (isFromConsumer() && !clientMetadata.isPresent()); + return isFromFollower() || (isFromConsumer() && !clientMetadata.isPresent()) || shareFetchRequest; } public boolean hardMaxBytesLimit() { @@ -86,7 +100,8 @@ public class FetchParams { && minBytes == that.minBytes && maxBytes == that.maxBytes && isolation.equals(that.isolation) - && clientMetadata.equals(that.clientMetadata); + && clientMetadata.equals(that.clientMetadata) + && shareFetchRequest == that.shareFetchRequest; } @Override @@ -99,6 +114,7 @@ public class FetchParams { result = 31 * result + maxBytes; result = 31 * result + isolation.hashCode(); result = 31 * result + clientMetadata.hashCode(); + result = 31 * result + Boolean.hashCode(shareFetchRequest); return result; } @@ -113,6 +129,7 @@ public class FetchParams { ", maxBytes=" + maxBytes + ", isolation=" + isolation + ", clientMetadata=" + clientMetadata + + ", shareFetchRequest=" + shareFetchRequest + ')'; } }