KAFKA-19440: Handle top-level errors in AlterShareGroupOffsets RPC (#20049)

While testing the code in https://github.com/apache/kafka/pull/19820, it
became clear that the error handling problems were due to the underlying
Admin API. This PR fixes the error handling for top-level errors in the
AlterShareGroupOffsets RPC.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Lan Ding <isDing_L@163.com>, TaiJuWu <tjwu1217@gmail.com>
This commit is contained in:
Andrew Schofield 2025-07-03 11:00:56 +01:00 committed by GitHub
parent eb378da99c
commit 729f9ccf06
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 238 additions and 130 deletions

View File

@ -20,6 +20,7 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.protocol.Errors;
@ -35,9 +36,9 @@ import java.util.stream.Collectors;
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsResult {
private final KafkaFuture<Map<TopicPartition, Errors>> future;
private final KafkaFuture<Map<TopicPartition, ApiException>> future;
AlterShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future) {
AlterShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, ApiException>> future) {
this.future = future;
}
@ -54,11 +55,11 @@ public class AlterShareGroupOffsetsResult {
result.completeExceptionally(new IllegalArgumentException(
"Alter offset for partition \"" + partition + "\" was not attempted"));
} else {
final Errors error = topicPartitions.get(partition);
if (error == Errors.NONE) {
final ApiException exception = topicPartitions.get(partition);
if (exception == null) {
result.complete(null);
} else {
result.completeExceptionally(error.exception());
result.completeExceptionally(exception);
}
}
});
@ -68,22 +69,22 @@ public class AlterShareGroupOffsetsResult {
/**
* Return a future which succeeds if all the alter offsets succeed.
* If not, the first topic error shall be returned.
*/
public KafkaFuture<Void> all() {
return this.future.thenApply(topicPartitionErrorsMap -> {
List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
.stream()
.filter(e -> e.getValue() != Errors.NONE)
.filter(e -> e.getValue() != null)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
for (Errors error : topicPartitionErrorsMap.values()) {
if (error != Errors.NONE) {
throw error.exception(
"Failed altering share group offsets for the following partitions: " + partitionsFailed);
for (ApiException exception : topicPartitionErrorsMap.values()) {
if (exception != null) {
throw Errors.forException(exception).exception(
"Failed altering group offsets for the following partitions: " + partitionsFailed);
}
}
return null;
});
}
}

View File

@ -3804,8 +3804,10 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> future = AlterShareGroupOffsetsHandler.newFuture(groupId);
public AlterShareGroupOffsetsResult alterShareGroupOffsets(final String groupId,
final Map<TopicPartition, Long> offsets,
final AlterShareGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, ApiException>> future = AlterShareGroupOffsetsHandler.newFuture(groupId);
AlterShareGroupOffsetsHandler handler = new AlterShareGroupOffsetsHandler(groupId, offsets, logContext);
invokeDriver(handler, future, options.timeoutMs);
return new AlterShareGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
@ -3821,7 +3823,9 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics, DeleteShareGroupOffsetsOptions options) {
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(final String groupId,
final Set<String> topics,
final DeleteShareGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<String, ApiException>> future = DeleteShareGroupOffsetsHandler.newFuture(groupId);
DeleteShareGroupOffsetsHandler handler = new DeleteShareGroupOffsetsHandler(groupId, topics, logContext);
invokeDriver(handler, future, options.timeoutMs);

View File

@ -21,8 +21,8 @@ import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AlterShareGroupOffsetsRequest;
@ -33,7 +33,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -42,7 +41,7 @@ import java.util.Set;
/**
* This class is the handler for {@link KafkaAdminClient#alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} call
*/
public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Errors>> {
public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, ApiException>> {
private final CoordinatorKey groupId;
@ -52,7 +51,6 @@ public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coord
private final CoordinatorStrategy lookupStrategy;
public AlterShareGroupOffsetsHandler(String groupId, Map<TopicPartition, Long> offsets, LogContext logContext) {
this.groupId = CoordinatorKey.byGroupId(groupId);
this.offsets = offsets;
@ -60,8 +58,15 @@ public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coord
this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
}
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture(String groupId) {
return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, ApiException>> newFuture(String groupId) {
return AdminApiFuture.forKeys(Set.of(CoordinatorKey.byGroupId(groupId)));
}
private void validateKeys(Set<CoordinatorKey> groupIds) {
if (!groupIds.equals(Set.of(groupId))) {
throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
" (expected only " + Set.of(groupId) + ")");
}
}
@Override
@ -87,30 +92,38 @@ public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coord
}
@Override
public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(Node broker, Set<CoordinatorKey> keys, AbstractResponse abstractResponse) {
public ApiResult<CoordinatorKey, Map<TopicPartition, ApiException>> handleResponse(Node broker, Set<CoordinatorKey> keys, AbstractResponse abstractResponse) {
validateKeys(keys);
AlterShareGroupOffsetsResponse response = (AlterShareGroupOffsetsResponse) abstractResponse;
final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
final Map<TopicPartition, ApiException> partitionResults = new HashMap<>();
for (AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topic : response.data().responses()) {
for (AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition partition : topic.partitions()) {
TopicPartition topicPartition = new TopicPartition(topic.topicName(), partition.partitionIndex());
Errors error = Errors.forCode(partition.errorCode());
if (response.data().errorCode() != Errors.NONE.code()) {
final Errors topLevelError = Errors.forCode(response.data().errorCode());
final String topLevelErrorMessage = response.data().errorMessage();
if (error != Errors.NONE) {
handleError(
groupId,
topicPartition,
error,
partitionResults,
groupsToUnmap,
groupsToRetry
);
} else {
partitionResults.put(topicPartition, error);
offsets.forEach((topicPartition, offset) ->
handleError(
groupId,
topicPartition,
topLevelError,
topLevelErrorMessage,
partitionResults,
groupsToUnmap,
groupsToRetry
));
} else {
response.data().responses().forEach(topic -> topic.partitions().forEach(partition -> {
if (partition.errorCode() != Errors.NONE.code()) {
final Errors partitionError = Errors.forCode(partition.errorCode());
final String partitionErrorMessage = partition.errorMessage();
log.debug("AlterShareGroupOffsets request for group id {} and topic-partition {}-{} failed and returned error {}." + partitionErrorMessage,
groupId.idValue, topic.topicName(), partition.partitionIndex(), partitionError);
}
}
partitionResults.put(new TopicPartition(topic.topicName(), partition.partitionIndex()), Errors.forCode(partition.errorCode()).exception(partition.errorMessage()));
}));
}
if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
@ -121,23 +134,23 @@ public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coord
}
private void handleError(
CoordinatorKey groupId,
TopicPartition topicPartition,
Errors error,
Map<TopicPartition, Errors> partitionResults,
Set<CoordinatorKey> groupsToUnmap,
Set<CoordinatorKey> groupsToRetry
CoordinatorKey groupId,
TopicPartition topicPartition,
Errors error,
String errorMessage,
Map<TopicPartition, ApiException> partitionResults,
Set<CoordinatorKey> groupsToUnmap,
Set<CoordinatorKey> groupsToRetry
) {
switch (error) {
case COORDINATOR_LOAD_IN_PROGRESS:
case REBALANCE_IN_PROGRESS:
log.debug("AlterShareGroupOffsets request for group id {} returned error {}. Will retry.",
groupId.idValue, error);
log.debug("AlterShareGroupOffsets request for group id {} returned error {}. Will retry." + errorMessage, groupId.idValue, error);
groupsToRetry.add(groupId);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
log.debug("AlterShareGroupOffsets request for group id {} returned error {}. Will rediscover the coordinator and retry.",
log.debug("AlterShareGroupOffsets request for group id {} returned error {}. Will rediscover the coordinator and retry." + errorMessage,
groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
@ -147,14 +160,12 @@ public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coord
case UNKNOWN_SERVER_ERROR:
case KAFKA_STORAGE_ERROR:
case GROUP_AUTHORIZATION_FAILED:
log.debug("AlterShareGroupOffsets request for group id {} and partition {} failed due" +
" to error {}.", groupId.idValue, topicPartition, error);
partitionResults.put(topicPartition, error);
log.debug("AlterShareGroupOffsets request for group id {} failed due to error {}." + errorMessage, groupId.idValue, error);
partitionResults.put(topicPartition, error.exception(errorMessage));
break;
default:
log.error("AlterShareGroupOffsets request for group id {} and partition {} failed due" +
" to unexpected error {}.", groupId.idValue, topicPartition, error);
partitionResults.put(topicPartition, error);
log.error("AlterShareGroupOffsets request for group id {} failed due to unexpected error {}." + errorMessage, groupId.idValue, error);
partitionResults.put(topicPartition, error.exception(errorMessage));
}
}

View File

@ -53,26 +53,31 @@ public class AlterShareGroupOffsetsRequest extends AbstractRequest {
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
return new AlterShareGroupOffsetsResponse(getErrorResponse(throttleTimeMs, error));
public AlterShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return getErrorResponse(throttleTimeMs, Errors.forException(e));
}
public static AlterShareGroupOffsetsResponseData getErrorResponse(int throttleTimeMs, Errors error) {
return new AlterShareGroupOffsetsResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(error.code())
.setErrorMessage(error.message());
public AlterShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, Errors error) {
return getErrorResponse(throttleTimeMs, error.code(), error.message());
}
public static AlterShareGroupOffsetsResponseData getErrorResponse(Errors error) {
return getErrorResponse(error.code(), error.message());
}
public static AlterShareGroupOffsetsResponseData getErrorResponse(short errorCode, String errorMessage) {
return new AlterShareGroupOffsetsResponseData()
public AlterShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, short errorCode, String message) {
return new AlterShareGroupOffsetsResponse(
new AlterShareGroupOffsetsResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(errorCode)
.setErrorMessage(errorMessage);
.setErrorMessage(message)
);
}
public static AlterShareGroupOffsetsResponseData getErrorResponseData(Errors error) {
return getErrorResponseData(error, null);
}
public static AlterShareGroupOffsetsResponseData getErrorResponseData(Errors error, String errorMessage) {
return new AlterShareGroupOffsetsResponseData()
.setErrorCode(error.code())
.setErrorMessage(errorMessage == null ? error.message() : errorMessage);
}
public static AlterShareGroupOffsetsRequest parse(Readable readable, short version) {

View File

@ -80,12 +80,18 @@ public class DeleteShareGroupOffsetsRequest extends AbstractRequest {
}
public static DeleteShareGroupOffsetsResponseData getErrorDeleteResponseData(Errors error) {
return getErrorDeleteResponseData(error.code(), error.message());
return getErrorDeleteResponseData(error, null);
}
public static DeleteShareGroupOffsetsResponseData getErrorDeleteResponseData(short errorCode, String errorMessage) {
return new DeleteShareGroupOffsetsResponseData()
.setErrorCode(errorCode)
.setErrorMessage(errorMessage);
.setErrorMessage(errorMessage == null ? Errors.forCode(errorCode).message() : errorMessage);
}
public static DeleteShareGroupOffsetsResponseData getErrorDeleteResponseData(Errors error, String errorMessage) {
return new DeleteShareGroupOffsetsResponseData()
.setErrorCode(error.code())
.setErrorMessage(errorMessage == null ? error.message() : errorMessage);
}
}

View File

@ -56,7 +56,6 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.DuplicateVoterException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
@ -11351,6 +11350,28 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testAlterShareGroupOffsetsWithTopLevelError() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData().setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code()).setErrorMessage("Group authorization failed.");
TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
TopicPartition barPartition0 = new TopicPartition("bar", 0);
TopicPartition zooTopicPartition0 = new TopicPartition("zoo", 0);
env.kafkaClient().prepareResponse(new AlterShareGroupOffsetsResponse(data));
final AlterShareGroupOffsetsResult result = env.adminClient().alterShareGroupOffsets(GROUP_ID, Map.of(fooTopicPartition0, 1L, fooTopicPartition1, 2L, barPartition0, 1L));
TestUtils.assertFutureThrows(GroupAuthorizationException.class, result.all());
TestUtils.assertFutureThrows(GroupAuthorizationException.class, result.partitionResult(fooTopicPartition1));
TestUtils.assertFutureThrows(IllegalArgumentException.class, result.partitionResult(zooTopicPartition0));
}
}
@Test
public void testAlterShareGroupOffsetsWithErrorInOnePartition() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
@ -11359,7 +11380,8 @@ public class KafkaAdminClientTest {
AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData().setResponses(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(List.of(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0), new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(1).setErrorCode(Errors.NON_EMPTY_GROUP.code()).setErrorMessage("The group is not empty"))),
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0),
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(1).setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code()).setErrorMessage("Topic authorization failed."))),
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
).iterator())
);
@ -11371,9 +11393,9 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new AlterShareGroupOffsetsResponse(data));
final AlterShareGroupOffsetsResult result = env.adminClient().alterShareGroupOffsets(GROUP_ID, Map.of(fooTopicPartition0, 1L, fooTopicPartition1, 2L, barPartition0, 1L));
TestUtils.assertFutureThrows(GroupNotEmptyException.class, result.all());
TestUtils.assertFutureThrows(TopicAuthorizationException.class, result.all());
assertNull(result.partitionResult(fooTopicPartition0).get());
TestUtils.assertFutureThrows(GroupNotEmptyException.class, result.partitionResult(fooTopicPartition1));
TestUtils.assertFutureThrows(TopicAuthorizationException.class, result.partitionResult(fooTopicPartition1));
assertNull(result.partitionResult(barPartition0).get());
}
}

