KAFKA-16717 [1/2]: Add AdminClient.alterShareGroupOffsets (#18819)

KAFKA-16720 aims to add the support for the AlterShareGroupOffsets AdminClient. Key Changes in the PR:

1. Added handing of alterShareGroupOffsets() in KafkaAdminClient and introduce AlterShareGroupOffsetRequest/AlterShareGroupOffsetResponse/AlterShareGroupOffsetsOptions classes.
2. Corresponding test in KafkaAdminClientTest.
3. Added ALTER_SHARE_GROUP_OFFSETS API (will finish it in next PR and the share coordinator pieces)

Reviewers: poorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Jimmy Wang 2025-02-15 02:35:46 +08:00 committed by GitHub
parent 48283ad2e5
commit 6a6b80215d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 689 additions and 1 deletions

View File

@ -1793,6 +1793,32 @@ public interface Admin extends AutoCloseable {
return describeShareGroups(groupIds, new DescribeShareGroupsOptions());
}
/**
* Alters offsets for the specified group. In order to succeed, the group must be empty.
*
* <p>This operation is not transactional, so it may succeed for some partitions while fail for others.
*
* @param groupId The group for which to alter offsets.
* @param offsets A map of offsets by partition. Partitions not specified in the map are ignored.
* @param options The options to use when altering the offsets.
* @return The AlterShareGroupOffsetsResult.
*/
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options);
/**
* Alters offsets for the specified group. In order to succeed, the group must be empty.
*
* <p>This is a convenience method for {@link #alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} with default options.
* See the overload for more details.
*
* @param groupId The group for which to alter offsets.
* @param offsets A map of offsets by partition.
* @return The AlterShareGroupOffsetsResult.
*/
default AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
return alterShareGroupOffsets(groupId, offsets, new AlterShareGroupOffsetsOptions());
}
/**
* List the share group offsets available in the cluster for the specified share groups.
*

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Map;
/**
* Options for the {@link Admin#alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsOptions extends AbstractOptions<AlterShareGroupOffsetsOptions> {
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.protocol.Errors;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* The result of the {@link Admin#alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsResult {
private final KafkaFuture<Map<TopicPartition, Errors>> future;
AlterShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future) {
this.future = future;
}
/**
* Return a future which can be used to check the result for a given partition.
*/
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
this.future.whenComplete((topicPartitions, throwable) -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else if (!topicPartitions.containsKey(partition)) {
result.completeExceptionally(new IllegalArgumentException(
"Alter offset for partition \"" + partition + "\" was not attempted"));
} else {
final Errors error = topicPartitions.get(partition);
if (error == Errors.NONE) {
result.complete(null);
} else {
result.completeExceptionally(error.exception());
}
}
});
return result;
}
/**
* Return a future which succeeds if all the alter offsets succeed.
*/
public KafkaFuture<Void> all() {
return this.future.thenApply(topicPartitionErrorsMap -> {
List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
.stream()
.filter(e -> e.getValue() != Errors.NONE)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
for (Errors error : topicPartitionErrorsMap.values()) {
if (error != Errors.NONE) {
throw error.exception(
"Failed altering share group offsets for the following partitions: " + partitionsFailed);
}
}
return null;
});
}
}

View File

