KAFKA-17002: Integrated partition leader epoch for Persister APIs (KIP-932) (#16842)

The PR integrates the partition leader epoch into the Persister API calls. The write RPC is retried once on a leader-epoch failure.

Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
Author: Apoorv Mittal, 2024-10-30 21:41:39 +00:00, committed by GitHub
parent fb65dfeb11
commit ff116df015
10 changed files with 820 additions and 344 deletions
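
The gist of the change, per the description above, is that the share-partition code now resolves the partition's current leader epoch, attaches it to the persister read/write state calls, and retries the write once when the persister reports a leader-epoch failure. Below is a minimal, self-contained sketch of that pattern; all type and method names here (PersisterClient, writeState, WriteError, and so on) are illustrative stand-ins, not the actual Kafka classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.IntSupplier;

// Minimal sketch: attach the current leader epoch to persister writes and retry the
// write once on a leader-epoch failure. All types here are illustrative stand-ins.
final class EpochedPersisterWriter {

    enum WriteError { NONE, FENCED_LEADER_EPOCH, UNKNOWN }

    interface PersisterClient {
        // Hypothetical write-state call that carries the leader epoch alongside the batch.
        CompletableFuture<WriteError> writeState(String groupId, int partition, int leaderEpoch, String stateBatch);
    }

    private final PersisterClient persister;
    private final IntSupplier currentLeaderEpoch; // e.g. resolved from the partition leader

    EpochedPersisterWriter(PersisterClient persister, IntSupplier currentLeaderEpoch) {
        this.persister = persister;
        this.currentLeaderEpoch = currentLeaderEpoch;
    }

    CompletableFuture<WriteError> write(String groupId, int partition, String stateBatch) {
        // First attempt with the epoch known at call time.
        return persister.writeState(groupId, partition, currentLeaderEpoch.getAsInt(), stateBatch)
            .thenCompose(error -> {
                if (error != WriteError.FENCED_LEADER_EPOCH) {
                    return CompletableFuture.completedFuture(error);
                }
                // Single retry: re-resolve the (possibly bumped) epoch and try once more.
                return persister.writeState(groupId, partition, currentLeaderEpoch.getAsInt(), stateBatch);
            });
    }
}
```

The single retry presumably keeps the common case cheap while tolerating one epoch bump between resolving the epoch and the persister applying the write; a second failure is surfaced to the caller.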

File: DelayedShareFetch.java

@ -127,7 +127,7 @@ public class DelayedShareFetch extends DelayedOperation {
shareFetchData.future().complete(result);
} catch (Exception e) {
log.error("Error processing delayed share fetch request", e);
shareFetchData.future().completeExceptionally(e);
sharePartitionManager.handleFetchException(shareFetchData.groupId(), topicPartitionData.keySet(), shareFetchData.future(), e);
} finally {
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());

File: ShareFetchUtils.java

@ -16,9 +16,12 @@
*/
package kafka.server.share;
import kafka.cluster.Partition;
import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
@ -128,4 +131,13 @@ public class ShareFetchUtils {
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
}
static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
Partition partition = replicaManager.getPartitionOrException(tp);
if (!partition.isLeader()) {
log.debug("The broker is not the leader for topic partition: {}-{}", tp.topic(), tp.partition());
throw new NotLeaderOrFollowerException();
}
return partition.getLeaderEpoch();
}
}

File: SharePartition.java

@ -23,11 +23,13 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
@ -103,7 +105,11 @@ public class SharePartition {
/**
* The share partition failed to initialize with persisted state.
*/
FAILED
FAILED,
/**
* The share partition is fenced and cannot be used.
*/
FENCED
}
/**
@ -181,6 +187,11 @@ public class SharePartition {
*/
private final TopicIdPartition topicIdPartition;
/**
* The leader epoch is used to track the partition epoch.
*/
private final int leaderEpoch;
/**
* The in-flight record is used to track the state of a record that has been fetched from the
* leader. The state of the record is used to determine if the record should be re-fetched or if it
@ -280,6 +291,7 @@ public class SharePartition {
SharePartition(
String groupId,
TopicIdPartition topicIdPartition,
int leaderEpoch,
int maxInFlightMessages,
int maxDeliveryCount,
int defaultRecordLockDurationMs,
@ -288,9 +300,28 @@ public class SharePartition {
Persister persister,
ReplicaManager replicaManager,
GroupConfigManager groupConfigManager
) {
this(groupId, topicIdPartition, leaderEpoch, maxInFlightMessages, maxDeliveryCount, defaultRecordLockDurationMs,
timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY);
}
SharePartition(
String groupId,
TopicIdPartition topicIdPartition,
int leaderEpoch,
int maxInFlightMessages,
int maxDeliveryCount,
int defaultRecordLockDurationMs,
Timer timer,
Time time,
Persister persister,
ReplicaManager replicaManager,
GroupConfigManager groupConfigManager,
SharePartitionState sharePartitionState
) {
this.groupId = groupId;
this.topicIdPartition = topicIdPartition;
this.leaderEpoch = leaderEpoch;
this.maxInFlightMessages = maxInFlightMessages;
this.maxDeliveryCount = maxDeliveryCount;
this.cachedState = new ConcurrentSkipListMap<>();
@ -301,7 +332,7 @@ public class SharePartition {
this.timer = timer;
this.time = time;
this.persister = persister;
this.partitionState = SharePartitionState.EMPTY;
this.partitionState = sharePartitionState;
this.replicaManager = replicaManager;
this.groupConfigManager = groupConfigManager;
}
@ -341,7 +372,7 @@ public class SharePartition {
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
.setGroupId(this.groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(), 0)))))
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(), leaderEpoch)))))
.build())
.build()
).whenComplete((result, exception) -> {
@ -520,13 +551,14 @@ public class SharePartition {
* @param fetchPartitionData The fetched records for the share partition.
* @return The acquired records for the share partition.
*/
@SuppressWarnings("cyclomaticcomplexity") // Consider refactoring to avoid suppression
public ShareAcquiredRecords acquire(
String memberId,
int maxFetchRecords,
FetchPartitionData fetchPartitionData
) {
log.trace("Received acquire request for share partition: {}-{} memberId: {}", groupId, topicIdPartition, memberId);
if (maxFetchRecords <= 0) {
if (stateNotActive() || maxFetchRecords <= 0) {
// Nothing to acquire.
return ShareAcquiredRecords.empty();
}
@ -1040,7 +1072,7 @@ public class SharePartition {
* @return A boolean which indicates whether the fetch lock is acquired.
*/
boolean maybeAcquireFetchLock() {
if (partitionState() != SharePartitionState.ACTIVE) {
if (stateNotActive()) {
return false;
}
return fetchLock.compareAndSet(false, true);
@ -1053,6 +1085,22 @@ public class SharePartition {
fetchLock.set(false);
}
/**
* Marks the share partition as fenced.
*/
void markFenced() {
lock.writeLock().lock();
try {
partitionState = SharePartitionState.FENCED;
} finally {
lock.writeLock().unlock();
}
}
private boolean stateNotActive() {
return partitionState() != SharePartitionState.ACTIVE;
}
private void completeInitializationWithException(CompletableFuture<Void> future, Throwable exception) {
lock.writeLock().lock();
try {
@ -1075,6 +1123,9 @@ public class SharePartition {
case INITIALIZING:
future.completeExceptionally(new LeaderNotAvailableException(String.format("Share partition is already initializing %s-%s", groupId, topicIdPartition)));
return;
case FENCED:
future.completeExceptionally(new FencedStateEpochException(String.format("Share partition is fenced %s-%s", groupId, topicIdPartition)));
return;
case EMPTY:
// Do not complete the future as the share partition is not yet initialized.
break;
@ -1743,7 +1794,7 @@ public class SharePartition {
.setGroupId(this.groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches))))
topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches))))
).build()).build())
.whenComplete((result, exception) -> {
if (exception != null) {
@ -1792,8 +1843,9 @@ public class SharePartition {
case COORDINATOR_LOAD_IN_PROGRESS:
return new CoordinatorNotAvailableException(errorMessage);
case GROUP_ID_NOT_FOUND:
return new GroupIdNotFoundException(errorMessage);
case UNKNOWN_TOPIC_OR_PARTITION:
return new InvalidRequestException(errorMessage);
return new UnknownTopicOrPartitionException(errorMessage);
case FENCED_STATE_EPOCH:
return new FencedStateEpochException(errorMessage);
case FENCED_LEADER_EPOCH:
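
The hunk above introduces a FENCED state: markFenced() flips the state under the write lock, acquire() short-circuits via stateNotActive(), and initializing a fenced partition fails with FencedStateEpochException. The following is a simplified, self-contained stand-in for that state machine, not the real SharePartition (which tracks much more).

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified stand-in for the fencing behaviour shown above; not the real SharePartition.
final class FenceableSharePartition {

    enum State { EMPTY, INITIALIZING, ACTIVE, FAILED, FENCED }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private State state = State.EMPTY;

    void markActive() {
        lock.writeLock().lock();
        try {
            state = State.ACTIVE;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Mirrors markFenced(): flip the state under the write lock so concurrent acquires
    // observe the fenced state and stop handing out records.
    void markFenced() {
        lock.writeLock().lock();
        try {
            state = State.FENCED;
        } finally {
            lock.writeLock().unlock();
        }
    }

    private boolean stateNotActive() {
        lock.readLock().lock();
        try {
            return state != State.ACTIVE;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Mirrors acquire(): a fenced (or otherwise non-active) partition acquires nothing.
    List<String> acquire(List<String> fetchedRecords, int maxFetchRecords) {
        if (stateNotActive() || maxFetchRecords <= 0) {
            return Collections.emptyList();
        }
        return fetchedRecords.subList(0, Math.min(maxFetchRecords, fetchedRecords.size()));
    }
}
```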

File: SharePartitionManager.java

@ -23,8 +23,10 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.metrics.Metrics;
@ -69,6 +71,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import scala.jdk.javaapi.CollectionConverters;
@ -271,11 +274,13 @@ public class SharePartitionManager implements AutoCloseable {
this.shareGroupMetrics.shareAcknowledgement();
Map<TopicIdPartition, CompletableFuture<Errors>> futures = new HashMap<>();
acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> {
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition));
SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey);
if (sharePartition != null) {
CompletableFuture<Errors> future = new CompletableFuture<>();
sharePartition.acknowledge(memberId, acknowledgePartitionBatches).whenComplete((result, throwable) -> {
if (throwable != null) {
handleFencedSharePartitionException(sharePartitionKey, throwable);
future.complete(Errors.forException(throwable));
return;
}
@ -339,7 +344,8 @@ public class SharePartitionManager implements AutoCloseable {
Map<TopicIdPartition, CompletableFuture<Errors>> futuresMap = new HashMap<>();
topicIdPartitions.forEach(topicIdPartition -> {
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition));
SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey);
if (sharePartition == null) {
log.error("No share partition found for groupId {} topicPartition {} while releasing acquired topic partitions", groupId, topicIdPartition);
futuresMap.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
@ -347,6 +353,7 @@ public class SharePartitionManager implements AutoCloseable {
CompletableFuture<Errors> future = new CompletableFuture<>();
sharePartition.releaseAcquiredRecords(memberId).whenComplete((result, throwable) -> {
if (throwable != null) {
handleFencedSharePartitionException(sharePartitionKey, throwable);
future.complete(Errors.forException(throwable));
return;
}
@ -490,6 +497,30 @@ public class SharePartitionManager implements AutoCloseable {
}
}
/**
* The handleFetchException method is used to handle the exception that occurred while reading from the log.
* The method will handle the exception for each topic-partition in the request. The share partition
* might get removed from the cache.
* <p>
* The replica read request might error out for one share partition
* but as we cannot determine which share partition errored out, we might remove all the share partitions
* in the request.
*
* @param groupId The group id in the share fetch request.
* @param topicIdPartitions The topic-partitions in the replica read request.
* @param future The future to complete with the exception.
* @param throwable The exception that occurred while fetching messages.
*/
public void handleFetchException(
String groupId,
Set<TopicIdPartition> topicIdPartitions,
CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
Throwable throwable
) {
topicIdPartitions.forEach(topicIdPartition -> handleFencedSharePartitionException(sharePartitionKey(groupId, topicIdPartition), throwable));
maybeCompleteShareFetchWithException(future, topicIdPartitions, throwable);
}
/**
* The cachedTopicIdPartitionsInShareSession method is used to get the cached topic-partitions in the share session.
*
@ -540,57 +571,76 @@ public class SharePartitionManager implements AutoCloseable {
return;
}
try {
shareFetchData.partitionMaxBytes().keySet().forEach(topicIdPartition -> {
SharePartitionKey sharePartitionKey = sharePartitionKey(
shareFetchData.groupId(),
topicIdPartition
);
SharePartition sharePartition = getOrCreateSharePartition(sharePartitionKey);
// Initialize lazily, if required.
Map<TopicIdPartition, Throwable> erroneous = null;
Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
for (TopicIdPartition topicIdPartition : shareFetchData.partitionMaxBytes().keySet()) {
SharePartitionKey sharePartitionKey = sharePartitionKey(
shareFetchData.groupId(),
topicIdPartition
);
// The share partition is initialized asynchronously, so we need to wait for it to be initialized.
// But if the share partition is already initialized, then the future will be completed immediately.
// Hence, it's safe to call the maybeInitialize method and then wait for the future to be completed.
// TopicPartitionData list will be populated only if the share partition is already initialized.
sharePartition.maybeInitialize().whenComplete((result, throwable) -> {
if (throwable != null) {
maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), throwable);
}
});
});
Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
shareFetchData.partitionMaxBytes().keySet().forEach(
topicIdPartition -> {
// We add a key corresponding to each share partition in the request in the group so that when there are
// acknowledgements/acquisition lock timeout etc, we have a way to perform checkAndComplete for all
// such requests which are delayed because of lack of data to acquire for the share partition.
delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
// We add a key corresponding to each topic partition in the request so that when the HWM is updated
// for any topic partition, we have a way to perform checkAndComplete for all such requests which are
// delayed because of lack of data to acquire for the topic partition.
delayedShareFetchWatchKeys.add(new DelayedShareFetchPartitionKey(topicIdPartition.topicId(), topicIdPartition.partition()));
});
// Add the share fetch to the delayed share fetch purgatory to process the fetch request.
addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, this),
delayedShareFetchWatchKeys);
} catch (Exception e) {
// In case exception occurs then release the locks so queue can be further processed.
log.error("Error processing fetch queue for share partitions", e);
if (!shareFetchData.future().isDone()) {
shareFetchData.future().completeExceptionally(e);
SharePartition sharePartition;
try {
sharePartition = getOrCreateSharePartition(sharePartitionKey);
} catch (Exception e) {
// The exception can currently be thrown only if there is an error while initializing
// the share partition. Record the error for this partition, but continue processing the
// other share partitions in the request; the whole fetch request is completed with an
// exception below only if every partition errored out.
log.error("Error processing share fetch request", e);
if (erroneous == null) {
erroneous = new HashMap<>();
}
erroneous.put(topicIdPartition, e);
// Continue iteration for other partitions in the request.
continue;
}
// We add a key corresponding to each share partition in the request in the group so that when there are
// acknowledgements/acquisition lock timeout etc., we have a way to perform checkAndComplete for all
// such requests which are delayed because of lack of data to acquire for the share partition.
delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
// We add a key corresponding to each topic partition in the request so that when the HWM is updated
// for any topic partition, we have a way to perform checkAndComplete for all such requests which are
// delayed because of lack of data to acquire for the topic partition.
delayedShareFetchWatchKeys.add(new DelayedShareFetchPartitionKey(topicIdPartition.topicId(), topicIdPartition.partition()));
// The share partition is initialized asynchronously, so we need to wait for it to be initialized.
// But if the share partition is already initialized, then the future will be completed immediately.
// Hence, it's safe to call the maybeInitialize method and then wait for the future to be completed.
// TopicPartitionData list will be populated only if the share partition is already initialized.
sharePartition.maybeInitialize().whenComplete((result, throwable) -> {
if (throwable != null) {
// TODO: Complete error handling for initialization. We have to record the error
// for respective share partition as completing the full request might result in
// some acquired records to not being sent: https://issues.apache.org/jira/browse/KAFKA-17510
maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), throwable);
}
});
}
// If all the partitions in the request errored out, then complete the fetch request with an exception.
if (erroneous != null && erroneous.size() == shareFetchData.partitionMaxBytes().size()) {
completeShareFetchWithException(shareFetchData.future(), erroneous);
// Do not proceed with share fetch processing as all the partitions errored out.
return;
}
// TODO: If there exists some erroneous partitions then they will not be part of response.
// Add the share fetch to the delayed share fetch purgatory to process the fetch request.
addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, this), delayedShareFetchWatchKeys);
}
private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) {
return partitionCacheMap.computeIfAbsent(sharePartitionKey,
k -> {
long start = time.hiResClockMs();
int leaderEpoch = ShareFetchUtils.leaderEpoch(replicaManager, sharePartitionKey.topicIdPartition().topicPartition());
SharePartition partition = new SharePartition(
sharePartitionKey.groupId(),
sharePartitionKey.topicIdPartition(),
leaderEpoch,
maxInFlightMessages,
maxDeliveryCount,
defaultRecordLockDurationMs,
@ -617,22 +667,47 @@ public class SharePartitionManager implements AutoCloseable {
return;
}
if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) {
// Remove the partition from the cache as it's failed to initialize.
partitionCacheMap.remove(sharePartitionKey);
// The partition initialization failed, so complete the request with the exception.
// The server should not be in this state, so log the error on the broker and surface the
// same to the client; the root cause of the error should be investigated.
log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
maybeCompleteShareFetchWithException(future, Collections.singletonList(sharePartitionKey.topicIdPartition()), throwable);
}
private void handleFencedSharePartitionException(
SharePartitionKey sharePartitionKey,
Throwable throwable
) {
if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException ||
throwable instanceof GroupIdNotFoundException || throwable instanceof UnknownTopicOrPartitionException) {
log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage());
// The share partition is fenced, hence remove the partition from the map and let the client retry.
// But surface the error to the client so the client can take action, i.e. re-fetch
// the metadata and retry the fetch on the new leader.
partitionCacheMap.remove(sharePartitionKey);
future.completeExceptionally(throwable);
return;
SharePartition sharePartition = partitionCacheMap.remove(sharePartitionKey);
if (sharePartition != null) {
sharePartition.markFenced();
}
}
}
// The partition initialization failed, so complete the request with the exception.
// The server should not be in this state, so log the error on broker and surface the same
// to the client. As of now this state is irrecoverable for the broker, and we should
// investigate the root cause of the error.
log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
future.completeExceptionally(throwable);
private void maybeCompleteShareFetchWithException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
Collection<TopicIdPartition> topicIdPartitions, Throwable throwable) {
if (!future.isDone()) {
future.complete(topicIdPartitions.stream().collect(Collectors.toMap(
tp -> tp, tp -> new PartitionData().setErrorCode(Errors.forException(throwable).code()).setErrorMessage(throwable.getMessage()))));
}
}
private void completeShareFetchWithException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
Map<TopicIdPartition, Throwable> erroneous) {
future.complete(erroneous.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey, entry -> {
Throwable t = entry.getValue();
return new PartitionData().setErrorCode(Errors.forException(t).code()).setErrorMessage(t.getMessage());
})));
}
private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition topicIdPartition) {
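
Rather than failing the whole fetch future exceptionally, the manager now completes it with a per-partition error entry for each affected topic-partition (see maybeCompleteShareFetchWithException and completeShareFetchWithException above). Below is a hedged, stand-alone sketch of that completion shape; PartitionError is an illustrative stand-in for the response's PartitionData, not the Kafka type.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Stand-in types: complete a fetch future with the same error for every partition,
// but only if nothing has completed the future already.
final class FetchErrorCompleter {

    record PartitionError(short errorCode, String message) { }

    static <P> void maybeCompleteWithException(
            CompletableFuture<Map<P, PartitionError>> future,
            Collection<P> partitions,
            short errorCode,
            Throwable throwable
    ) {
        if (!future.isDone()) {
            future.complete(partitions.stream().collect(Collectors.toMap(
                p -> p,
                p -> new PartitionError(errorCode, throwable.getMessage()))));
        }
    }
}
```

Completing with per-partition errors instead of an exceptional future lets the response carry which partitions failed and why, which lines up with the refined error mapping in SharePartition above.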

File: KafkaApis.scala

@ -4249,7 +4249,8 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchMinBytes,
fetchMaxBytes,
FetchIsolation.HIGH_WATERMARK,
clientMetadata
clientMetadata,
true
)
// call the share partition manager to fetch messages from the local replica.

File: DelayedShareFetchTest.java

@ -76,9 +76,9 @@ public class DelayedShareFetchTest {
private static final int MAX_FETCH_RECORDS = 100;
private static final FetchParams FETCH_PARAMS = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty());
Optional.empty(), true);
private static Timer mockTimer;
private Timer mockTimer;
@BeforeEach
public void setUp() {

File: ShareFetchUtilsTest.java

@ -67,6 +67,10 @@ import static org.mockito.Mockito.when;
public class ShareFetchUtilsTest {
private static final FetchParams FETCH_PARAMS = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty(), true);
@Test
public void testProcessFetchResponse() {
String groupId = "grp";
@ -97,22 +101,20 @@ public class ShareFetchUtilsTest {
when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);
when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1);
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, memberId,
new CompletableFuture<>(), partitionMaxBytes, 100);
ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, memberId,
new CompletableFuture<>(), partitionMaxBytes, 100);
MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
new SimpleRecord("0".getBytes(), "v".getBytes()),
new SimpleRecord("1".getBytes(), "v".getBytes()),
new SimpleRecord("2".getBytes(), "v".getBytes()),
new SimpleRecord(null, "value".getBytes()));
new SimpleRecord("0".getBytes(), "v".getBytes()),
new SimpleRecord("1".getBytes(), "v".getBytes()),
new SimpleRecord("2".getBytes(), "v".getBytes()),
new SimpleRecord(null, "value".getBytes()));
MemoryRecords records1 = MemoryRecords.withRecords(100L, Compression.NONE,
new SimpleRecord("0".getBytes(), "v".getBytes()),
new SimpleRecord("1".getBytes(), "v".getBytes()),
new SimpleRecord("2".getBytes(), "v".getBytes()),
new SimpleRecord(null, "value".getBytes()));
new SimpleRecord("0".getBytes(), "v".getBytes()),
new SimpleRecord("1".getBytes(), "v".getBytes()),
new SimpleRecord("2".getBytes(), "v".getBytes()),
new SimpleRecord(null, "value".getBytes()));
Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
responseData.put(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L,
@ -165,20 +167,18 @@ public class ShareFetchUtilsTest {
when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);
when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1);
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, memberId,
new CompletableFuture<>(), partitionMaxBytes, 100);
ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, memberId,
new CompletableFuture<>(), partitionMaxBytes, 100);
Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
responseData.put(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L,
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
responseData.put(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L,
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData =
ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitionManager, mock(ReplicaManager.class));
ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitionManager, mock(ReplicaManager.class));
assertEquals(2, resultData.size());
assertTrue(resultData.containsKey(tp0));
@ -209,10 +209,8 @@ public class ShareFetchUtilsTest {
when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);
when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1);
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()),
groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes, 100);
ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, 100);
ReplicaManager replicaManager = mock(ReplicaManager.class);
@ -222,6 +220,7 @@ public class ShareFetchUtilsTest {
when(sp0.nextFetchOffset()).thenReturn((long) 0, (long) 5);
when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 4);
when(sp0.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.empty(),
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
@ -235,20 +234,20 @@ public class ShareFetchUtilsTest {
doNothing().when(sp0).updateCacheAndOffsets(any(Long.class));
MemoryRecords records1 = MemoryRecords.withRecords(Compression.NONE,
new SimpleRecord("0".getBytes(), "v".getBytes()),
new SimpleRecord("1".getBytes(), "v".getBytes()),
new SimpleRecord("2".getBytes(), "v".getBytes()),
new SimpleRecord(null, "value".getBytes()));
new SimpleRecord("0".getBytes(), "v".getBytes()),
new SimpleRecord("1".getBytes(), "v".getBytes()),
new SimpleRecord("2".getBytes(), "v".getBytes()),
new SimpleRecord(null, "value".getBytes()));
Map<TopicIdPartition, FetchPartitionData> responseData1 = new HashMap<>();
responseData1.put(tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L,
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
responseData1.put(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L,
records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData1 =
ShareFetchUtils.processFetchResponse(shareFetchData, responseData1, sharePartitionManager, replicaManager);
ShareFetchUtils.processFetchResponse(shareFetchData, responseData1, sharePartitionManager, replicaManager);
assertEquals(2, resultData1.size());
assertTrue(resultData1.containsKey(tp0));
@ -264,20 +263,20 @@ public class ShareFetchUtilsTest {
Mockito.verify(sp1, times(0)).updateCacheAndOffsets(any(Long.class));
MemoryRecords records2 = MemoryRecords.withRecords(100L, Compression.NONE,
new SimpleRecord("0".getBytes(), "v".getBytes()),
new SimpleRecord("1".getBytes(), "v".getBytes()),
new SimpleRecord("2".getBytes(), "v".getBytes()),
new SimpleRecord(null, "value".getBytes()));
new SimpleRecord("0".getBytes(), "v".getBytes()),
new SimpleRecord("1".getBytes(), "v".getBytes()),
new SimpleRecord("2".getBytes(), "v".getBytes()),
new SimpleRecord(null, "value".getBytes()));
Map<TopicIdPartition, FetchPartitionData> responseData2 = new HashMap<>();
responseData2.put(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L,
records2, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
records2, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
responseData2.put(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L,
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData2 =
ShareFetchUtils.processFetchResponse(shareFetchData, responseData2, sharePartitionManager, replicaManager);
ShareFetchUtils.processFetchResponse(shareFetchData, responseData2, sharePartitionManager, replicaManager);
assertEquals(2, resultData2.size());
assertTrue(resultData2.containsKey(tp0));
@ -304,10 +303,8 @@ public class ShareFetchUtilsTest {
SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class);
when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);
ShareFetchData shareFetchData = new ShareFetchData(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()),
groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes, 100);
ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, 100);
ReplicaManager replicaManager = mock(ReplicaManager.class);

File: SharePartitionManagerTest.java

@ -16,6 +16,7 @@
*/
package kafka.server.share;
import kafka.cluster.Partition;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
@ -31,11 +32,13 @@ import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidShareSessionEpochException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@ -56,6 +59,7 @@ import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
import org.apache.kafka.server.share.context.ShareFetchContext;
import org.apache.kafka.server.share.context.ShareSessionContext;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetchData;
import org.apache.kafka.server.share.persister.NoOpShareStatePersister;
import org.apache.kafka.server.share.persister.Persister;
@ -102,7 +106,6 @@ import scala.Tuple2;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@ -113,9 +116,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@ -133,12 +139,13 @@ public class SharePartitionManagerTest {
private static final int DELAYED_SHARE_FETCH_TIMEOUT_MS = 3000;
private static final FetchParams FETCH_PARAMS = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty());
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true);
static final int PARTITION_MAX_BYTES = 40000;
static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
private static Timer mockTimer;
private Timer mockTimer;
private ReplicaManager mockReplicaManager;
private static final List<TopicIdPartition> EMPTY_PART_LIST = Collections.unmodifiableList(new ArrayList<>());
@ -146,6 +153,9 @@ public class SharePartitionManagerTest {
public void setUp() {
mockTimer = new SystemTimerReaper("sharePartitionManagerTestReaper",
new SystemTimer("sharePartitionManagerTestTimer"));
mockReplicaManager = mock(ReplicaManager.class);
Partition partition = mockPartition();
when(mockReplicaManager.getPartitionOrException(Mockito.any())).thenReturn(partition);
}
@AfterEach
@ -1026,34 +1036,33 @@ public class SharePartitionManagerTest {
partitionMaxBytes.put(tp5, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp6, PARTITION_MAX_BYTES);
ReplicaManager replicaManager = mock(ReplicaManager.class);
Time time = mock(Time.class);
when(time.hiResClockMs()).thenReturn(0L).thenReturn(100L);
Metrics metrics = new Metrics();
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(replicaManager)
.withReplicaManager(mockReplicaManager)
.withTime(time)
.withMetrics(metrics)
.withTimer(mockTimer)
.build();
doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, partitionMaxBytes);
Mockito.verify(replicaManager, times(1)).readFromLog(
Mockito.verify(mockReplicaManager, times(1)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, partitionMaxBytes);
Mockito.verify(replicaManager, times(2)).readFromLog(
Mockito.verify(mockReplicaManager, times(2)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, partitionMaxBytes);
Mockito.verify(replicaManager, times(3)).readFromLog(
Mockito.verify(mockReplicaManager, times(3)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
Map<MetricName, Consumer<Double>> expectedMetrics = new HashMap<>();
@ -1089,15 +1098,14 @@ public class SharePartitionManagerTest {
partitionMaxBytes.put(tp3, PARTITION_MAX_BYTES);
final Time time = new MockTime(0, System.currentTimeMillis(), 0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withTime(time)
.withReplicaManager(replicaManager)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.build();
@ -1141,7 +1149,7 @@ public class SharePartitionManagerTest {
assertEquals(26, sp2.nextFetchOffset());
assertEquals(16, sp3.nextFetchOffset());
return buildLogReadResult(partitionMaxBytes.keySet());
}).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
}).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
@ -1160,9 +1168,9 @@ public class SharePartitionManagerTest {
executorService.shutdown();
}
// We are checking the number of replicaManager readFromLog() calls
Mockito.verify(replicaManager, atMost(100)).readFromLog(
Mockito.verify(mockReplicaManager, atMost(100)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
Mockito.verify(replicaManager, atLeast(10)).readFromLog(
Mockito.verify(mockReplicaManager, atLeast(10)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
}
@ -1175,8 +1183,6 @@ public class SharePartitionManagerTest {
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
ReplicaManager replicaManager = mock(ReplicaManager.class);
SharePartition sp0 = mock(SharePartition.class);
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(false);
@ -1185,19 +1191,19 @@ public class SharePartitionManagerTest {
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(replicaManager)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.build();
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
Mockito.verify(replicaManager, times(0)).readFromLog(
Mockito.verify(mockReplicaManager, times(0)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result = future.join();
assertEquals(0, result.size());
@ -1209,27 +1215,24 @@ public class SharePartitionManagerTest {
Uuid memberId = Uuid.randomUuid();
Uuid fooId = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
ReplicaManager replicaManager = mock(ReplicaManager.class);
Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(replicaManager)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.build();
doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
// Since the nextFetchOffset does not point to endOffset + 1, i.e. some of the records in the cachedState are AVAILABLE,
// even though the maxInFlightMessages limit is exceeded, replicaManager.readFromLog should be called
Mockito.verify(replicaManager, times(1)).readFromLog(
Mockito.verify(mockReplicaManager, times(1)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
}
@ -1643,12 +1646,11 @@ public class SharePartitionManagerTest {
new CompletableFuture<>(),
partitionMaxBytes,
100);
ReplicaManager replicaManager = mock(ReplicaManager.class);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
// Initially you cannot acquire records for both sp1 and sp2.
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
@ -1661,13 +1663,13 @@ public class SharePartitionManagerTest {
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(replicaManager)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.build();
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withReplicaManager(mockReplicaManager)
.withSharePartitionManager(sharePartitionManager)
.build();
@ -1677,7 +1679,7 @@ public class SharePartitionManagerTest {
// Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys.
assertEquals(2, delayedShareFetchPurgatory.watched());
doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();
acknowledgeTopics.put(tp1, Arrays.asList(
@ -1739,12 +1741,11 @@ public class SharePartitionManagerTest {
new CompletableFuture<>(),
partitionMaxBytes,
100);
ReplicaManager replicaManager = mock(ReplicaManager.class);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
// Initially you cannot acquire records for any of the 3 share partitions.
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
@ -1759,13 +1760,13 @@ public class SharePartitionManagerTest {
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(replicaManager)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.build();
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withReplicaManager(mockReplicaManager)
.withSharePartitionManager(sharePartitionManager)
.build();
@ -1832,12 +1833,11 @@ public class SharePartitionManagerTest {
new CompletableFuture<>(),
partitionMaxBytes,
100);
ReplicaManager replicaManager = mock(ReplicaManager.class);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
// Initially you cannot acquire records for both sp1 and sp2.
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
@ -1851,13 +1851,13 @@ public class SharePartitionManagerTest {
SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withCache(cache)
.withReplicaManager(replicaManager)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.build());
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withReplicaManager(mockReplicaManager)
.withSharePartitionManager(sharePartitionManager)
.build();
@ -1867,7 +1867,7 @@ public class SharePartitionManagerTest {
// Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys.
assertEquals(2, delayedShareFetchPurgatory.watched());
doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
assertEquals(2, delayedShareFetchPurgatory.watched());
@ -1932,12 +1932,11 @@ public class SharePartitionManagerTest {
new CompletableFuture<>(),
partitionMaxBytes,
100);
ReplicaManager replicaManager = mock(ReplicaManager.class);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
// Initially you cannot acquire records for any of the 3 share partitions.
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
@ -1953,13 +1952,13 @@ public class SharePartitionManagerTest {
SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withCache(cache)
.withReplicaManager(replicaManager)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.build());
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetchData)
.withReplicaManager(replicaManager)
.withReplicaManager(mockReplicaManager)
.withSharePartitionManager(sharePartitionManager)
.build();
@ -2000,16 +1999,13 @@ public class SharePartitionManagerTest {
CompletableFuture<Void> pendingInitializationFuture = new CompletableFuture<>();
when(sp0.maybeInitialize()).thenReturn(pendingInitializationFuture);
// Mock replica manager to verify no calls are made to fetchMessages.
ReplicaManager replicaManager = mock(ReplicaManager.class);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).withTimer(mockTimer)
.withPartitionCacheMap(partitionCacheMap).withReplicaManager(mockReplicaManager).withTimer(mockTimer)
.build();
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
@ -2021,7 +2017,7 @@ public class SharePartitionManagerTest {
() -> "Processing in delayed share fetch queue never ended.");
assertTrue(future.join().isEmpty());
// Verify that replica manager fetch is not called.
Mockito.verify(replicaManager, times(0)).readFromLog(
Mockito.verify(mockReplicaManager, times(0)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
// Complete the pending initialization future.
pendingInitializationFuture.complete(null);
@ -2039,14 +2035,13 @@ public class SharePartitionManagerTest {
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
ReplicaManager replicaManager = mock(ReplicaManager.class);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).withTimer(mockTimer)
.withPartitionCacheMap(partitionCacheMap).withReplicaManager(mockReplicaManager).withTimer(mockTimer)
.build();
// Return LeaderNotAvailableException to simulate initialization failure.
@ -2061,6 +2056,8 @@ public class SharePartitionManagerTest {
// between SharePartitionManager and SharePartition to retry the request as SharePartition is not yet ready.
assertFalse(future.isCompletedExceptionally());
assertTrue(future.join().isEmpty());
// Verify that the share partition is still in the cache on LeaderNotAvailableException.
assertEquals(1, partitionCacheMap.size());
// Return IllegalStateException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new IllegalStateException("Illegal state")));
@ -2069,9 +2066,11 @@ public class SharePartitionManagerTest {
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing in delayed share fetch queue never ended.");
assertTrue(future.isCompletedExceptionally());
assertFutureThrows(future, IllegalStateException.class);
validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Illegal state");
assertTrue(partitionCacheMap.isEmpty());
// The last exception removes the share partition from the cache hence re-add the share partition to cache.
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
// Return CoordinatorNotAvailableException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new CoordinatorNotAvailableException("Coordinator not available")));
future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
@ -2079,9 +2078,11 @@ public class SharePartitionManagerTest {
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing in delayed share fetch queue never ended.");
assertTrue(future.isCompletedExceptionally());
assertFutureThrows(future, CoordinatorNotAvailableException.class);
validateShareFetchFutureException(future, tp0, Errors.COORDINATOR_NOT_AVAILABLE, "Coordinator not available");
assertTrue(partitionCacheMap.isEmpty());
// The last exception removes the share partition from the cache hence re-add the share partition to cache.
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
// Return InvalidRequestException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new InvalidRequestException("Invalid request")));
future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
@ -2089,21 +2090,19 @@ public class SharePartitionManagerTest {
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing in delayed share fetch queue never ended.");
assertTrue(future.isCompletedExceptionally());
assertFutureThrows(future, InvalidRequestException.class);
validateShareFetchFutureException(future, tp0, Errors.INVALID_REQUEST, "Invalid request");
assertTrue(partitionCacheMap.isEmpty());
// The last exception removes the share partition from the cache hence re-add the share partition to cache.
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
// Return FencedStateEpochException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new FencedStateEpochException("Fenced state epoch")));
// Assert that partitionCacheMap contains instance before the fetch request.
assertEquals(1, partitionCacheMap.size());
future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing in delayed share fetch queue never ended.");
assertTrue(future.isCompletedExceptionally());
assertFutureThrows(future, FencedStateEpochException.class);
// Verify that the share partition is removed from the cache.
validateShareFetchFutureException(future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced state epoch");
assertTrue(partitionCacheMap.isEmpty());
// The last exception removes the share partition from the cache hence re-add the share partition to cache.
@ -2115,9 +2114,7 @@ public class SharePartitionManagerTest {
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing in delayed share fetch queue never ended.");
assertTrue(future.isCompletedExceptionally());
assertFutureThrows(future, NotLeaderOrFollowerException.class);
// Verify that the share partition is removed from the cache.
validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Not leader or follower");
assertTrue(partitionCacheMap.isEmpty());
// The last exception removes the share partition from the cache hence re-add the share partition to cache.
@ -2129,10 +2126,11 @@ public class SharePartitionManagerTest {
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing in delayed share fetch queue never ended.");
assertTrue(future.isCompletedExceptionally());
assertFutureThrows(future, RuntimeException.class);
validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Runtime exception");
assertTrue(partitionCacheMap.isEmpty());
}
@Test
@SuppressWarnings("unchecked")
public void testShareFetchProcessingExceptions() throws Exception {
@ -2140,12 +2138,10 @@ public class SharePartitionManagerTest {
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
SharePartition sp0 = mock(SharePartition.class);
Map<SharePartitionKey, SharePartition> partitionCacheMap = (Map<SharePartitionKey, SharePartition>) mock(Map.class);
// Throw the exception for first fetch request. Return share partition for next.
when(partitionCacheMap.computeIfAbsent(any(), any()))
.thenThrow(new RuntimeException("Error creating instance"))
.thenReturn(sp0);
.thenThrow(new RuntimeException("Error creating instance"));
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
@ -2157,19 +2153,204 @@ public class SharePartitionManagerTest {
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing for delayed share fetch request not finished.");
assertTrue(future.isCompletedExceptionally());
assertFutureThrows(future, RuntimeException.class, "Error creating instance");
validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Error creating instance");
}
// Throw exception from share partition for second fetch request.
when(sp0.maybeInitialize()).thenThrow(new RuntimeException("Error initializing instance"));
@Test
public void testSharePartitionInitializationFailure() throws Exception {
String groupId = "grp";
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
// Pass a real map to verify that no share partition is created.
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
// Validate when partition is not the leader.
Partition partition = mock(Partition.class);
when(partition.isLeader()).thenReturn(false);
ReplicaManager replicaManager = mock(ReplicaManager.class);
// First check should throw KafkaStorageException, second check should return a partition which
// is not the leader.
when(replicaManager.getPartitionOrException(any()))
.thenThrow(new KafkaStorageException("Exception"))
.thenReturn(partition);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();
// Validate when exception is thrown.
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing for delayed share fetch request not finished.");
validateShareFetchFutureException(future, tp0, Errors.KAFKA_STORAGE_ERROR, "Exception");
assertTrue(partitionCacheMap.isEmpty());
// Validate when the partition is not the leader.
future = sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing for delayed share fetch request not finished.");
assertTrue(future.isCompletedExceptionally());
assertFutureThrows(future, RuntimeException.class, "Error initializing instance");
validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER);
assertTrue(partitionCacheMap.isEmpty());
}
@Test
public void testSharePartitionPartialInitializationFailure() throws Exception {
String groupId = "grp";
Uuid memberId1 = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(memberId1, new TopicPartition("foo", 1));
Map<TopicIdPartition, Integer> partitionMaxBytes = Map.of(tp0, PARTITION_MAX_BYTES, tp1, PARTITION_MAX_BYTES);
// Mark partition1 as not the leader.
Partition partition1 = mock(Partition.class);
when(partition1.isLeader()).thenReturn(false);
ReplicaManager replicaManager = mock(ReplicaManager.class);
when(replicaManager.getPartitionOrException(any()))
.thenReturn(partition1);
SharePartition sp1 = mock(SharePartition.class);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(true);
when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
when(sp1.acquire(anyString(), anyInt(), any())).thenReturn(new ShareAcquiredRecords(Collections.emptyList(), 0));
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.build();
// Validate when exception is thrown.
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
assertTrue(future.isDone());
assertFalse(future.isCompletedExceptionally());
Map<TopicIdPartition, PartitionData> partitionDataMap = future.get();
// For now, only 1 successful partition is included; this will be fixed in subsequent PRs.
assertEquals(1, partitionDataMap.size());
assertTrue(partitionDataMap.containsKey(tp1));
assertEquals(Errors.NONE.code(), partitionDataMap.get(tp1).errorCode());
Mockito.verify(replicaManager, times(1)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
}
@Test
public void testReplicaManagerFetchException() {
String groupId = "grp";
Uuid memberId = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
SharePartition sp0 = mock(SharePartition.class);
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
doThrow(new RuntimeException("Exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.build();
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Exception");
// Verify that the share partition is still in the cache on exception.
assertEquals(1, partitionCacheMap.size());
// Throw NotLeaderOrFollowerException from replica manager fetch which should evict the instance from the cache.
doThrow(new NotLeaderOrFollowerException("Leader exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception");
assertTrue(partitionCacheMap.isEmpty());
}
@Test
public void testReplicaManagerFetchMultipleSharePartitionsException() {
String groupId = "grp";
Uuid memberId = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
SharePartition sp0 = mock(SharePartition.class);
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
SharePartition sp1 = mock(SharePartition.class);
// Do not make the share partition acquirable, hence it shouldn't be removed from the cache,
// as it won't be part of the replica manager readFromLog request.
when(sp1.maybeAcquireFetchLock()).thenReturn(false);
when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
// Throw FencedStateEpochException from replica manager fetch which should evict the instance from the cache.
doThrow(new FencedStateEpochException("Fenced exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.build();
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
validateShareFetchFutureException(future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced exception");
// Verify that tp1 is still in the cache on exception.
assertEquals(1, partitionCacheMap.size());
assertEquals(sp1, partitionCacheMap.get(new SharePartitionKey(groupId, tp1)));
// Make sp1 acquirable and add sp0 back into the partition cache. Both share partitions should be
// removed from the cache.
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(true);
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
// Throw FencedStateEpochException from replica manager fetch which should evict the instance from the cache.
doThrow(new FencedStateEpochException("Fenced exception again")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
validateShareFetchFutureException(future, List.of(tp0, tp1), Errors.FENCED_STATE_EPOCH, "Fenced exception again");
assertTrue(partitionCacheMap.isEmpty());
}
private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() {
@ -2220,7 +2401,7 @@ public class SharePartitionManagerTest {
private void assertErroneousAndValidTopicIdPartitions(
ErroneousAndValidPartitionData erroneousAndValidPartitionData,
List<TopicIdPartition> expectedErroneous, List<TopicIdPartition> expectedValid) {
List<TopicIdPartition> expectedErroneous, List<TopicIdPartition> expectedValid) {
Set<TopicIdPartition> expectedErroneousSet = new HashSet<>(expectedErroneous);
Set<TopicIdPartition> expectedValidSet = new HashSet<>(expectedValid);
Set<TopicIdPartition> actualErroneousPartitions = new HashSet<>();
@ -2233,6 +2414,37 @@ public class SharePartitionManagerTest {
assertEquals(expectedValidSet, actualValidPartitions);
}
private Partition mockPartition() {
Partition partition = mock(Partition.class);
when(partition.isLeader()).thenReturn(true);
when(partition.getLeaderEpoch()).thenReturn(1);
return partition;
}
private void validateShareFetchFutureException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
TopicIdPartition topicIdPartition, Errors error) {
validateShareFetchFutureException(future, Collections.singletonList(topicIdPartition), error, null);
}
private void validateShareFetchFutureException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
TopicIdPartition topicIdPartition, Errors error, String message) {
validateShareFetchFutureException(future, Collections.singletonList(topicIdPartition), error, message);
}
private void validateShareFetchFutureException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
List<TopicIdPartition> topicIdPartitions, Errors error, String message) {
assertFalse(future.isCompletedExceptionally());
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result = future.join();
assertEquals(topicIdPartitions.size(), result.size());
topicIdPartitions.forEach(topicIdPartition -> {
assertTrue(result.containsKey(topicIdPartition));
assertEquals(topicIdPartition.partition(), result.get(topicIdPartition).partitionIndex());
assertEquals(error.code(), result.get(topicIdPartition).errorCode());
assertEquals(message, result.get(topicIdPartition).errorMessage());
});
}
static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) {
List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>();
topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
File diff suppressed because it is too large

View File
@ -33,6 +33,7 @@ public class FetchParams {
public final int maxBytes;
public final FetchIsolation isolation;
public final Optional<ClientMetadata> clientMetadata;
public final boolean shareFetchRequest;
public FetchParams(short requestVersion,
int replicaId,
@ -42,6 +43,18 @@ public class FetchParams {
int maxBytes,
FetchIsolation isolation,
Optional<ClientMetadata> clientMetadata) {
this(requestVersion, replicaId, replicaEpoch, maxWaitMs, minBytes, maxBytes, isolation, clientMetadata, false);
}
public FetchParams(short requestVersion,
int replicaId,
long replicaEpoch,
long maxWaitMs,
int minBytes,
int maxBytes,
FetchIsolation isolation,
Optional<ClientMetadata> clientMetadata,
boolean shareFetchRequest) {
Objects.requireNonNull(isolation);
Objects.requireNonNull(clientMetadata);
this.requestVersion = requestVersion;
@ -52,6 +65,7 @@ public class FetchParams {
this.maxBytes = maxBytes;
this.isolation = isolation;
this.clientMetadata = clientMetadata;
this.shareFetchRequest = shareFetchRequest;
}
public boolean isFromFollower() {
@ -67,7 +81,7 @@ public class FetchParams {
}
public boolean fetchOnlyLeader() {
return isFromFollower() || (isFromConsumer() && !clientMetadata.isPresent());
return isFromFollower() || (isFromConsumer() && !clientMetadata.isPresent()) || shareFetchRequest;
}
public boolean hardMaxBytesLimit() {
@ -86,7 +100,8 @@ public class FetchParams {
&& minBytes == that.minBytes
&& maxBytes == that.maxBytes
&& isolation.equals(that.isolation)
&& clientMetadata.equals(that.clientMetadata);
&& clientMetadata.equals(that.clientMetadata)
&& shareFetchRequest == that.shareFetchRequest;
}
@Override
@ -99,6 +114,7 @@ public class FetchParams {
result = 31 * result + maxBytes;
result = 31 * result + isolation.hashCode();
result = 31 * result + clientMetadata.hashCode();
result = 31 * result + Boolean.hashCode(shareFetchRequest);
return result;
}
@ -113,6 +129,7 @@ public class FetchParams {
", maxBytes=" + maxBytes +
", isolation=" + isolation +
", clientMetadata=" + clientMetadata +
", shareFetchRequest=" + shareFetchRequest +
')';
}
}
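
For orientation only, outside the diff above: a minimal sketch of how a share fetch could build the extended FetchParams so that fetchOnlyLeader() is satisfied. The constructor argument order follows the diff; the package imports and the concrete values are assumptions for illustration, not taken from this commit.

import java.util.Optional;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.FetchParams;

public class ShareFetchParamsSketch {
    public static void main(String[] args) {
        // Illustrative values; a share fetch is a consumer-style fetch (replicaId < 0)
        // that sets the new shareFetchRequest flag.
        FetchParams params = new FetchParams(
            (short) 17,                    // requestVersion (illustrative)
            -1,                            // replicaId: not a follower
            -1L,                           // replicaEpoch
            500L,                          // maxWaitMs
            1,                             // minBytes
            1024 * 1024,                   // maxBytes
            FetchIsolation.HIGH_WATERMARK, // isolation
            Optional.empty(),              // no client metadata
            true                           // shareFetchRequest
        );
        // shareFetchRequest=true forces fetchOnlyLeader() to true even without client metadata,
        // so share fetches are always served by the partition leader.
        System.out.println(params.fetchOnlyLeader()); // prints: true
    }
}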