KAFKA-19231-1: Handle fetch request when share session cache is full (#19701)

According to the recent changes in KIP-932, when the share session cache
is full and a broker receives a Share Fetch request with Initial Share
Session Epoch (0), the error code `SHARE_SESSION_LIMIT_REACHED` is
returned after a delay of maxWaitMs. This PR implements that logic. To
add a delay between subsequent share fetch requests, the timer in the
delayed operation purgatory is used. A new `IdleShareFetchTimerTask`
has been added which takes in a CompletableFuture<Void>. Upon
expiration, this future is completed with null. When the future
completes, a response is sent back to the client with the error code
`SHARE_SESSION_LIMIT_REACHED`.
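
For reference, the delayed-completion pattern described above can be sketched as follows. This is a minimal, self-contained illustration that substitutes a `ScheduledExecutorService` for Kafka's internal `SystemTimer`/`TimerTask`; the names `IdleFetchDelaySketch` and `delayFor` are illustrative only, not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class IdleFetchDelaySketch {
    private static final ScheduledExecutorService TIMER = Executors.newSingleThreadScheduledExecutor();

    // Analogous to createIdleShareFetchTimerTask: return a future that is
    // completed with null once maxWaitMs has elapsed.
    static CompletableFuture<Void> delayFor(long maxWaitMs) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        TIMER.schedule(() -> future.complete(null), maxWaitMs, TimeUnit.MILLISECONDS);
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> future = delayFor(500L);
        // The broker attaches the response logic as a continuation, so the
        // client only sees SHARE_SESSION_LIMIT_REACHED after the full delay.
        future.thenRun(() -> System.out.println("respond with SHARE_SESSION_LIMIT_REACHED"));
        future.join(); // block until the timer fires
        TIMER.shutdown();
    }
}
```

The delay matters because a client denied a share session would otherwise retry immediately; completing the future only after maxWaitMs turns the rejection into a paced backoff.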

Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
Chirag Wadhwa 2025-05-15 19:06:44 +05:30 committed by GitHub
parent 847968e530
commit f9064f8bcd
9 changed files with 180 additions and 6 deletions

checkstyle/suppressions.xml

@@ -41,7 +41,7 @@
files="(KafkaClusterTestKit).java"/>
<suppress checks="NPathComplexity" files="TestKitNodes.java"/>
<suppress checks="JavaNCSS"
files="(SharePartitionManagerTest|SharePartitionTest).java"/>
files="(SharePartitionManagerTest|SharePartitionTest|ShareConsumerTest).java"/>
<suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/>
<suppress checks="CyclomaticComplexity" files="SharePartition.java"/>

ShareConsumerTest.java

@@ -2199,6 +2199,41 @@ public class ShareConsumerTest {
shareConsumer4.close();
}
    @ClusterTest(
        brokers = 1,
        serverProperties = {
            @ClusterConfigProperty(key = "group.share.max.size", value = "1"), // Setting max group size to 1
            @ClusterConfigProperty(key = "group.share.max.share.sessions", value = "1") // Setting max share sessions to 1
        }
    )
    public void testShareGroupShareSessionCacheIsFull() {
        alterShareAutoOffsetReset("group1", "earliest");
        try (Producer<byte[], byte[]> producer = createProducer();
             ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer("group1");
             ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer("group2")) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();
            shareConsumer1.subscribe(Set.of(tp.topic()));
            shareConsumer2.subscribe(Set.of(tp.topic()));
            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer1, 2500L, 1);
            assertEquals(1, records.count());
            producer.send(record);
            producer.flush();
            // The second share consumer should not throw any exception, but it should not receive any records either.
            records = shareConsumer2.poll(Duration.ofMillis(1000));
            assertEquals(0, records.count());
            shareConsumer1.close();
            shareConsumer2.close();
        }
    }
@ClusterTest
public void testReadCommittedIsolationLevel() {
alterShareAutoOffsetReset("group1", "earliest");

SharePartitionManager.java

@@ -58,6 +58,7 @@ import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.slf4j.Logger;
@@ -390,6 +391,21 @@ public class SharePartitionManager implements AutoCloseable {
return mapAcknowledgementFutures(futuresMap, Optional.empty());
}
    /**
     * The method creates a timer task to delay the share fetch request for the maxWaitMs duration.
     * A future is created and returned which will be completed when the timer task is completed.
     *
     * @param maxWaitMs The duration after which the timer task will be completed.
     *
     * @return A future that will be completed when the timer task is completed.
     */
    public CompletableFuture<Void> createIdleShareFetchTimerTask(long maxWaitMs) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        TimerTask idleShareFetchTimerTask = new IdleShareFetchTimerTask(maxWaitMs, future);
        replicaManager.addShareFetchTimerRequest(idleShareFetchTimerTask);
        return future;
    }
private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(
Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap,
Optional<Consumer<Set<String>>> failedMetricsHandler
@@ -461,7 +477,7 @@
cachedSharePartitions, clientConnectionId);
if (responseShareSessionKey == null) {
log.error("Could not create a share session for group {} member {}", groupId, reqMetadata.memberId());
-            throw Errors.SHARE_SESSION_NOT_FOUND.exception();
+            throw Errors.SHARE_SESSION_LIMIT_REACHED.exception();
}
context = new ShareSessionContext(reqMetadata, shareFetchData);
@@ -847,4 +863,35 @@
}
}
}
    /**
     * The IdleShareFetchTimerTask creates a timer task for a share fetch request which tries to initialize a new share
     * session when the share session cache is full. Such a request is delayed for maxWaitMs by passing the corresponding
     * IdleShareFetchTimerTask to {@link ReplicaManager#delayedShareFetchTimer}.
     */
    private static class IdleShareFetchTimerTask extends TimerTask {

        /**
         * This future is used to complete the share fetch request when the timer task is completed.
         */
        private final CompletableFuture<Void> future;

        public IdleShareFetchTimerTask(
            long delayMs,
            CompletableFuture<Void> future
        ) {
            super(delayMs);
            this.future = future;
        }

        /**
         * The run method which is executed when the timer task expires. It completes the future, indicating that the
         * delay for the corresponding share fetch request is over.
         */
        @Override
        public void run() {
            future.complete(null);
        }
    }
}

KafkaApis.scala

@@ -3102,6 +3102,17 @@ class KafkaApis(val requestChannel: RequestChannel,
// Creating the shareFetchContext for Share Session Handling. If context creation fails, the request is failed directly here.
shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent, request.context.connectionId)
} catch {
      case _: ShareSessionLimitReachedException =>
        sharePartitionManager.createIdleShareFetchTimerTask(shareFetchRequest.maxWait).handle(
          (_, exception) => {
            if (exception != null) {
              requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, exception))
            } else {
              requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.SHARE_SESSION_LIMIT_REACHED.exception))
            }
          }
        )
        return
case e: Exception =>
requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e))
return

ReplicaManager.scala

@@ -59,6 +59,7 @@ import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
@@ -290,6 +291,8 @@
) extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
private val addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(config)
private val shareFetchPurgatoryName = "ShareFetch"
private val delayedShareFetchTimer = new SystemTimer(shareFetchPurgatoryName)
val delayedProducePurgatory = delayedProducePurgatoryParam.getOrElse(
new DelayedOperationPurgatory[DelayedProduce](
@@ -311,7 +314,7 @@
"RemoteListOffsets", config.brokerId))
val delayedShareFetchPurgatory = delayedShareFetchPurgatoryParam.getOrElse(
new DelayedOperationPurgatory[DelayedShareFetch](
"ShareFetch", config.brokerId,
shareFetchPurgatoryName, delayedShareFetchTimer, config.brokerId,
config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests))
/* epoch of the controller that last changed the leader */
@@ -448,6 +451,14 @@
delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, delayedShareFetchKeys)
}
  /**
   * Add a timer task to the delayedShareFetchTimer.
   *
   * @param timerTask The timer task to be added to the delayedShareFetchTimer.
   */
  private[server] def addShareFetchTimerRequest(timerTask: TimerTask): Unit = {
    delayedShareFetchTimer.add(timerTask)
  }
/**
* Registers the provided listener to the partition iff the partition is online.
*/

SharePartitionManagerTest.java

@@ -47,7 +47,6 @@ import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
-import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.common.ShareVersion;
@@ -75,10 +74,12 @@ import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
@@ -158,7 +159,7 @@ public class SharePartitionManagerTest {
static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
-    private Time time;
+    private MockTime time;
private ReplicaManager mockReplicaManager;
private BrokerTopicStats brokerTopicStats;
private SharePartitionManager sharePartitionManager;
@@ -2804,6 +2805,39 @@
validateRotatedListEquals(topicIdPartitions, resultShareFetch.topicIdPartitions(), 1);
}
    @Test
    public void testCreateIdleShareFetchTask() throws Exception {
        ReplicaManager replicaManager = mock(ReplicaManager.class);
        MockTimer mockTimer = new MockTimer(time);
        long maxWaitMs = 1000L;

        // Set up the mock to capture and add the timer task.
        Mockito.doAnswer(invocation -> {
            TimerTask timerTask = invocation.getArgument(0);
            mockTimer.add(timerTask);
            return null;
        }).when(replicaManager).addShareFetchTimerRequest(Mockito.any(TimerTask.class));

        sharePartitionManager = SharePartitionManagerBuilder.builder()
            .withReplicaManager(replicaManager)
            .withTime(time)
            .withTimer(mockTimer)
            .build();

        CompletableFuture<Void> future = sharePartitionManager.createIdleShareFetchTimerTask(maxWaitMs);

        // The future should not be completed immediately.
        assertFalse(future.isDone());
        mockTimer.advanceClock(maxWaitMs / 2);
        assertFalse(future.isDone());
        mockTimer.advanceClock((maxWaitMs / 2) + 1);

        // Verify the future is completed after the wait time.
        assertTrue(future.isDone());
        assertFalse(future.isCompletedExceptionally());
    }
@Test
public void testOnShareVersionToggle() {
String groupId = "grp";

KafkaApisTest.scala

@@ -4824,6 +4824,38 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, responseData.errorCode)
}
  @Test
  def testHandleShareFetchRequestWhenShareSessionCacheIsFull(): Unit = {
    val topicId = Uuid.randomUuid()
    metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
    addTopicToMetadataCache("foo", 1, topicId = topicId)

    when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any()))
      .thenThrow(Errors.SHARE_SESSION_LIMIT_REACHED.exception)
    when(sharePartitionManager.createIdleShareFetchTimerTask(anyLong()))
      .thenReturn(CompletableFuture.completedFuture(null))

    val shareFetchRequestData = new ShareFetchRequestData().
      setGroupId("group").
      setMemberId(Uuid.randomUuid.toString).
      setShareSessionEpoch(0).
      setTopics(util.List.of(new ShareFetchRequestData.FetchTopic().
        setTopicId(topicId).
        setPartitions(util.List.of(
          new ShareFetchRequestData.FetchPartition()
            .setPartitionIndex(0)))))

    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
    val request = buildRequest(shareFetchRequest)
    kafkaApis = createKafkaApis()
    kafkaApis.handleShareFetchRequest(request)

    val response = verifyNoThrottling[ShareFetchResponse](request)
    val responseData = response.data()
    assertEquals(Errors.SHARE_SESSION_LIMIT_REACHED.code, responseData.errorCode)
  }
@Test
def testHandleShareFetchRequestShareSessionSuccessfullyEstablished(): Unit = {
val topicName = "foo"

ShareFetchAcknowledgeRequestTest.scala

@@ -2106,7 +2106,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket3)
val shareFetchResponseData = shareFetchResponse.data()
-      shareFetchResponseData.errorCode == Errors.SHARE_SESSION_NOT_FOUND.code
+      shareFetchResponseData.errorCode == Errors.SHARE_SESSION_LIMIT_REACHED.code
}, "Share fetch request failed", 5000)
// Now we will close the socket connections for the members, mimicking a client disconnection

DelayedOperationPurgatory.java

@@ -58,6 +58,10 @@ public class DelayedOperationPurgatory<T extends DelayedOperation> {
this(purgatoryName, timer, brokerId, 1000, reaperEnabled, true);
}
    public DelayedOperationPurgatory(String purgatoryName, Timer timer, int brokerId, int purgeInterval) {
        this(purgatoryName, timer, brokerId, purgeInterval, true, true);
    }
public DelayedOperationPurgatory(String purgatoryName, int brokerId) {
this(purgatoryName, brokerId, 1000);
}