@ -298,6 +298,11 @@ public class ForwardingAdmin implements Admin {
return delegate.describeShareGroups(groupIds, options);
}
@Override
public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) {
return delegate.alterShareGroupOffsets(groupId, offsets, options);
}
@Override
public ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) {
return delegate.listShareGroupOffsets(groupSpecs, options);

View File

@ -45,6 +45,7 @@ import org.apache.kafka.clients.admin.internals.AdminFetchMetricsManager;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.AllBrokersStrategy;
import org.apache.kafka.clients.admin.internals.AlterConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.AlterShareGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
@ -3800,6 +3801,14 @@ public class KafkaAdminClient extends AdminClient {
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}
@Override
public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> future = AlterShareGroupOffsetsHandler.newFuture(groupId);
AlterShareGroupOffsetsHandler handler = new AlterShareGroupOffsetsHandler(groupId, offsets, logContext);
invokeDriver(handler, future, options.timeoutMs);
return new AlterShareGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
}
@Override
public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListShareGroupOffsetsSpec> groupSpecs,
final ListShareGroupOffsetsOptions options) {

View File

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AlterShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.AlterShareGroupOffsetsResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* This class is the handler for {@link KafkaAdminClient#alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} call
*/
public class AlterShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Errors>> {
private final CoordinatorKey groupId;
private final Logger log;
private final Map<TopicPartition, Long> offsets;
private final CoordinatorStrategy lookupStrategy;
public AlterShareGroupOffsetsHandler(String groupId, Map<TopicPartition, Long> offsets, LogContext logContext) {
this.groupId = CoordinatorKey.byGroupId(groupId);
this.offsets = offsets;
this.log = logContext.logger(AlterShareGroupOffsetsHandler.class);
this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
}
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture(String groupId) {
return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
}
@Override
AlterShareGroupOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<CoordinatorKey> groupIds) {
var data = new AlterShareGroupOffsetsRequestData().setGroupId(groupId.idValue);
offsets.forEach((tp, offset) -> {
var topic = data.topics().find(tp.topic());
if (topic == null) {
topic = new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic()
.setTopicName(tp.topic());
data.topics().add(topic);
}
topic.partitions().add(new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition()
.setPartitionIndex(tp.partition())
.setStartOffset(offset));
});
return new AlterShareGroupOffsetsRequest.Builder(data);
}
@Override
public String apiName() {
return "alterShareGroupOffsets";
}
@Override
public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(Node broker, Set<CoordinatorKey> keys, AbstractResponse abstractResponse) {
AlterShareGroupOffsetsResponse response = (AlterShareGroupOffsetsResponse) abstractResponse;
final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
for (AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topic : response.data().responses()) {
for (AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition partition : topic.partitions()) {
TopicPartition topicPartition = new TopicPartition(topic.topicName(), partition.partitionIndex());
Errors error = Errors.forCode(partition.errorCode());
if (error != Errors.NONE) {
handleError(
groupId,
topicPartition,
error,
partitionResults,
groupsToUnmap,
groupsToRetry
);
} else {
partitionResults.put(topicPartition, error);
}
}
}
if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
return ApiResult.completed(groupId, partitionResults);
} else {
return ApiResult.unmapped(new ArrayList<>(groupsToUnmap));
}
}
private void handleError(
CoordinatorKey groupId,
TopicPartition topicPartition,
Errors error,
Map<TopicPartition, Errors> partitionResults,
Set<CoordinatorKey> groupsToUnmap,
Set<CoordinatorKey> groupsToRetry
) {
switch (error) {
case COORDINATOR_LOAD_IN_PROGRESS:
case REBALANCE_IN_PROGRESS:
log.debug("AlterShareGroupOffsets request for group id {} returned error {}. Will retry.",
groupId.idValue, error);
groupsToRetry.add(groupId);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
log.debug("AlterShareGroupOffsets request for group id {} returned error {}. Will rediscover the coordinator and retry.",
groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
case GROUP_ID_NOT_FOUND:
case NON_EMPTY_GROUP:
case INVALID_REQUEST:
case UNKNOWN_SERVER_ERROR:
case KAFKA_STORAGE_ERROR:
case GROUP_AUTHORIZATION_FAILED:
log.debug("AlterShareGroupOffsets request for group id {} and partition {} failed due" +
" to error {}.", groupId.idValue, topicPartition, error);
partitionResults.put(topicPartition, error);
break;
default:
log.error("AlterShareGroupOffsets request for group id {} and partition {} failed due" +
" to unexpected error {}.", groupId.idValue, topicPartition, error);
partitionResults.put(topicPartition, error);
}
}
@Override
public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
return lookupStrategy;
}
}

View File

@ -133,7 +133,8 @@ public enum ApiKeys {
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true),
STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT),
STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE),
DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS);
DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS),
ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =

View File

@ -348,6 +348,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return StreamsGroupDescribeRequest.parse(buffer, apiVersion);
case DESCRIBE_SHARE_GROUP_OFFSETS:
return DescribeShareGroupOffsetsRequest.parse(buffer, apiVersion);
case ALTER_SHARE_GROUP_OFFSETS:
return AlterShareGroupOffsetsRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));

View File

