KAFKA-19232: Handle Share session limit reached exception in clients. (#19619)
CI / build (push) Waiting to run Details

Handle the new `ShareSessionLimitReachedException` in
`ShareSessionHandler` in the client to reset the ShareSession.  Added a
unit test verifying the change.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Shivsundar R 2025-05-04 14:59:40 -04:00 committed by GitHub
parent bff5ba4ad9
commit fedbb90c12
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 8 additions and 4 deletions

View File

@ -219,7 +219,8 @@ public class ShareSessionHandler {
*/ */
public boolean handleResponse(ShareFetchResponse response, short version) { public boolean handleResponse(ShareFetchResponse response, short version) {
if ((response.error() == Errors.SHARE_SESSION_NOT_FOUND) || if ((response.error() == Errors.SHARE_SESSION_NOT_FOUND) ||
(response.error() == Errors.INVALID_SHARE_SESSION_EPOCH)) { (response.error() == Errors.INVALID_SHARE_SESSION_EPOCH) ||
(response.error() == Errors.SHARE_SESSION_LIMIT_REACHED)) {
log.info("Node {} was unable to process the ShareFetch request with {}: {}.", log.info("Node {} was unable to process the ShareFetch request with {}: {}.",
node, nextMetadata, response.error()); node, nextMetadata, response.error());
nextMetadata = nextMetadata.nextCloseExistingAttemptNew(); nextMetadata = nextMetadata.nextCloseExistingAttemptNew();

View File

@ -31,6 +31,8 @@ import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -149,8 +151,9 @@ public class ShareSessionHandlerTest {
return topicIdPartitionToPartition; return topicIdPartitionToPartition;
} }
@Test @ParameterizedTest
public void testShareSession() { @EnumSource(value = Errors.class, names = {"INVALID_SHARE_SESSION_EPOCH", "SHARE_SESSION_NOT_FOUND", "SHARE_SESSION_LIMIT_REACHED"})
public void testShareSession(Errors error) {
String groupId = "G1"; String groupId = "G1";
Uuid memberId = Uuid.randomUuid(); Uuid memberId = Uuid.randomUuid();
ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, memberId); ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1, memberId);
@ -199,7 +202,7 @@ public class ShareSessionHandlerTest {
handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true)); handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
// A top-level error code will reset the session epoch // A top-level error code will reset the session epoch
ShareFetchResponse resp3 = ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 0, new LinkedHashMap<>(), List.of(), 0); ShareFetchResponse resp3 = ShareFetchResponse.of(error, 0, new LinkedHashMap<>(), List.of(), 0);
handler.handleResponse(resp3, ApiKeys.SHARE_FETCH.latestVersion(true)); handler.handleResponse(resp3, ApiKeys.SHARE_FETCH.latestVersion(true));
ShareFetchRequestData requestData4 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data(); ShareFetchRequestData requestData4 = handler.newShareFetchBuilder(groupId, fetchConfig).build().data();