KAFKA-19291: Increase the timeout of remote storage share fetch requests in purgatory (#19757)
CI / build (push) Waiting to run Details

### About
Consumer groups have a different timeout `REMOTE_FETCH_MAX_WAIT_MS_PROP`
in delayed fetch purgatory for fetch requests having remote storage
fetch ([code
link](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1669)).
This is done before the request enters the purgatory, so it's easy to
change. At the moment share groups can only have a `waitTimeMs` of
`shareFetch.fetchParams().maxWaitMs` (default value `500ms`) for delayed
share fetch purgatory regardless of whether they are remote
storage/local log fetch.
This PR introduces a way to increase the timeout of remote storage fetch
requests: if a remote storage fetch request couldn't complete within
`shareFetch.fetchParams().maxWaitMs`, we create a timer task which
can be interrupted whenever `pendingFetches` is finished. The change has
been done to avoid the expiration of remote storage share fetch
requests.

### Testing
The code has been tested with the help of unit tests and
`LocalTieredStorage.java`.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Abhinav Dixit 2025-05-22 12:11:33 +05:30 committed by GitHub
parent e88c10d595
commit 239dce3e04
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 274 additions and 7 deletions

View File

@ -40,6 +40,7 @@ import org.apache.kafka.server.share.fetch.ShareFetchPartitionData;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics; import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData; import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot; import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
@ -64,6 +65,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -107,6 +109,8 @@ public class DelayedShareFetch extends DelayedOperation {
private LinkedHashMap<TopicIdPartition, LogReadResult> localPartitionsAlreadyFetched; private LinkedHashMap<TopicIdPartition, LogReadResult> localPartitionsAlreadyFetched;
private Optional<PendingRemoteFetches> pendingRemoteFetchesOpt; private Optional<PendingRemoteFetches> pendingRemoteFetchesOpt;
private Optional<Exception> remoteStorageFetchException; private Optional<Exception> remoteStorageFetchException;
private final AtomicBoolean outsidePurgatoryCallbackLock;
private final long remoteFetchMaxWaitMs;
/** /**
* This function constructs an instance of delayed share fetch operation for completing share fetch * This function constructs an instance of delayed share fetch operation for completing share fetch
@ -118,6 +122,7 @@ public class DelayedShareFetch extends DelayedOperation {
* @param sharePartitions The share partitions referenced in the share fetch request. * @param sharePartitions The share partitions referenced in the share fetch request.
* @param shareGroupMetrics The share group metrics to record the metrics. * @param shareGroupMetrics The share group metrics to record the metrics.
* @param time The system time. * @param time The system time.
* @param remoteFetchMaxWaitMs The max wait time for a share fetch request having remote storage fetch.
*/ */
public DelayedShareFetch( public DelayedShareFetch(
ShareFetch shareFetch, ShareFetch shareFetch,
@ -125,7 +130,8 @@ public class DelayedShareFetch extends DelayedOperation {
BiConsumer<SharePartitionKey, Throwable> exceptionHandler, BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions, LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
ShareGroupMetrics shareGroupMetrics, ShareGroupMetrics shareGroupMetrics,
Time time Time time,
long remoteFetchMaxWaitMs
) { ) {
this(shareFetch, this(shareFetch,
replicaManager, replicaManager,
@ -135,7 +141,8 @@ public class DelayedShareFetch extends DelayedOperation {
shareGroupMetrics, shareGroupMetrics,
time, time,
Optional.empty(), Optional.empty(),
Uuid.randomUuid() Uuid.randomUuid(),
remoteFetchMaxWaitMs
); );
} }
@ -151,6 +158,7 @@ public class DelayedShareFetch extends DelayedOperation {
* @param shareGroupMetrics The share group metrics to record the metrics. * @param shareGroupMetrics The share group metrics to record the metrics.
* @param time The system time. * @param time The system time.
* @param pendingRemoteFetchesOpt Optional containing an in-flight remote fetch object or an empty optional. * @param pendingRemoteFetchesOpt Optional containing an in-flight remote fetch object or an empty optional.
* @param remoteFetchMaxWaitMs The max wait time for a share fetch request having remote storage fetch.
*/ */
DelayedShareFetch( DelayedShareFetch(
ShareFetch shareFetch, ShareFetch shareFetch,
@ -161,7 +169,8 @@ public class DelayedShareFetch extends DelayedOperation {
ShareGroupMetrics shareGroupMetrics, ShareGroupMetrics shareGroupMetrics,
Time time, Time time,
Optional<PendingRemoteFetches> pendingRemoteFetchesOpt, Optional<PendingRemoteFetches> pendingRemoteFetchesOpt,
Uuid fetchId Uuid fetchId,
long remoteFetchMaxWaitMs
) { ) {
super(shareFetch.fetchParams().maxWaitMs, Optional.empty()); super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
this.shareFetch = shareFetch; this.shareFetch = shareFetch;
@ -177,6 +186,8 @@ public class DelayedShareFetch extends DelayedOperation {
this.pendingRemoteFetchesOpt = pendingRemoteFetchesOpt; this.pendingRemoteFetchesOpt = pendingRemoteFetchesOpt;
this.remoteStorageFetchException = Optional.empty(); this.remoteStorageFetchException = Optional.empty();
this.fetchId = fetchId; this.fetchId = fetchId;
this.outsidePurgatoryCallbackLock = new AtomicBoolean(false);
this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
// Register metrics for DelayedShareFetch. // Register metrics for DelayedShareFetch.
KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedShareFetchMetrics"); KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedShareFetchMetrics");
this.expiredRequestMeter = metricsGroup.newMeter(EXPIRES_PER_SEC, "requests", TimeUnit.SECONDS); this.expiredRequestMeter = metricsGroup.newMeter(EXPIRES_PER_SEC, "requests", TimeUnit.SECONDS);
@ -205,6 +216,12 @@ public class DelayedShareFetch extends DelayedOperation {
if (remoteStorageFetchException.isPresent()) { if (remoteStorageFetchException.isPresent()) {
completeErroneousRemoteShareFetchRequest(); completeErroneousRemoteShareFetchRequest();
} else if (pendingRemoteFetchesOpt.isPresent()) { } else if (pendingRemoteFetchesOpt.isPresent()) {
if (maybeRegisterCallbackPendingRemoteFetch()) {
log.trace("Registered remote storage fetch callback for group {}, member {}, "
+ "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
partitionsAcquired.keySet());
return;
}
completeRemoteStorageShareFetchRequest(); completeRemoteStorageShareFetchRequest();
} else { } else {
completeLocalLogShareFetchRequest(); completeLocalLogShareFetchRequest();
@ -626,6 +643,16 @@ public class DelayedShareFetch extends DelayedOperation {
return pendingRemoteFetchesOpt.orElse(null); return pendingRemoteFetchesOpt.orElse(null);
} }
/**
 * Visible for testing.
 *
 * @return the current value of the flag that is flipped to {@code true} by the first caller that
 *         completes the remote storage share fetch request outside the purgatory.
 */
boolean outsidePurgatoryCallbackLock() {
    final boolean alreadyClaimed = outsidePurgatoryCallbackLock.get();
    return alreadyClaimed;
}
// Only used for testing purpose.
// Replaces the partitionsAcquired map of this operation with the supplied one, which lets a test
// inject the acquired partitions (and their offsets) directly instead of going through the acquire path.
void updatePartitionsAcquired(LinkedHashMap<TopicIdPartition, Long> partitionsAcquired) {
this.partitionsAcquired = partitionsAcquired;
}
// Visible for testing. // Visible for testing.
Meter expiredRequestMeter() { Meter expiredRequestMeter() {
return expiredRequestMeter; return expiredRequestMeter;
@ -666,6 +693,28 @@ public class DelayedShareFetch extends DelayedOperation {
return maybeCompletePendingRemoteFetch(); return maybeCompletePendingRemoteFetch();
} }
/**
 * Defers completion of a pending remote storage fetch beyond the share fetch's own max wait time.
 * <p>
 * When the pending remote fetches are not done yet and the share fetch max wait time is smaller than
 * {@code remoteFetchMaxWaitMs}, a {@link PendingRemoteFetchTimerTask} is scheduled and a completion
 * callback is registered on the pending remote fetches. Whichever of the two fires first completes
 * the remote storage share fetch request; the callback also cancels the timer task.
 *
 * @return {@code true} if completion has been handed over to the callback/timer task, {@code false}
 *         if the caller should complete the remote storage share fetch request itself.
 */
private boolean maybeRegisterCallbackPendingRemoteFetch() {
    log.trace("Registering callback pending remote fetch");
    PendingRemoteFetches pendingFetch = pendingRemoteFetchesOpt.get();
    if (pendingFetch.isDone() || shareFetch.fetchParams().maxWaitMs >= remoteFetchMaxWaitMs) {
        return false;
    }

    TimerTask timerTask = new PendingRemoteFetchTimerTask();
    // Schedule the timer task BEFORE registering the callback. If the remote fetches complete between
    // the isDone() check above and the callback registration, the callback is invoked right away; with
    // the task already scheduled, its cancel() call takes effect. In the opposite order the cancel()
    // would hit a not-yet-scheduled task, which would then still be scheduled, fire later and mark the
    // expired-request meter for a request that did not expire.
    // NOTE(review): relies on TimerTask.cancel() being a no-op for a task that has not been added to a
    // timer yet - confirm against org.apache.kafka.server.util.timer.TimerTask.
    replicaManager.addShareFetchTimerRequest(timerTask);

    pendingFetch.invokeCallbackOnCompletion((ignored, throwable) -> {
        timerTask.cancel();
        log.trace("Invoked remote storage fetch callback for group {}, member {}, "
            + "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
            partitionsAcquired.keySet());
        if (throwable != null) {
            log.error("Remote storage fetch failed for group {}, member {}, topic partitions {}",
                shareFetch.groupId(), shareFetch.memberId(), sharePartitions.keySet(), throwable);
        }
        // Guarded by outsidePurgatoryCallbackLock, so a concurrent timer expiry cannot complete twice.
        completeRemoteShareFetchRequestOutsidePurgatory();
    });
    return true;
}
/** /**
* Throws an exception if a task for remote storage fetch could not be scheduled successfully else updates pendingRemoteFetchesOpt. * Throws an exception if a task for remote storage fetch could not be scheduled successfully else updates pendingRemoteFetchesOpt.
* @param remoteStorageFetchInfoMap - The remote storage fetch information. * @param remoteStorageFetchInfoMap - The remote storage fetch information.
@ -904,4 +953,26 @@ public class DelayedShareFetch extends DelayedOperation {
} }
return completedByMe; return completedByMe;
} }
/**
 * Completes the remote storage share fetch request at most once, no matter how many callers
 * (completion callback or timer task) reach this method. Only the caller that flips
 * {@code outsidePurgatoryCallbackLock} from false to true performs the completion.
 */
private void completeRemoteShareFetchRequestOutsidePurgatory() {
    boolean wonTheRace = outsidePurgatoryCallbackLock.compareAndSet(false, true);
    if (!wonTheRace) {
        // Another caller has already claimed the completion.
        return;
    }
    completeRemoteStorageShareFetchRequest();
}
/**
 * Timer task that completes the remote storage share fetch request when the pending remote fetches
 * have not completed within the additional wait time granted for remote storage fetches.
 */
private class PendingRemoteFetchTimerTask extends TimerTask {
public PendingRemoteFetchTimerTask() {
// The delay is the remote fetch max wait time minus the share fetch max wait time.
// NOTE(review): presumably the share fetch max wait time has already elapsed in the purgatory
// by the time this task is created - confirm against the caller.
super(remoteFetchMaxWaitMs - shareFetch.fetchParams().maxWaitMs);
}
@Override
public void run() {
log.trace("Expired remote storage fetch callback for group {}, member {}, "
+ "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
partitionsAcquired.keySet());
// The meter is marked on every run of this task, before the once-only completion guard is consulted.
expiredRequestMeter.mark();
completeRemoteShareFetchRequestOutsidePurgatory();
}
}
} }

View File

@ -23,10 +23,12 @@ import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult; import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.function.BiConsumer;
/** /**
* This class is used to store the remote storage fetch information for topic partitions in a share fetch request. * This class is used to store the remote storage fetch information for topic partitions in a share fetch request.
@ -48,6 +50,12 @@ public class PendingRemoteFetches {
return true; return true;
} }
/**
 * Registers a callback that is invoked once the remote fetch results of all remote fetches held by
 * this object have completed.
 *
 * @param callback The action to run on completion; it is attached via
 *                 {@link CompletableFuture#whenComplete} to the combined future of all results.
 */
public void invokeCallbackOnCompletion(BiConsumer<Void, Throwable> callback) {
    List<CompletableFuture<RemoteLogReadResult>> pendingResults = new ArrayList<>();
    for (RemoteFetch remoteFetch : remoteFetches) {
        pendingResults.add(remoteFetch.remoteFetchResult());
    }
    CompletableFuture<?>[] resultArray = pendingResults.toArray(new CompletableFuture<?>[0]);
    CompletableFuture<Void> allCompleted = CompletableFuture.allOf(resultArray);
    allCompleted.whenComplete(callback);
}
public List<RemoteFetch> remoteFetches() { public List<RemoteFetch> remoteFetches() {
return remoteFetches; return remoteFetches;
} }

View File

@ -130,6 +130,10 @@ public class SharePartitionManager implements AutoCloseable {
* The max delivery count is the maximum number of times a message can be delivered before it is considered to be archived. * The max delivery count is the maximum number of times a message can be delivered before it is considered to be archived.
*/ */
private final int maxDeliveryCount; private final int maxDeliveryCount;
/**
* The max wait time for a share fetch request having remote storage fetch.
*/
private final long remoteFetchMaxWaitMs;
/** /**
* The persister is used to persist the share partition state. * The persister is used to persist the share partition state.
@ -153,6 +157,7 @@ public class SharePartitionManager implements AutoCloseable {
int defaultRecordLockDurationMs, int defaultRecordLockDurationMs,
int maxDeliveryCount, int maxDeliveryCount,
int maxInFlightMessages, int maxInFlightMessages,
long remoteFetchMaxWaitMs,
Persister persister, Persister persister,
GroupConfigManager groupConfigManager, GroupConfigManager groupConfigManager,
BrokerTopicStats brokerTopicStats BrokerTopicStats brokerTopicStats
@ -164,6 +169,7 @@ public class SharePartitionManager implements AutoCloseable {
defaultRecordLockDurationMs, defaultRecordLockDurationMs,
maxDeliveryCount, maxDeliveryCount,
maxInFlightMessages, maxInFlightMessages,
remoteFetchMaxWaitMs,
persister, persister,
groupConfigManager, groupConfigManager,
new ShareGroupMetrics(time), new ShareGroupMetrics(time),
@ -179,6 +185,7 @@ public class SharePartitionManager implements AutoCloseable {
int defaultRecordLockDurationMs, int defaultRecordLockDurationMs,
int maxDeliveryCount, int maxDeliveryCount,
int maxInFlightMessages, int maxInFlightMessages,
long remoteFetchMaxWaitMs,
Persister persister, Persister persister,
GroupConfigManager groupConfigManager, GroupConfigManager groupConfigManager,
ShareGroupMetrics shareGroupMetrics, ShareGroupMetrics shareGroupMetrics,
@ -193,6 +200,7 @@ public class SharePartitionManager implements AutoCloseable {
new SystemTimer("share-group-lock-timeout")), new SystemTimer("share-group-lock-timeout")),
maxDeliveryCount, maxDeliveryCount,
maxInFlightMessages, maxInFlightMessages,
remoteFetchMaxWaitMs,
persister, persister,
groupConfigManager, groupConfigManager,
shareGroupMetrics, shareGroupMetrics,
@ -210,6 +218,7 @@ public class SharePartitionManager implements AutoCloseable {
Timer timer, Timer timer,
int maxDeliveryCount, int maxDeliveryCount,
int maxInFlightMessages, int maxInFlightMessages,
long remoteFetchMaxWaitMs,
Persister persister, Persister persister,
GroupConfigManager groupConfigManager, GroupConfigManager groupConfigManager,
ShareGroupMetrics shareGroupMetrics, ShareGroupMetrics shareGroupMetrics,
@ -223,6 +232,7 @@ public class SharePartitionManager implements AutoCloseable {
this.timer = timer; this.timer = timer;
this.maxDeliveryCount = maxDeliveryCount; this.maxDeliveryCount = maxDeliveryCount;
this.maxInFlightMessages = maxInFlightMessages; this.maxInFlightMessages = maxInFlightMessages;
this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
this.persister = persister; this.persister = persister;
this.groupConfigManager = groupConfigManager; this.groupConfigManager = groupConfigManager;
this.shareGroupMetrics = shareGroupMetrics; this.shareGroupMetrics = shareGroupMetrics;
@ -683,7 +693,7 @@ public class SharePartitionManager implements AutoCloseable {
// Add the share fetch to the delayed share fetch purgatory to process the fetch request. // Add the share fetch to the delayed share fetch purgatory to process the fetch request.
// The request will be added irrespective of whether the share partition is initialized or not. // The request will be added irrespective of whether the share partition is initialized or not.
// Once the share partition is initialized, the delayed share fetch will be completed. // Once the share partition is initialized, the delayed share fetch will be completed.
addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager, fencedSharePartitionHandler(), sharePartitions, shareGroupMetrics, time), delayedShareFetchWatchKeys); addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager, fencedSharePartitionHandler(), sharePartitions, shareGroupMetrics, time, remoteFetchMaxWaitMs), delayedShareFetchWatchKeys);
} }
private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) { private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) {

View File

@ -441,6 +441,7 @@ class BrokerServer(
config.shareGroupConfig.shareGroupRecordLockDurationMs, config.shareGroupConfig.shareGroupRecordLockDurationMs,
config.shareGroupConfig.shareGroupDeliveryCountLimit, config.shareGroupConfig.shareGroupDeliveryCountLimit,
config.shareGroupConfig.shareGroupPartitionMaxRecordLocks, config.shareGroupConfig.shareGroupPartitionMaxRecordLocks,
config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong,
persister, persister,
groupConfigManager, groupConfigManager,
brokerTopicStats brokerTopicStats

View File

@ -48,6 +48,7 @@ import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.SystemTimer; import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper; import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer; import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot; import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
@ -81,6 +82,7 @@ import scala.jdk.javaapi.CollectionConverters;
import static kafka.server.share.PendingRemoteFetches.RemoteFetch; import static kafka.server.share.PendingRemoteFetches.RemoteFetch;
import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL; import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL;
import static kafka.server.share.SharePartitionManagerTest.REMOTE_FETCH_MAX_WAIT_MS;
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult; import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch; import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords; import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
@ -1427,6 +1429,13 @@ public class DelayedShareFetchTest {
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class)); when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class));
// Mock the behaviour of replica manager such that remote storage fetch completion timer task completes on adding it to the watch queue.
doAnswer(invocationOnMock -> {
TimerTask timerTask = invocationOnMock.getArgument(0);
timerTask.run();
return null;
}).when(replicaManager).addShareFetchTimerRequest(any());
Uuid fetchId = Uuid.randomUuid(); Uuid fetchId = Uuid.randomUuid();
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch) .withShareFetchData(shareFetch)
@ -1777,6 +1786,165 @@ public class DelayedShareFetchTest {
delayedShareFetch.lock().unlock(); delayedShareFetch.lock().unlock();
} }
/**
 * Verifies that a delayed share fetch holding a pending remote fetch that is reported as not done is
 * completed through the callback registered via PendingRemoteFetches#invokeCallbackOnCompletion when
 * forceComplete() is called. The mocked invokeCallbackOnCompletion invokes the callback immediately,
 * standing in for the completion of the pending remote fetches.
 */
@Test
public void testRemoteStorageFetchCompletionPostRegisteringCallbackByPendingFetchesCompletion() {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
SharePartition sp0 = mock(SharePartition.class);
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp0.nextFetchOffset()).thenReturn(10L);
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
sharePartitions.put(tp0, sp0);
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
future, List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
PendingRemoteFetches pendingRemoteFetches = mock(PendingRemoteFetches.class);
Uuid fetchId = Uuid.randomUuid();
// The operation is built with an already-present pending remote fetch.
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions)
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
.withPendingRemoteFetches(pendingRemoteFetches)
.withFetchId(fetchId)
.build());
LinkedHashMap<TopicIdPartition, Long> partitionsAcquired = new LinkedHashMap<>();
partitionsAcquired.put(tp0, 10L);
// Manually update acquired partitions.
delayedShareFetch.updatePartitionsAcquired(partitionsAcquired);
// Mock remote fetch result.
RemoteFetch remoteFetch = mock(RemoteFetch.class);
when(remoteFetch.topicIdPartition()).thenReturn(tp0);
when(remoteFetch.remoteFetchResult()).thenReturn(CompletableFuture.completedFuture(
new RemoteLogReadResult(Optional.of(REMOTE_FETCH_INFO), Optional.empty()))
);
when(remoteFetch.logReadResult()).thenReturn(new LogReadResult(
REMOTE_FETCH_INFO,
Option.empty(),
-1L,
-1L,
-1L,
-1L,
-1L,
Option.empty(),
Option.empty(),
Option.empty()
));
when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch));
// Reported as not done so that the callback registration path is eligible.
// NOTE(review): that path also requires FETCH_PARAMS.maxWaitMs < REMOTE_FETCH_MAX_WAIT_MS - confirm.
when(pendingRemoteFetches.isDone()).thenReturn(false);
// Make sure that the callback is called to complete remote storage share fetch result.
doAnswer(invocationOnMock -> {
BiConsumer<Void, Throwable> callback = invocationOnMock.getArgument(0);
callback.accept(mock(Void.class), null);
return null;
}).when(pendingRemoteFetches).invokeCallbackOnCompletion(any());
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
assertFalse(delayedShareFetch.isCompleted());
delayedShareFetch.forceComplete();
assertTrue(delayedShareFetch.isCompleted());
// the future of shareFetch completes.
assertTrue(shareFetch.isCompleted());
assertEquals(Set.of(tp0), future.join().keySet());
// Verify the locks are released for tp0.
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
// The once-only completion flag must have been claimed by the callback path.
assertTrue(delayedShareFetch.outsidePurgatoryCallbackLock());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
}
/**
 * Verifies that a delayed share fetch holding a pending remote fetch that is reported as not done is
 * completed through the timer task handed to ReplicaManager#addShareFetchTimerRequest when the
 * completion callback never fires. The mocked addShareFetchTimerRequest runs the timer task
 * immediately, standing in for the expiry of the timer.
 */
@Test
public void testRemoteStorageFetchCompletionPostRegisteringCallbackByTimerTaskCompletion() {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
SharePartition sp0 = mock(SharePartition.class);
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp0.nextFetchOffset()).thenReturn(10L);
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
sharePartitions.put(tp0, sp0);
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
future, List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
PendingRemoteFetches pendingRemoteFetches = mock(PendingRemoteFetches.class);
Uuid fetchId = Uuid.randomUuid();
// The operation is built with an already-present pending remote fetch.
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions)
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
.withPendingRemoteFetches(pendingRemoteFetches)
.withFetchId(fetchId)
.build());
LinkedHashMap<TopicIdPartition, Long> partitionsAcquired = new LinkedHashMap<>();
partitionsAcquired.put(tp0, 10L);
// Manually update acquired partitions.
delayedShareFetch.updatePartitionsAcquired(partitionsAcquired);
// Mock remote fetch result.
RemoteFetch remoteFetch = mock(RemoteFetch.class);
when(remoteFetch.topicIdPartition()).thenReturn(tp0);
when(remoteFetch.remoteFetchResult()).thenReturn(CompletableFuture.completedFuture(
new RemoteLogReadResult(Optional.of(REMOTE_FETCH_INFO), Optional.empty()))
);
when(remoteFetch.logReadResult()).thenReturn(new LogReadResult(
REMOTE_FETCH_INFO,
Option.empty(),
-1L,
-1L,
-1L,
-1L,
-1L,
Option.empty(),
Option.empty(),
Option.empty()
));
when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch));
// Reported as not done so that the callback registration path is eligible.
// NOTE(review): that path also requires FETCH_PARAMS.maxWaitMs < REMOTE_FETCH_MAX_WAIT_MS - confirm.
when(pendingRemoteFetches.isDone()).thenReturn(false);
// Make sure that the callback to complete remote storage share fetch result is not called.
doAnswer(invocationOnMock -> null).when(pendingRemoteFetches).invokeCallbackOnCompletion(any());
// Mock the behaviour of replica manager such that remote storage fetch completion timer task completes on adding it to the watch queue.
doAnswer(invocationOnMock -> {
TimerTask timerTask = invocationOnMock.getArgument(0);
timerTask.run();
return null;
}).when(replicaManager).addShareFetchTimerRequest(any());
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class), any())).thenReturn(
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
assertFalse(delayedShareFetch.isCompleted());
delayedShareFetch.forceComplete();
assertTrue(delayedShareFetch.isCompleted());
// the future of shareFetch completes.
assertTrue(shareFetch.isCompleted());
assertEquals(Set.of(tp0), future.join().keySet());
// Verify the locks are released for tp0.
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
// The once-only completion flag must have been claimed by the timer task path.
assertTrue(delayedShareFetch.outsidePurgatoryCallbackLock());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
}
static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) { static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) {
LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, minBytes); LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, minBytes);
LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class), LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class),
@ -1847,7 +2015,7 @@ public class DelayedShareFetchTest {
private LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = mock(LinkedHashMap.class); private LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = mock(LinkedHashMap.class);
private PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class); private PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class);
private Time time = new MockTime(); private Time time = new MockTime();
private final Optional<PendingRemoteFetches> pendingRemoteFetches = Optional.empty(); private Optional<PendingRemoteFetches> pendingRemoteFetches = Optional.empty();
private ShareGroupMetrics shareGroupMetrics = mock(ShareGroupMetrics.class); private ShareGroupMetrics shareGroupMetrics = mock(ShareGroupMetrics.class);
private Uuid fetchId = Uuid.randomUuid(); private Uuid fetchId = Uuid.randomUuid();
@ -1886,6 +2054,11 @@ public class DelayedShareFetchTest {
return this; return this;
} }
/**
 * Sets the pending remote fetches the built DelayedShareFetch starts with.
 *
 * @param pendingRemoteFetches The in-flight remote fetches; must not be null since it is wrapped with Optional.of.
 * @return this builder, for chaining.
 */