View File

@ -3756,7 +3756,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val groupId = alterShareGroupOffsetsRequest.data.groupId
if (!isShareGroupProtocolEnabled) {
requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception))
requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
return CompletableFuture.completedFuture[Unit](())
} else if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
@ -3766,9 +3766,9 @@ class KafkaApis(val requestChannel: RequestChannel,
alterShareGroupOffsetsRequest.data.topics.forEach(topic => {
val topicError = {
if (!authHelper.authorize(request.context, READ, TOPIC, topic.topicName())) {
if (!authHelper.authorize(request.context, READ, TOPIC, topic.topicName)) {
Some(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED))
} else if (!metadataCache.contains(topic.topicName())) {
} else if (!metadataCache.contains(topic.topicName)) {
Some(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))
} else {
None
@ -3776,9 +3776,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}
topicError match {
case Some(error) =>
topic.partitions().forEach(partition => responseBuilder.addPartition(topic.topicName(), partition.partitionIndex(), metadataCache.topicNamesToIds(), error.error))
topic.partitions.forEach(partition => responseBuilder.addPartition(topic.topicName, partition.partitionIndex, metadataCache.topicNamesToIds, error.error))
case None =>
authorizedTopicPartitions.add(topic)
authorizedTopicPartitions.add(topic.duplicate)
}
})
@ -3792,8 +3792,10 @@ class KafkaApis(val requestChannel: RequestChannel,
).handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(exception))
} else if (response.errorCode != Errors.NONE.code) {
requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, response.errorCode, response.errorMessage))
} else {
requestHelper.sendMaybeThrottle(request, responseBuilder.merge(response, metadataCache.topicNamesToIds()).build())
requestHelper.sendMaybeThrottle(request, responseBuilder.merge(response, metadataCache.topicNamesToIds).build())
}
}
}
@ -3824,22 +3826,13 @@ class KafkaApis(val requestChannel: RequestChannel,
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
)
} else {
authorizedTopics.add(topic)
}
}
if (authorizedTopics.isEmpty) {
requestHelper.sendMaybeThrottle(
request,
new DeleteShareGroupOffsetsResponse(
new DeleteShareGroupOffsetsResponseData()
.setResponses(deleteShareGroupOffsetsResponseTopics)))
return CompletableFuture.completedFuture[Unit](())
}
groupCoordinator.deleteShareGroupOffsets(
request.context,
new DeleteShareGroupOffsetsRequestData().setGroupId(groupId).setTopics(authorizedTopics)
@ -3847,12 +3840,12 @@ class KafkaApis(val requestChannel: RequestChannel,
if (exception != null) {
requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(
AbstractResponse.DEFAULT_THROTTLE_TIME,
Errors.forException(exception).code(),
exception.getMessage()))
Errors.forException(exception).code,
exception.getMessage))
} else if (responseData.errorCode() != Errors.NONE.code) {
requestHelper.sendMaybeThrottle(
request,
deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, responseData.errorCode(), responseData.errorMessage())
deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, responseData.errorCode, responseData.errorMessage)
)
} else {
responseData.responses.forEach { topic => {

View File

@ -3254,6 +3254,18 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
removeAllClientAcls()
}
private def createEmptyShareGroup(): Unit = {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), shareGroupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
shareConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroup)
val consumer = createShareConsumer()
consumer.subscribe(util.Set.of(topic))
consumer.poll(Duration.ofMillis(500L))
consumer.close()
removeAllClientAcls()
}
@Test
def testShareGroupDescribeWithGroupDescribeAndTopicDescribeAcl(): Unit = {
createShareGroupToDescribe()
@ -3614,6 +3626,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
@Test
def testDeleteShareGroupOffsetsWithoutTopicReadAcl(): Unit = {
createEmptyShareGroup()
addAndVerifyAcls(shareGroupDeleteAcl(shareGroupResource), shareGroupResource)
val request = deleteShareGroupOffsetsRequest
@ -3663,6 +3676,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
@Test
def testAlterShareGroupOffsetsWithoutTopicReadAcl(): Unit = {
createEmptyShareGroup()
addAndVerifyAcls(shareGroupReadAcl(shareGroupResource), shareGroupResource)
val request = alterShareGroupOffsetsRequest

View File

@ -12863,10 +12863,18 @@ class KafkaApisTest extends Logging {
def testDeleteShareGroupOffsetsRequestEmptyTopicsSuccess(): Unit = {
metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
val deleteShareGroupOffsetsRequest = new DeleteShareGroupOffsetsRequestData()
val deleteShareGroupOffsetsRequestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("group")
val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequest).build)
val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequestData).build)
val groupCoordinatorResponse: DeleteShareGroupOffsetsResponseData = new DeleteShareGroupOffsetsResponseData()
.setErrorCode(Errors.NONE.code())
when(groupCoordinator.deleteShareGroupOffsets(
requestChannelRequest.context,
deleteShareGroupOffsetsRequestData
)).thenReturn(CompletableFuture.completedFuture(groupCoordinatorResponse))
val resultFuture = new CompletableFuture[DeleteShareGroupOffsetsResponseData]
kafkaApis = createKafkaApis()

View File

@ -698,6 +698,10 @@ public class GroupCoordinatorService implements GroupCoordinator {
InitializeShareGroupStateParameters request,
AlterShareGroupOffsetsResponseData response
) {
if (request.groupTopicPartitionData().topicsData().isEmpty()) {
return CompletableFuture.completedFuture(response);
}
return persister.initializeState(request)
.handle((result, exp) -> {
if (exp == null) {
@ -1233,16 +1237,20 @@ public class GroupCoordinatorService implements GroupCoordinator {
* See {@link GroupCoordinator#alterShareGroupOffsets(AuthorizableRequestContext, String, AlterShareGroupOffsetsRequestData)}.
*/
@Override
public CompletableFuture<AlterShareGroupOffsetsResponseData> alterShareGroupOffsets(AuthorizableRequestContext context, String groupId, AlterShareGroupOffsetsRequestData request) {
public CompletableFuture<AlterShareGroupOffsetsResponseData> alterShareGroupOffsets(
AuthorizableRequestContext context,
String groupId,
AlterShareGroupOffsetsRequestData request
) {
if (!isActive.get() || metadataImage == null) {
return CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponse(Errors.COORDINATOR_NOT_AVAILABLE));
return CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
}
if (groupId == null || groupId.isEmpty()) {
return CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponse(Errors.INVALID_GROUP_ID));
return CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponseData(Errors.INVALID_GROUP_ID));
}
if (request.topics() == null || request.topics().isEmpty()) {
if (request.topics() == null) {
return CompletableFuture.completedFuture(new AlterShareGroupOffsetsResponseData());
}
@ -1257,7 +1265,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
"share-group-offsets-alter",
request,
exception,
(error, message) -> AlterShareGroupOffsetsRequest.getErrorResponse(error),
(error, message) -> AlterShareGroupOffsetsRequest.getErrorResponseData(error, message),
log
));
}
@ -1822,26 +1830,18 @@ public class GroupCoordinatorService implements GroupCoordinator {
AuthorizableRequestContext context,
DeleteShareGroupOffsetsRequestData requestData
) {
if (!isActive.get()) {
return CompletableFuture.completedFuture(
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
}
if (metadataImage == null) {
return CompletableFuture.completedFuture(
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
if (!isActive.get() || metadataImage == null) {
return CompletableFuture.completedFuture(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
}
String groupId = requestData.groupId();
if (!isGroupIdNotEmpty(groupId)) {
return CompletableFuture.completedFuture(
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
return CompletableFuture.completedFuture(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
}
if (requestData.topics() == null || requestData.topics().isEmpty()) {
return CompletableFuture.completedFuture(
new DeleteShareGroupOffsetsResponseData()
if (requestData.topics() == null) {
return CompletableFuture.completedFuture(new DeleteShareGroupOffsetsResponseData()
);
}
@ -1850,15 +1850,14 @@ public class GroupCoordinatorService implements GroupCoordinator {
topicPartitionFor(groupId),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.initiateDeleteShareGroupOffsets(groupId, requestData)
)
.thenCompose(resultHolder -> deleteShareGroupOffsetsState(groupId, resultHolder))
.exceptionally(exception -> handleOperationException(
"initiate-delete-share-group-offsets",
groupId,
exception,
(error, __) -> DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(error),
log
));
).thenCompose(resultHolder -> deleteShareGroupOffsetsState(groupId, resultHolder)
).exceptionally(exception -> handleOperationException(
"initiate-delete-share-group-offsets",
groupId,
exception,
(error, message) -> DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(error, message),
log
));
}
private CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOffsetsState(

View File

@ -4280,10 +4280,26 @@ public class GroupCoordinatorServiceTest {
service.startup(() -> 1);
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id");
.setGroupId("share-group-id")
.setTopics(List.of());
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData();
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
null,
null
);
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
@ -4291,7 +4307,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
public void testDeleteShareGroupOffsetsNullTopicsInRequest() throws InterruptedException, ExecutionException {
public void testDeleteShareGroupOffsetsEmptyTopicsInRequest() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@ -4303,10 +4319,25 @@ public class GroupCoordinatorServiceTest {
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setTopics(null);
.setTopics(List.of());
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData();
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
null,
null
);
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
@ -4400,9 +4431,7 @@ public class GroupCoordinatorServiceTest {
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
));
.setTopics(List.of());
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
@ -5376,13 +5405,29 @@ public class GroupCoordinatorServiceTest {
AlterShareGroupOffsetsRequestData request = new AlterShareGroupOffsetsRequestData()
.setGroupId(groupId);
AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData();
Map.Entry<AlterShareGroupOffsetsResponseData, InitializeShareGroupStateParameters> alterShareGroupOffsetsIntermediate =
Map.entry(
new AlterShareGroupOffsetsResponseData()
.setResponses(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection()),
new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData<>("share-group", List.of()))
.build());
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("share-group-offsets-alter"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(alterShareGroupOffsetsIntermediate));
CompletableFuture<AlterShareGroupOffsetsResponseData> future = service.alterShareGroupOffsets(
requestContext(ApiKeys.ALTER_SHARE_GROUP_OFFSETS),
groupId,
request
);
AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData();
assertEquals(data, future.get());
}
@ -5416,7 +5461,7 @@ public class GroupCoordinatorServiceTest {
AlterShareGroupOffsetsResponseData response = new AlterShareGroupOffsetsResponseData()
.setErrorCode(Errors.NON_EMPTY_GROUP.code())
.setErrorMessage(Errors.NON_EMPTY_GROUP.message());
.setErrorMessage("bad stuff");
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("share-group-offsets-alter"),