@ -285,6 +285,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return StreamsGroupDescribeResponse.parse(responseBuffer, version);
case DESCRIBE_SHARE_GROUP_OFFSETS:
return DescribeShareGroupOffsetsResponse.parse(responseBuffer, version);
case ALTER_SHARE_GROUP_OFFSETS:
return AlterShareGroupOffsetsResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class AlterShareGroupOffsetsRequest extends AbstractRequest {
private final AlterShareGroupOffsetsRequestData data;
public AlterShareGroupOffsetsRequest(AlterShareGroupOffsetsRequestData data, short version) {
super(ApiKeys.ALTER_SHARE_GROUP_OFFSETS, version);
this.data = data;
}
public static class Builder extends AbstractRequest.Builder<AlterShareGroupOffsetsRequest> {
private final AlterShareGroupOffsetsRequestData data;
public Builder(AlterShareGroupOffsetsRequestData data) {
this(data, true);
}
public Builder(AlterShareGroupOffsetsRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.ALTER_SHARE_GROUP_OFFSETS, enableUnstableLastVersion);
this.data = data;
}
@Override
public AlterShareGroupOffsetsRequest build(short version) {
return new AlterShareGroupOffsetsRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic> results = new ArrayList<>();
data.topics().forEach(
topicResult -> results.add(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
.setTopicName(topicResult.topicName())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partitionIndex())
.setErrorCode(Errors.forException(e).code()))
.collect(Collectors.toList()))));
return new AlterShareGroupOffsetsResponse(new AlterShareGroupOffsetsResponseData()
.setResponses(results));
}
public static AlterShareGroupOffsetsRequest parse(ByteBuffer buffer, short version) {
return new AlterShareGroupOffsetsRequest(
new AlterShareGroupOffsetsRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
@Override
public AlterShareGroupOffsetsRequestData data() {
return data;
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class AlterShareGroupOffsetsResponse extends AbstractResponse {
private final AlterShareGroupOffsetsResponseData data;
public AlterShareGroupOffsetsResponse(AlterShareGroupOffsetsResponseData data) {
super(ApiKeys.ALTER_SHARE_GROUP_OFFSETS);
this.data = data;
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
data.responses().forEach(topic -> topic.partitions().forEach(partitionResponse ->
updateErrorCounts(counts, Errors.forCode(partitionResponse.errorCode()))
));
return counts;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
@Override
public AlterShareGroupOffsetsResponseData data() {
return data;
}
public static AlterShareGroupOffsetsResponse parse(ByteBuffer buffer, short version) {
return new AlterShareGroupOffsetsResponse(
new AlterShareGroupOffsetsResponseData(new ByteBufferAccessor(buffer), version)
);
}
}

View File

@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 91,
"type": "request",
"listeners": ["broker"],
"name": "AlterShareGroupOffsetsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+",
"about": "The topics to alter offsets for.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
"about": "The topic name." },
{ "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+",
"about": "Each partition to alter offsets for.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset." }
]}
]}
]
}

View File

@ -0,0 +1,37 @@
{
"apiKey": 91,
"type": "response",
"name": "AlterShareGroupOffsetsResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - NON_EMPTY_GROUP (version 0+)
// - KAFKA_STORAGE_ERROR (version 0+)
// - INVALID_REQUEST (version 0+)
// - UNKNOWN_SERVER_ERROR (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+",
"about": "The results for each topic.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The unique topic ID." },
{ "name": "Partitions", "type": "[]AlterShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The error message, or null if there was no error." }
]}
]}
]
}

View File

@ -84,6 +84,7 @@ import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.R
import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirPartitionResult;
import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsResponseData;
@ -168,6 +169,7 @@ import org.apache.kafka.common.requests.AddRaftVoterResponse;
import org.apache.kafka.common.requests.AlterClientQuotasResponse;
import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
import org.apache.kafka.common.requests.AlterShareGroupOffsetsResponse;
import org.apache.kafka.common.requests.AlterUserScramCredentialsResponse;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ApiVersionsRequest;
@ -9390,4 +9392,60 @@ public class KafkaAdminClientTest {
assertEquals(500, partitionToOffsetAndMetadata.get(myTopicPartition3));
}
}
@Test
public void testAlterShareGroupOffsets() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData().setResponses(
List.of(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0), new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(1))),
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
)
);
TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
TopicPartition barPartition0 = new TopicPartition("bar", 0);
TopicPartition zooTopicPartition0 = new TopicPartition("zoo", 0);
env.kafkaClient().prepareResponse(new AlterShareGroupOffsetsResponse(data));
final AlterShareGroupOffsetsResult result = env.adminClient().alterShareGroupOffsets(GROUP_ID, Map.of(fooTopicPartition0, 1L, fooTopicPartition1, 2L, barPartition0, 1L));
assertNull(result.all().get());
assertNull(result.partitionResult(fooTopicPartition0).get());
assertNull(result.partitionResult(fooTopicPartition1).get());
assertNull(result.partitionResult(barPartition0).get());
TestUtils.assertFutureThrows(result.partitionResult(zooTopicPartition0), IllegalArgumentException.class);
}
}
@Test
public void testAlterShareGroupOffsetsWithErrorInOnePartition() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData().setResponses(
List.of(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0), new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(1).setErrorCode(Errors.NON_EMPTY_GROUP.code()).setErrorMessage("The group is not empty"))),
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
)
);
TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
TopicPartition barPartition0 = new TopicPartition("bar", 0);
env.kafkaClient().prepareResponse(new AlterShareGroupOffsetsResponse(data));
final AlterShareGroupOffsetsResult result = env.adminClient().alterShareGroupOffsets(GROUP_ID, Map.of(fooTopicPartition0, 1L, fooTopicPartition1, 2L, barPartition0, 1L));
TestUtils.assertFutureThrows(result.all(), Errors.NON_EMPTY_GROUP.exception().getClass());
assertNull(result.partitionResult(fooTopicPartition0).get());
TestUtils.assertFutureThrows(result.partitionResult(fooTopicPartition1), Errors.NON_EMPTY_GROUP.exception().getClass());
assertNull(result.partitionResult(barPartition0).get());
}
}
}