private DelayedShareFetchBuilder withPendingRemoteFetches(PendingRemoteFetches pendingRemoteFetches) {
    Optional<PendingRemoteFetches> wrapped = Optional.of(pendingRemoteFetches);
    this.pendingRemoteFetches = wrapped;
    return this;
}
private DelayedShareFetchBuilder withFetchId(Uuid fetchId) { private DelayedShareFetchBuilder withFetchId(Uuid fetchId) {
this.fetchId = fetchId; this.fetchId = fetchId;
return this; return this;
@ -1905,7 +2078,8 @@ public class DelayedShareFetchTest {
shareGroupMetrics, shareGroupMetrics,
time, time,
pendingRemoteFetches, pendingRemoteFetches,
fetchId); fetchId,
REMOTE_FETCH_MAX_WAIT_MS);
} }
} }
} }

View File

@ -158,6 +158,7 @@ public class SharePartitionManagerTest {
private static final String CONNECTION_ID = "id-1"; private static final String CONNECTION_ID = "id-1";
static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000; static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
static final long REMOTE_FETCH_MAX_WAIT_MS = 6000L;
private MockTime time; private MockTime time;
private ReplicaManager mockReplicaManager; private ReplicaManager mockReplicaManager;
@ -3242,6 +3243,7 @@ public class SharePartitionManagerTest {
timer, timer,
MAX_DELIVERY_COUNT, MAX_DELIVERY_COUNT,
MAX_IN_FLIGHT_MESSAGES, MAX_IN_FLIGHT_MESSAGES,
REMOTE_FETCH_MAX_WAIT_MS,
persister, persister,
mock(GroupConfigManager.class), mock(GroupConfigManager.class),
shareGroupMetrics, shareGroupMetrics,

View File

@ -6123,7 +6123,8 @@ class ReplicaManagerTest {
mock(classOf[BiConsumer[SharePartitionKey, Throwable]]), mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
sharePartitions, sharePartitions,
mock(classOf[ShareGroupMetrics]), mock(classOf[ShareGroupMetrics]),
time)) time,
500))
val delayedShareFetchWatchKeys : util.List[DelayedShareFetchKey] = new util.ArrayList[DelayedShareFetchKey] val delayedShareFetchWatchKeys : util.List[DelayedShareFetchKey] = new util.ArrayList[DelayedShareFetchKey]
topicPartitions.forEach((topicIdPartition: TopicIdPartition) => delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId, topicIdPartition.partition))) topicPartitions.forEach((topicIdPartition: TopicIdPartition) => delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId, topicIdPartition.partition)))