View File

@ -1389,6 +1389,11 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public synchronized ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");

View File

@ -58,6 +58,8 @@ import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData;
import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic;
import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopicCollection;
import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ApiMessageType;
@ -1039,6 +1041,7 @@ public class RequestResponseTest {
case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatRequest(version);
case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeRequest(version);
case DESCRIBE_SHARE_GROUP_OFFSETS: return createDescribeShareGroupOffsetsRequest(version);
case ALTER_SHARE_GROUP_OFFSETS: return createAlterShareGroupOffsetsRequest(version);
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -1132,6 +1135,7 @@ public class RequestResponseTest {
case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatResponse();
case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeResponse();
case DESCRIBE_SHARE_GROUP_OFFSETS: return createDescribeShareGroupOffsetsResponse();
case ALTER_SHARE_GROUP_OFFSETS: return createAlterShareGroupOffsetsResponse();
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -3709,6 +3713,20 @@ public class RequestResponseTest {
return new DescribeShareGroupOffsetsRequest.Builder(data).build(version);
}
private AlterShareGroupOffsetsRequest createAlterShareGroupOffsetsRequest(short version) {
AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection alterShareGroupOffsetsRequestTopics = new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection();
alterShareGroupOffsetsRequestTopics.add(new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopic()
.setTopicName("topic")
.setPartitions(List.of(new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition()
.setPartitionIndex(0)
.setStartOffset(0)))
);
AlterShareGroupOffsetsRequestData data = new AlterShareGroupOffsetsRequestData()
.setGroupId("group")
.setTopics(alterShareGroupOffsetsRequestTopics);
return new AlterShareGroupOffsetsRequest.Builder(data).build(version);
}
private DescribeShareGroupOffsetsResponse createDescribeShareGroupOffsetsResponse() {
DescribeShareGroupOffsetsResponseData data = new DescribeShareGroupOffsetsResponseData()
.setGroups(Collections.singletonList(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
@ -3724,6 +3742,17 @@ public class RequestResponseTest {
return new DescribeShareGroupOffsetsResponse(data);
}
private AlterShareGroupOffsetsResponse createAlterShareGroupOffsetsResponse() {
AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData()
.setResponses(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
.setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())))
.setTopicName("topic")
.setTopicId(Uuid.randomUuid())));
return new AlterShareGroupOffsetsResponse(data);
}
private AbstractRequest createStreamsGroupDescribeRequest(final short version) {
return new StreamsGroupDescribeRequest.Builder(new StreamsGroupDescribeRequestData()
.setGroupIds(Collections.singletonList("group"))

View File

@ -235,6 +235,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request)
case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request)
case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => handleDescribeShareGroupOffsetsRequest(request)
case ApiKeys.ALTER_SHARE_GROUP_OFFSETS => handleAlterShareGroupOffsetsRequest(request)
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
}
} catch {
@ -3269,6 +3270,12 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
def handleAlterShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = {
val alterShareGroupOffsetsRequest = request.body[AlterShareGroupOffsetsRequest]
requestHelper.sendMaybeThrottle(request, alterShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
}
// Visible for Testing
def getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest: ShareAcknowledgeRequest,
topicIdNames: util.Map[Uuid, String],

View File

@ -688,6 +688,9 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS =>
new DescribeShareGroupOffsetsRequest.Builder(new DescribeShareGroupOffsetsRequestData(), true)
case ApiKeys.ALTER_SHARE_GROUP_OFFSETS =>
new AlterShareGroupOffsetsRequest.Builder(new AlterShareGroupOffsetsRequestData(), true)
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}

View File

@ -2271,6 +2271,12 @@ RULE:[n:string](regexp)s/pattern/replacement/g/U</code></pre>
<td>Group</td>
<td></td>
</tr>
<tr>
<td>ALTER_SHARE_GROUP_OFFSETS (91)</td>
<td>Alter</td>
<td>Group</td>
<td></td>
</tr>
</tbody>
</table>

View File

@ -34,6 +34,8 @@ import org.apache.kafka.common.message.AlterPartitionRequestDataJsonConverter;
import org.apache.kafka.common.message.AlterPartitionResponseDataJsonConverter;
import org.apache.kafka.common.message.AlterReplicaLogDirsRequestDataJsonConverter;
import org.apache.kafka.common.message.AlterReplicaLogDirsResponseDataJsonConverter;
import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestDataJsonConverter;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseDataJsonConverter;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseDataJsonConverter;
import org.apache.kafka.common.message.ApiVersionsRequestDataJsonConverter;
@ -212,6 +214,8 @@ import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.common.requests.AlterPartitionResponse;
import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
import org.apache.kafka.common.requests.AlterShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.AlterShareGroupOffsetsResponse;
import org.apache.kafka.common.requests.AlterUserScramCredentialsRequest;
import org.apache.kafka.common.requests.AlterUserScramCredentialsResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
@ -459,6 +463,8 @@ public class RequestConvertToJson {
return DescribeQuorumRequestDataJsonConverter.write(((DescribeQuorumRequest) request).data(), request.version());
case DESCRIBE_SHARE_GROUP_OFFSETS:
return DescribeShareGroupOffsetsRequestDataJsonConverter.write(((DescribeShareGroupOffsetsRequest) request).data(), request.version());
case ALTER_SHARE_GROUP_OFFSETS:
return AlterShareGroupOffsetsRequestDataJsonConverter.write(((AlterShareGroupOffsetsRequest) request).data(), request.version());
case DESCRIBE_TOPIC_PARTITIONS:
return DescribeTopicPartitionsRequestDataJsonConverter.write(((DescribeTopicPartitionsRequest) request).data(), request.version());
case DESCRIBE_TRANSACTIONS:
@ -641,6 +647,8 @@ public class RequestConvertToJson {
return DescribeQuorumResponseDataJsonConverter.write(((DescribeQuorumResponse) response).data(), version);
case DESCRIBE_SHARE_GROUP_OFFSETS:
return DescribeShareGroupOffsetsResponseDataJsonConverter.write(((DescribeShareGroupOffsetsResponse) response).data(), version);
case ALTER_SHARE_GROUP_OFFSETS:
return AlterShareGroupOffsetsResponseDataJsonConverter.write(((AlterShareGroupOffsetsResponse) response).data(), version);
case DESCRIBE_TOPIC_PARTITIONS:
return DescribeTopicPartitionsResponseDataJsonConverter.write(((DescribeTopicPartitionsResponse) response).data(), version);
case DESCRIBE_TRANSACTIONS:

View File

@ -35,6 +35,8 @@ import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.AlterReplicaLogDirsOptions;
import org.apache.kafka.clients.admin.AlterReplicaLogDirsResult;
import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.AlterShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.AlterUserScramCredentialsOptions;
import org.apache.kafka.clients.admin.AlterUserScramCredentialsResult;
import org.apache.kafka.clients.admin.CreateAclsOptions;
@ -416,6 +418,11 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
return adminDelegate.describeShareGroups(groupIds, options);
}
@Override
public AlterShareGroupOffsetsResult alterShareGroupOffsets(final String groupId, final Map<TopicPartition, Long> offsets, final AlterShareGroupOffsetsOptions options) {
return adminDelegate.alterShareGroupOffsets(groupId, offsets, options);
}
@Override
public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListShareGroupOffsetsSpec> groupSpecs, final ListShareGroupOffsetsOptions options) {
return adminDelegate.listShareGroupOffsets(groupSpecs, options);