KAFKA-16718-2/n: KafkaAdminClient and GroupCoordinator implementation for DeleteShareGroupOffsets RPC (#18976)

This PR contains the implementation of KafkaAdminClient and
GroupCoordinator for DeleteShareGroupOffsets RPC.

- Added `deleteShareGroupOffsets` to `KafkaAdminClient`
- Added implementation for `handleDeleteShareGroupOffsetsRequest` in
`KafkaApis.scala`
- Added `deleteShareGroupOffsets` to `GroupCoordinator` as well.
Internally, this makes use of `persister.deleteState` to persist the
changes in the share coordinator.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Sushant Mahajan <smahajan@confluent.io>
This commit is contained in:
Chirag Wadhwa 2025-04-09 12:01:06 +05:30 committed by GitHub
parent 43e22ef5d6
commit 5148174196
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 1680 additions and 24 deletions

View File

@ -1947,13 +1947,28 @@ public interface Admin extends AutoCloseable {
} }
/** /**
* Delete share groups from the cluster with the default options. * Delete offsets for a set of partitions in a share group.
* *
* @param groupIds Collection of share group ids which are to be deleted. * @param groupId The group for which to delete offsets.
* @return The DeleteShareGroupsResult. * @param partitions The topic-partitions.
* @param options The options to use when deleting offsets in a share group.
* @return The DeleteShareGroupOffsetsResult.
*/ */
default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) { DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options);
return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
/**
* Delete offsets for a set of partitions in a share group with the default options.
*
* <p>
* This is a convenience method for {@link #deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} with default options.
* See the overload for more details.
*
* @param groupId The group for which to delete offsets.
* @param partitions The topic-partitions.
* @return The DeleteShareGroupOffsetsResult.
*/
default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions) {
return deleteShareGroupOffsets(groupId, partitions, new DeleteShareGroupOffsetsOptions());
} }
/** /**
@ -1965,6 +1980,16 @@ public interface Admin extends AutoCloseable {
*/ */
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options); DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);
/**
 * Delete share groups from the cluster with the default options.
 *
 * <p>
 * This is a convenience method for {@link #deleteShareGroups(Collection, DeleteShareGroupsOptions)} with default options.
 * See the overload for more details.
 *
 * @param groupIds Collection of share group ids which are to be deleted.
 * @return The DeleteShareGroupsResult.
 */
default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
    return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
}
/** /**
* Describe streams groups in the cluster. * Describe streams groups in the cluster.
* *

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Set;
/**
 * Options for the {@link Admin#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsOptions extends AbstractOptions<DeleteShareGroupOffsetsOptions> {
    // No call-specific options yet; only the inherited timeoutMs is supported.
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import java.util.Map;
import java.util.Set;
/**
 * The result of the {@link Admin#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsResult {

    // Future holding the per-partition outcome; a null ApiException value marks success.
    private final KafkaFuture<Map<TopicPartition, ApiException>> future;
    // The partitions the caller originally asked to delete offsets for.
    private final Set<TopicPartition> partitions;

    DeleteShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, ApiException>> future, Set<TopicPartition> partitions) {
        this.future = future;
        this.partitions = partitions;
    }

    /**
     * Return a future which succeeds only if all the deletions succeed.
     * If not, the first partition error shall be returned.
     */
    public KafkaFuture<Void> all() {
        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
        future.whenComplete((partitionErrors, exception) -> {
            if (exception != null) {
                result.completeExceptionally(exception);
                return;
            }
            boolean anyFailed = false;
            for (TopicPartition tp : partitions) {
                if (failIfError(partitionErrors, tp, result)) {
                    anyFailed = true;
                    break;
                }
            }
            if (!anyFailed) {
                result.complete(null);
            }
        });
        return result;
    }

    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
        if (!partitions.contains(partition)) {
            throw new IllegalArgumentException("Partition " + partition + " was not included in the original request");
        }
        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
        future.whenComplete((partitionErrors, exception) -> {
            if (exception != null) {
                result.completeExceptionally(exception);
            } else if (!failIfError(partitionErrors, partition, result)) {
                result.complete(null);
            }
        });
        return result;
    }

    /**
     * Completes {@code result} exceptionally when the partition carries an error, or when the
     * partition is missing from the response map entirely. Returns whether it did so.
     */
    private boolean failIfError(Map<TopicPartition, ApiException> partitionLevelErrors,
                                TopicPartition partition,
                                KafkaFutureImpl<Void> result) {
        final Throwable failure = partitionLevelErrors.containsKey(partition)
            ? partitionLevelErrors.get(partition)
            : new IllegalArgumentException("Offset deletion result for partition \"" + partition + "\" was not included in the response");
        if (failure == null) {
            return false;
        }
        result.completeExceptionally(failure);
        return true;
    }
}

View File

@ -333,6 +333,11 @@ public class ForwardingAdmin implements Admin {
return delegate.listShareGroupOffsets(groupSpecs, options); return delegate.listShareGroupOffsets(groupSpecs, options);
} }
// Forwards the share-group offset deletion straight to the wrapped Admin instance.
@Override
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) {
    return delegate.deleteShareGroupOffsets(groupId, partitions, options);
}
@Override @Override
public DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options) { public DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options) {
return delegate.deleteShareGroups(groupIds, options); return delegate.deleteShareGroups(groupIds, options);

View File

@ -50,6 +50,7 @@ import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler; import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler; import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler; import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
import org.apache.kafka.clients.admin.internals.DeleteShareGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DeleteShareGroupsHandler; import org.apache.kafka.clients.admin.internals.DeleteShareGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler; import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler; import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
@ -3841,6 +3842,14 @@ public class KafkaAdminClient extends AdminClient {
return new ListShareGroupOffsetsResult(future.all()); return new ListShareGroupOffsetsResult(future.all());
} }
@Override
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) {
    // Route the request through the coordinator-aware driver and expose its single-key future.
    DeleteShareGroupOffsetsHandler handler = new DeleteShareGroupOffsetsHandler(groupId, partitions, logContext);
    SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, ApiException>> responseFuture = DeleteShareGroupOffsetsHandler.newFuture(groupId);
    invokeDriver(handler, responseFuture, options.timeoutMs);
    return new DeleteShareGroupOffsetsResult(responseFuture.get(CoordinatorKey.byGroupId(groupId)), partitions);
}
@Override @Override
public DescribeStreamsGroupsResult describeStreamsGroups(final Collection<String> groupIds, public DescribeStreamsGroupsResult describeStreamsGroups(final Collection<String> groupIds,
final DescribeStreamsGroupsOptions options) { final DescribeStreamsGroupsOptions options) {
@ -3851,7 +3860,7 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeStreamsGroupsResult(future.all().entrySet().stream() return new DescribeStreamsGroupsResult(future.all().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
} }
@Override @Override
public DescribeClassicGroupsResult describeClassicGroups(final Collection<String> groupIds, public DescribeClassicGroupsResult describeClassicGroups(final Collection<String> groupIds,
final DescribeClassicGroupsOptions options) { final DescribeClassicGroupsOptions options) {

View File

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.DeleteShareGroupOffsetsResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * This class is the handler for {@link KafkaAdminClient#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call
 */
public class DeleteShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, ApiException>> {

    private final CoordinatorKey groupId;
    private final Logger log;
    private final Set<TopicPartition> partitions;
    private final CoordinatorStrategy lookupStrategy;

    public DeleteShareGroupOffsetsHandler(String groupId, Set<TopicPartition> partitions, LogContext logContext) {
        this.groupId = CoordinatorKey.byGroupId(groupId);
        this.partitions = partitions;
        this.log = logContext.logger(DeleteShareGroupOffsetsHandler.class);
        // The group coordinator owns share-group state, so the request is routed via FindCoordinator.
        this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
    }

    @Override
    public String apiName() {
        return "deleteShareGroupOffsets";
    }

    @Override
    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
        return lookupStrategy;
    }

    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, ApiException>> newFuture(String groupId) {
        return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
    }

    /**
     * Guards against the driver handing this handler any key set other than the single
     * group id it was constructed for.
     */
    private void validateKeys(Set<CoordinatorKey> groupIds) {
        if (!groupIds.equals(Collections.singleton(groupId))) {
            throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
                " (expected only " + Collections.singleton(groupId) + ")");
        }
    }

    @Override
    DeleteShareGroupOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<CoordinatorKey> groupIds) {
        validateKeys(groupIds);

        final List<DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic> topics =
            new ArrayList<>();

        // Group the requested partitions by topic to match the request schema.
        partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> topics.add(
            new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
                .setTopicName(topic)
                .setPartitions(topicPartitions.stream()
                    .map(TopicPartition::partition)
                    .collect(Collectors.toList())
                )
        ));

        return new DeleteShareGroupOffsetsRequest.Builder(
            new DeleteShareGroupOffsetsRequestData()
                .setGroupId(groupId.idValue)
                .setTopics(topics),
            true
        );
    }

    @Override
    public ApiResult<CoordinatorKey, Map<TopicPartition, ApiException>> handleResponse(
        Node coordinator,
        Set<CoordinatorKey> groupIds,
        AbstractResponse abstractResponse
    ) {
        validateKeys(groupIds);

        final DeleteShareGroupOffsetsResponse response = (DeleteShareGroupOffsetsResponse) abstractResponse;

        final Errors groupError = Errors.forCode(response.data().errorCode());
        final String groupErrorMessage = response.data().errorMessage();

        if (groupError != Errors.NONE) {
            final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
            final Map<CoordinatorKey, Throwable> groupsFailed = new HashMap<>();
            handleGroupError(groupId, groupError, groupErrorMessage, groupsFailed, groupsToUnmap);

            return new ApiResult<>(Collections.emptyMap(), groupsFailed, new ArrayList<>(groupsToUnmap));
        } else {
            final Map<TopicPartition, ApiException> partitionResults = new HashMap<>();
            response.data().responses().forEach(topic ->
                topic.partitions().forEach(partition -> {
                    if (partition.errorCode() != Errors.NONE.code()) {
                        final Errors partitionError = Errors.forCode(partition.errorCode());
                        final String partitionErrorMessage = partition.errorMessage();
                        // Pass the server-provided message as a logging parameter instead of
                        // concatenating it into the format string: concatenation defeats lazy
                        // formatting and any '{}' inside the message would be misinterpreted.
                        log.debug("DeleteShareGroupOffsets request for group id {}, topic {} and partition {} failed and returned error {}. {}",
                            groupId.idValue, topic.topicName(), partition.partitionIndex(), partitionError, partitionErrorMessage);
                    }
                    // A null exception value marks the partition as successfully deleted.
                    partitionResults.put(
                        new TopicPartition(topic.topicName(), partition.partitionIndex()),
                        Errors.forCode(partition.errorCode()).exception(partition.errorMessage())
                    );
                })
            );

            return ApiResult.completed(groupId, partitionResults);
        }
    }

    /**
     * Translates a group-level error into one of: retry with the same coordinator, unmap the key
     * to rediscover the coordinator, or fail the request terminally.
     */
    private void handleGroupError(
        CoordinatorKey groupId,
        Errors error,
        String errorMessage,
        Map<CoordinatorKey, Throwable> failed,
        Set<CoordinatorKey> groupsToUnmap
    ) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
            case REBALANCE_IN_PROGRESS:
                // If the coordinator is in the middle of loading, then we just need to retry
                log.debug("DeleteShareGroupOffsets request for group id {} failed because the coordinator" +
                    " is still in the process of loading state. Will retry. {}", groupId.idValue, errorMessage);
                break;
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // If the coordinator is unavailable or there was a coordinator change, then we unmap
                // the key so that we retry the `FindCoordinator` request
                log.debug("DeleteShareGroupOffsets request for group id {} returned error {}. Will rediscover the coordinator and retry. {}",
                    groupId.idValue, error, errorMessage);
                groupsToUnmap.add(groupId);
                break;
            case INVALID_GROUP_ID:
            case GROUP_ID_NOT_FOUND:
            case NON_EMPTY_GROUP:
            case INVALID_REQUEST:
            case UNKNOWN_SERVER_ERROR:
            case KAFKA_STORAGE_ERROR:
            case GROUP_AUTHORIZATION_FAILED:
                // Terminal errors: surface them to the caller without retrying.
                log.debug("DeleteShareGroupOffsets request for group id {} failed due to error {}. {}", groupId.idValue, error, errorMessage);
                failed.put(groupId, error.exception(errorMessage));
                break;
            default:
                log.error("DeleteShareGroupOffsets request for group id {} failed due to unexpected error {}. {}", groupId.idValue, error, errorMessage);
                failed.put(groupId, error.exception(errorMessage));
        }
    }
}

View File

@ -23,10 +23,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable; import org.apache.kafka.common.protocol.Readable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class DeleteShareGroupOffsetsRequest extends AbstractRequest { public class DeleteShareGroupOffsetsRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<DeleteShareGroupOffsetsRequest> { public static class Builder extends AbstractRequest.Builder<DeleteShareGroupOffsetsRequest> {
@ -59,19 +55,20 @@ public class DeleteShareGroupOffsetsRequest extends AbstractRequest {
this.data = data; this.data = data;
} }
// Builds a group-level error response using the error's default message.
DeleteShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, Errors error) {
    return getErrorResponse(throttleTimeMs, error.code(), error.message());
}

/**
 * Builds a response carrying only the group-level error fields; no per-topic
 * results are included.
 */
public DeleteShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, short errorCode, String errorMessage) {
    return new DeleteShareGroupOffsetsResponse(new DeleteShareGroupOffsetsResponseData()
        .setThrottleTimeMs(throttleTimeMs)
        .setErrorMessage(errorMessage)
        .setErrorCode(errorCode));
}
@Override @Override
public DeleteShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, Throwable e) { public DeleteShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> results = new ArrayList<>(); return getErrorResponse(throttleTimeMs, Errors.forException(e));
data.topics().forEach(
topicResult -> results.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicResult.topicName())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData)
.setErrorCode(Errors.forException(e).code()))
.collect(Collectors.toList()))));
return new DeleteShareGroupOffsetsResponse(new DeleteShareGroupOffsetsResponseData()
.setResponses(results));
} }
@Override @Override
@ -85,4 +82,14 @@ public class DeleteShareGroupOffsetsRequest extends AbstractRequest {
version version
); );
} }
// Convenience overload that expands an Errors value into its code and default message.
public static DeleteShareGroupOffsetsResponseData getErrorDeleteResponseData(Errors error) {
    return getErrorDeleteResponseData(error.code(), error.message());
}

/**
 * Builds response data with only the group-level error code and message populated.
 */
public static DeleteShareGroupOffsetsResponseData getErrorDeleteResponseData(short errorCode, String errorMessage) {
    return new DeleteShareGroupOffsetsResponseData()
        .setErrorCode(errorCode)
        .setErrorMessage(errorMessage);
}
} }

View File

@ -42,6 +42,7 @@ public class DeleteShareGroupOffsetsResponse extends AbstractResponse {
@Override @Override
public Map<Errors, Integer> errorCounts() { public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new EnumMap<>(Errors.class); Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
updateErrorCounts(counts, Errors.forCode(data.errorCode()));
data.responses().forEach( data.responses().forEach(
topicResult -> topicResult.partitions().forEach( topicResult -> topicResult.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode())) partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))

View File

@ -94,6 +94,7 @@ public class ShareGroupDescribeRequest extends AbstractRequest {
.map(groupId -> new ShareGroupDescribeResponseData.DescribedGroup() .map(groupId -> new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId) .setGroupId(groupId)
.setErrorCode(error.code()) .setErrorCode(error.code())
.setErrorMessage(error.message())
).collect(Collectors.toList()); ).collect(Collectors.toList());
} }
} }

View File

@ -106,6 +106,8 @@ import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult; import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection; import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
import org.apache.kafka.common.message.DeleteRecordsResponseData; import org.apache.kafka.common.message.DeleteRecordsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DeleteTopicsResponseData; import org.apache.kafka.common.message.DeleteTopicsResponseData;
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResultCollection; import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResultCollection;
@ -191,6 +193,8 @@ import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteAclsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteGroupsResponse; import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteRecordsResponse; import org.apache.kafka.common.requests.DeleteRecordsResponse;
import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.DeleteShareGroupOffsetsResponse;
import org.apache.kafka.common.requests.DeleteTopicsRequest; import org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.kafka.common.requests.DeleteTopicsResponse; import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.DescribeAclsResponse; import org.apache.kafka.common.requests.DescribeAclsResponse;
@ -10780,6 +10784,156 @@ public class KafkaAdminClientTest {
} }
} }
@Test
public void testDeleteShareGroupOffsetsOptionsWithBatchedApi() throws Exception {
    // Verifies the admin client issues a DELETE_SHARE_GROUP_OFFSETS request whose
    // payload carries the requested group id and topic names.
    final Cluster cluster = mockCluster(3, 0);
    final Time time = new MockTime();

    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
        AdminClientConfig.RETRIES_CONFIG, "0")) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

        final Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("A", 0));
        final DeleteShareGroupOffsetsOptions options = new DeleteShareGroupOffsetsOptions();

        env.adminClient().deleteShareGroupOffsets(GROUP_ID, partitions, options);

        final MockClient mockClient = env.kafkaClient();
        waitForRequest(mockClient, ApiKeys.DELETE_SHARE_GROUP_OFFSETS);

        // Inspect the request sitting on the mock transport without completing it.
        ClientRequest clientRequest = mockClient.requests().peek();
        assertNotNull(clientRequest);
        DeleteShareGroupOffsetsRequestData data = ((DeleteShareGroupOffsetsRequest.Builder) clientRequest.requestBuilder()).build().data();
        assertEquals(GROUP_ID, data.groupId());
        assertEquals(1, data.topics().size());
        assertEquals(Collections.singletonList("A"),
            data.topics().stream().map(DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic::topicName).collect(Collectors.toList()));
    }
}
@Test
public void testDeleteShareGroupOffsets() throws Exception {
    // Happy path: every requested partition appears in the response with no error,
    // so all() and every partitionResult() complete with null.
    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

        DeleteShareGroupOffsetsResponseData data = new DeleteShareGroupOffsetsResponseData().setResponses(
            List.of(
                new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0), new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(1))),
                new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
            )
        );

        TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
        TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
        TopicPartition barPartition0 = new TopicPartition("bar", 0);
        // Never requested; used to exercise the IllegalArgumentException path below.
        TopicPartition zooTopicPartition0 = new TopicPartition("zoo", 0);

        env.kafkaClient().prepareResponse(new DeleteShareGroupOffsetsResponse(data));

        final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0, fooTopicPartition1, barPartition0));

        assertNull(result.all().get());
        assertNull(result.partitionResult(fooTopicPartition0).get());
        assertNull(result.partitionResult(fooTopicPartition1).get());
        assertNull(result.partitionResult(barPartition0).get());
        // Querying a partition that was not in the original request is rejected eagerly.
        assertThrows(IllegalArgumentException.class, () -> result.partitionResult(zooTopicPartition0));
    }
}
@Test
public void testDeleteShareGroupOffsetsEmpty() throws Exception {
    // An empty partition set round-trips cleanly: no per-partition results are
    // expected and all() completes successfully.
    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

        DeleteShareGroupOffsetsResponseData data = new DeleteShareGroupOffsetsResponseData().setResponses(
            Collections.emptyList()
        );

        env.kafkaClient().prepareResponse(new DeleteShareGroupOffsetsResponse(data));

        final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Collections.emptySet());

        assertDoesNotThrow(() -> result.all().get());
    }
}
@Test
public void testDeleteShareGroupOffsetsWithErrorInGroup() throws Exception {
    // A group-level error (no per-topic responses at all) must fail the whole result.
    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

        DeleteShareGroupOffsetsResponseData data = new DeleteShareGroupOffsetsResponseData()
            .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())
            .setErrorMessage(Errors.GROUP_AUTHORIZATION_FAILED.message());

        TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
        TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
        TopicPartition barTopicPartition0 = new TopicPartition("bar", 0);

        env.kafkaClient().prepareResponse(new DeleteShareGroupOffsetsResponse(data));

        final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0, fooTopicPartition1, barTopicPartition0));

        TestUtils.assertFutureThrows(Errors.GROUP_AUTHORIZATION_FAILED.exception().getClass(), result.all());
    }
}
@Test
public void testDeleteShareGroupOffsetsWithErrorInOnePartition() throws Exception {
    // One failed partition poisons all(), but the healthy partitions still report success
    // through partitionResult().
    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

        // foo-1 carries KAFKA_STORAGE_ERROR; foo-0 and bar-0 succeed.
        DeleteShareGroupOffsetsResponseData data = new DeleteShareGroupOffsetsResponseData().setResponses(
            List.of(
                new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0), new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(1).setErrorCode(Errors.KAFKA_STORAGE_ERROR.code()).setErrorMessage(Errors.KAFKA_STORAGE_ERROR.message()))),
                new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
            )
        );

        TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
        TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
        TopicPartition barTopicPartition0 = new TopicPartition("bar", 0);

        env.kafkaClient().prepareResponse(new DeleteShareGroupOffsetsResponse(data));

        final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0, fooTopicPartition1, barTopicPartition0));

        TestUtils.assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), result.all());
        assertNull(result.partitionResult(fooTopicPartition0).get());
        TestUtils.assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), result.partitionResult(fooTopicPartition1));
        assertNull(result.partitionResult(barTopicPartition0).get());
    }
}
@Test
public void testDeleteShareGroupOffsetsWithPartitionNotPresentInResult() throws Exception {
    // NOTE(review): despite the name, bar-1 is absent from the REQUEST, not just the
    // response, so this exercises the partitionResult() IllegalArgumentException path
    // rather than the missing-from-response path — confirm intent.
    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

        DeleteShareGroupOffsetsResponseData data = new DeleteShareGroupOffsetsResponseData().setResponses(
            List.of(
                new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0), new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(1))),
                new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
            )
        );

        TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
        TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
        TopicPartition barTopicPartition0 = new TopicPartition("bar", 0);
        TopicPartition barTopicPartition1 = new TopicPartition("bar", 1);

        env.kafkaClient().prepareResponse(new DeleteShareGroupOffsetsResponse(data));

        final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0, fooTopicPartition1, barTopicPartition0));

        assertDoesNotThrow(() -> result.all().get());
        assertThrows(IllegalArgumentException.class, () -> result.partitionResult(barTopicPartition1));
        assertNull(result.partitionResult(barTopicPartition0).get());
    }
}
private static StreamsGroupDescribeResponseData makeFullStreamsGroupDescribeResponse() { private static StreamsGroupDescribeResponseData makeFullStreamsGroupDescribeResponse() {
StreamsGroupDescribeResponseData data; StreamsGroupDescribeResponseData data;
StreamsGroupDescribeResponseData.TaskIds activeTasks1 = new StreamsGroupDescribeResponseData.TaskIds() StreamsGroupDescribeResponseData.TaskIds activeTasks1 = new StreamsGroupDescribeResponseData.TaskIds()

View File

@ -1429,6 +1429,11 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException("Not implemented yet"); throw new UnsupportedOperationException("Not implemented yet");
} }
/**
 * Not supported by {@code MockAdminClient}.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public synchronized DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override @Override
public synchronized DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options) { public synchronized DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options) {
throw new UnsupportedOperationException("Not implemented yet"); throw new UnsupportedOperationException("Not implemented yet");

View File

@ -32,6 +32,8 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE
import org.apache.kafka.common.internals.{FatalExitError, Topic} import org.apache.kafka.common.internals.{FatalExitError, Topic}
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection} import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult} import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult}
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse} import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
@ -75,6 +77,7 @@ import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.stream.Collectors import java.util.stream.Collectors
import java.util.{Collections, Optional} import java.util.{Collections, Optional}
import scala.annotation.nowarn import scala.annotation.nowarn
import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, mutable} import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
@ -3625,8 +3628,64 @@ class KafkaApis(val requestChannel: RequestChannel,
// Handles DeleteShareGroupOffsets: checks the share group protocol is enabled and
// the caller has DELETE on the group, answers unauthorized topics locally, and
// forwards the remaining topics to the group coordinator.
def handleDeleteShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = {
  val deleteShareGroupOffsetsRequest = request.body[DeleteShareGroupOffsetsRequest]
  val groupId = deleteShareGroupOffsetsRequest.data.groupId

  if (!isShareGroupProtocolEnabled) {
    requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception))
    return
  } else if (!authHelper.authorize(request.context, DELETE, GROUP, groupId)) {
    requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception))
    return
  }

  // Topics the caller cannot READ are answered locally with TOPIC_AUTHORIZATION_FAILED;
  // only the authorized topics are sent on to the group coordinator.
  val deleteShareGroupOffsetsResponseTopics: util.List[DeleteShareGroupOffsetsResponseTopic] = new util.ArrayList[DeleteShareGroupOffsetsResponseTopic]()
  val authorizedTopics: util.List[DeleteShareGroupOffsetsRequestTopic] = new util.ArrayList[DeleteShareGroupOffsetsRequestTopic]

  deleteShareGroupOffsetsRequest.data.topics.forEach { topic =>
    if (!authHelper.authorize(request.context, READ, TOPIC, topic.topicName)) {
      deleteShareGroupOffsetsResponseTopics.add(
        new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
          .setTopicName(topic.topicName)
          .setPartitions(topic.partitions.map(partition => {
            new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
              .setPartitionIndex(partition)
              .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
              .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
          }).toList.asJava)
      )
    } else {
      authorizedTopics.add(topic)
    }
  }

  if (authorizedTopics.isEmpty) {
    // Nothing to forward to the coordinator. Still return the per-topic
    // authorization failures collected above (previously they were dropped,
    // which made an all-unauthorized request look fully successful to clients).
    requestHelper.sendMaybeThrottle(request, new DeleteShareGroupOffsetsResponse(new DeleteShareGroupOffsetsResponseData()
      .setResponses(deleteShareGroupOffsetsResponseTopics)))
    return
  }

  groupCoordinator.deleteShareGroupOffsets(
    request.context,
    new DeleteShareGroupOffsetsRequestData().setGroupId(groupId).setTopics(authorizedTopics)
  ).handle[Unit] { (responseData, exception) =>
    if (exception != null) {
      requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, exception))
    } else if (responseData.errorCode() != Errors.NONE.code) {
      // Group-level error from the coordinator: propagate it as a top-level error.
      requestHelper.sendMaybeThrottle(
        request,
        deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, responseData.errorCode(), responseData.errorMessage())
      )
    } else {
      // Merge the coordinator results with the locally generated authorization failures.
      responseData.responses.forEach(topic => deleteShareGroupOffsetsResponseTopics.add(topic))
      requestHelper.sendMaybeThrottle(request, new DeleteShareGroupOffsetsResponse(new DeleteShareGroupOffsetsResponseData()
        .setResponses(deleteShareGroupOffsetsResponseTopics)))
    }
  }
}
// Visible for Testing // Visible for Testing

View File

@ -43,6 +43,8 @@ import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{DescribedGroup, TopicPartitions} import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{DescribedGroup, TopicPartitions}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData.{DeleteShareGroupOffsetsResponsePartition, DeleteShareGroupOffsetsResponseTopic}
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.{DescribeShareGroupOffsetsRequestGroup, DescribeShareGroupOffsetsRequestTopic} import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.{DescribeShareGroupOffsetsRequestGroup, DescribeShareGroupOffsetsRequestTopic}
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.{DescribeShareGroupOffsetsResponseGroup, DescribeShareGroupOffsetsResponsePartition, DescribeShareGroupOffsetsResponseTopic} import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.{DescribeShareGroupOffsetsResponseGroup, DescribeShareGroupOffsetsResponsePartition, DescribeShareGroupOffsetsResponseTopic}
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterConfigsResourceCollection => IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, AlterableConfigCollection => IAlterableConfigCollection} import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterConfigsResourceCollection => IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, AlterableConfigCollection => IAlterableConfigCollection}
@ -11809,6 +11811,358 @@ class KafkaApisTest extends Logging {
assertEquals(describeShareGroupOffsetsResponse, response.data) assertEquals(describeShareGroupOffsetsResponse, response.data)
} }
@Test
def testDeleteShareGroupOffsetsReturnsUnsupportedVersion(): Unit = {
  val requestData = new DeleteShareGroupOffsetsRequestData()
    .setGroupId("group")
    .setTopics(util.List.of(new DeleteShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1))))
  val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(requestData, true).build())

  metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
  kafkaApis = createKafkaApis()
  kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)

  // The share group protocol is disabled by default, so every partition in the
  // response must carry UNSUPPORTED_VERSION.
  val response = verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
  response.data.responses.forEach { topic =>
    topic.partitions.forEach(partition => assertEquals(Errors.UNSUPPORTED_VERSION.code, partition.errorCode))
  }
}
@Test
def testDeleteShareGroupOffsetsRequestsGroupAuthorizationFailed(): Unit = {
  val requestData = new DeleteShareGroupOffsetsRequestData()
    .setGroupId("group")
    .setTopics(util.List.of(new DeleteShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1))))
  val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(requestData, true).build)

  // Deny every ACL check, so the group DELETE authorization fails first.
  val authorizer: Authorizer = mock(classOf[Authorizer])
  when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
    .thenReturn(util.List.of(AuthorizationResult.DENIED))

  metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
  kafkaApis = createKafkaApis(
    overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
    authorizer = Some(authorizer),
  )
  kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)

  // The failure surfaces as a top-level error code on the response.
  val response = verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
  assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
}
@Test
def testDeleteShareGroupOffsetsRequestsTopicAuthorizationFailed(): Unit = {
// Expected Action list for the per-topic READ authorization check KafkaApis performs.
def buildExpectedActionsTopic(topic: String): util.List[Action] = {
val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
val action = new Action(AclOperation.READ, pattern, 1, true, true)
Collections.singletonList(action)
}
// Expected Action list for the group-level DELETE authorization check.
def buildExpectedActionsGroup(topic: String): util.List[Action] = {
val pattern = new ResourcePattern(ResourceType.GROUP, topic, PatternType.LITERAL)
val action = new Action(AclOperation.DELETE, pattern, 1, true, true)
Collections.singletonList(action)
}
val groupId = "group"
val topicName1 = "topic-1"
val topicId1 = Uuid.randomUuid
val topicName2 = "topic-2"
val topicId2 = Uuid.randomUuid
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
addTopicToMetadataCache(topicName1, 2, topicId = topicId1)
addTopicToMetadataCache(topicName2, 2, topicId = topicId2)
val deleteShareGroupOffsetsRequestTopic1 = new DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(util.List.of(0, 1))
val deleteShareGroupOffsetsRequestTopic2 = new DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(util.List.of(0, 1))
val deleteShareGroupOffsetsRequestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(util.List.of(deleteShareGroupOffsetsRequestTopic1, deleteShareGroupOffsetsRequestTopic2))
// Only topic-2 passes authorization below, so only it may reach the group coordinator.
val deleteShareGroupOffsetsGroupCoordinatorRequestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(util.List.of(deleteShareGroupOffsetsRequestTopic2))
val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequestData, true).build)
val resultFuture = new CompletableFuture[DeleteShareGroupOffsetsResponseData]
when(groupCoordinator.deleteShareGroupOffsets(
requestChannelRequest.context,
deleteShareGroupOffsetsGroupCoordinatorRequestData
)).thenReturn(resultFuture)
// Allow DELETE on the group, deny READ on topic-1, allow READ on topic-2.
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActionsGroup(groupId))))
.thenReturn(util.List.of(AuthorizationResult.ALLOWED))
when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActionsTopic(topicName1))))
.thenReturn(util.List.of(AuthorizationResult.DENIED))
when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActionsTopic(topicName2))))
.thenReturn(util.List.of(AuthorizationResult.ALLOWED))
kafkaApis = createKafkaApis(
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
authorizer = Some(authorizer)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
// Successful coordinator result for the one authorized topic (topic-2).
val deleteShareGroupOffsetsResponseData = new DeleteShareGroupOffsetsResponseData()
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code())
.setResponses(util.List.of(
new DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName2)
.setTopicId(topicId2)
.setPartitions(util.List.of(
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code()),
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code())
))
))
// Expected final response: locally generated TOPIC_AUTHORIZATION_FAILED entries
// for topic-1, followed by the coordinator results for topic-2.
val expectedResponseTopics: util.List[DeleteShareGroupOffsetsResponseTopic] = new util.ArrayList[DeleteShareGroupOffsetsResponseTopic]()
expectedResponseTopics.add(
new DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName1)
.setPartitions(util.List.of(
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message()),
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
))
)
deleteShareGroupOffsetsResponseData.responses.forEach{ topic => {
expectedResponseTopics.add(topic)
}}
val expectedResponseData: DeleteShareGroupOffsetsResponseData = new DeleteShareGroupOffsetsResponseData()
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
.setResponses(expectedResponseTopics)
// Complete the coordinator future only after handle() has run, then verify the merge.
resultFuture.complete(deleteShareGroupOffsetsResponseData)
val response = verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
assertEquals(expectedResponseData, response.data)
}
@Test
def testDeleteShareGroupOffsetsRequestSuccess(): Unit = {
val topicName1 = "topic-1"
val topicId1 = Uuid.randomUuid
val topicName2 = "topic-2"
val topicId2 = Uuid.randomUuid
val topicName3 = "topic-3"
val topicId3 = Uuid.randomUuid
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
addTopicToMetadataCache(topicName2, 2, topicId = topicId2)
addTopicToMetadataCache(topicName3, 3, topicId = topicId3)
// Request spanning three topics with 1, 2 and 3 partitions respectively.
val deleteShareGroupOffsetsRequestTopic1 = new DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(util.List.of(0))
val deleteShareGroupOffsetsRequestTopic2 = new DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(util.List.of(0, 1))
val deleteShareGroupOffsetsRequestTopic3 = new DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName3)
.setPartitions(util.List.of(0, 1, 2))
val deleteShareGroupOffsetsRequestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("group")
.setTopics(util.List.of(deleteShareGroupOffsetsRequestTopic1, deleteShareGroupOffsetsRequestTopic2, deleteShareGroupOffsetsRequestTopic3))
val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequestData, true).build)
// No authorizer is configured, so all topics are forwarded to the coordinator.
val resultFuture = new CompletableFuture[DeleteShareGroupOffsetsResponseData]
when(groupCoordinator.deleteShareGroupOffsets(
requestChannelRequest.context,
deleteShareGroupOffsetsRequestData
)).thenReturn(resultFuture)
kafkaApis = createKafkaApis(
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
// Fully successful coordinator result: NONE for every requested partition.
val deleteShareGroupOffsetsResponseData = new DeleteShareGroupOffsetsResponseData()
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code())
.setResponses(util.List.of(
new DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName1)
.setTopicId(topicId1)
.setPartitions(util.List.of(
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code())
)),
new DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName2)
.setTopicId(topicId2)
.setPartitions(util.List.of(
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code()),
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code())
)),
new DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName3)
.setTopicId(topicId3)
.setPartitions(util.List.of(
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code()),
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code()),
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code())
))
))
// The coordinator result should be passed through to the client unchanged.
resultFuture.complete(deleteShareGroupOffsetsResponseData)
val response = verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
assertEquals(deleteShareGroupOffsetsResponseData, response.data)
}
@Test
def testDeleteShareGroupOffsetsRequestGroupCoordinatorThrowsError(): Unit = {
  val topicName1 = "topic-1"
  val topicId1 = Uuid.randomUuid
  val topicName2 = "topic-2"
  val topicId2 = Uuid.randomUuid
  metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
  addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
  addTopicToMetadataCache(topicName2, 2, topicId = topicId2)

  // Request covering one partition of topic-1 and two partitions of topic-2.
  val requestData = new DeleteShareGroupOffsetsRequestData()
    .setGroupId("group")
    .setTopics(util.List.of(
      new DeleteShareGroupOffsetsRequestTopic()
        .setTopicName(topicName1)
        .setPartitions(util.List.of(0)),
      new DeleteShareGroupOffsetsRequestTopic()
        .setTopicName(topicName2)
        .setPartitions(util.List.of(0, 1))))
  val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(requestData, true).build)

  // The coordinator fails the future outright instead of returning an error payload.
  when(groupCoordinator.deleteShareGroupOffsets(
    requestChannelRequest.context,
    requestData
  )).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception))

  kafkaApis = createKafkaApis(
    overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
  )
  kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)

  // The exception is translated into a top-level error on the response.
  val expectedData = new DeleteShareGroupOffsetsResponseData()
    .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
    .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
  val response = verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
  assertEquals(expectedData, response.data)
}
@Test
def testDeleteShareGroupOffsetsRequestGroupCoordinatorErrorResponse(): Unit = {
  val topicName1 = "topic-1"
  val topicId1 = Uuid.randomUuid
  val topicName2 = "topic-2"
  val topicId2 = Uuid.randomUuid
  metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
  addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
  addTopicToMetadataCache(topicName2, 2, topicId = topicId2)

  val requestData = new DeleteShareGroupOffsetsRequestData()
    .setGroupId("group")
    .setTopics(util.List.of(
      new DeleteShareGroupOffsetsRequestTopic()
        .setTopicName(topicName1)
        .setPartitions(util.List.of(0)),
      new DeleteShareGroupOffsetsRequestTopic()
        .setTopicName(topicName2)
        .setPartitions(util.List.of(0, 1))))
  val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(requestData, true).build)

  // The coordinator completes normally but with a group-level error in the payload.
  val coordinatorResponse: DeleteShareGroupOffsetsResponseData = new DeleteShareGroupOffsetsResponseData()
    .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
    .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
  when(groupCoordinator.deleteShareGroupOffsets(
    requestChannelRequest.context,
    requestData
  )).thenReturn(CompletableFuture.completedFuture(coordinatorResponse))

  kafkaApis = createKafkaApis(
    overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
  )
  kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)

  // The group-level error is propagated as-is to the client.
  val expectedData = new DeleteShareGroupOffsetsResponseData()
    .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
    .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
  val response = verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
  assertEquals(expectedData, response.data)
}
@Test
def testDeleteShareGroupOffsetsRequestEmptyTopicsSuccess(): Unit = {
  metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)

  // A request with no topics short-circuits in KafkaApis: the group coordinator is
  // never invoked (so it is deliberately not stubbed here) and an empty, error-free
  // response is returned. The previous version created and completed a
  // CompletableFuture that was never wired to anything; that dead code is removed.
  val deleteShareGroupOffsetsRequest = new DeleteShareGroupOffsetsRequestData()
    .setGroupId("group")
  val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequest, true).build)

  kafkaApis = createKafkaApis(
    overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
  )
  kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)

  val response = verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
  assertEquals(new DeleteShareGroupOffsetsResponseData(), response.data)
}
@Test @Test
def testWriteShareGroupStateSuccess(): Unit = { def testWriteShareGroupStateSuccess(): Unit = {
val topicId = Uuid.randomUuid(); val topicId = Uuid.randomUuid();

View File

@ -21,6 +21,8 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData; import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData; import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData; import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
@ -311,6 +313,20 @@ public interface GroupCoordinator {
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup request DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup request
); );
/**
* Delete the share group offsets for a given group.
*
* @param context The request context.
* @param request The DeleteShareGroupOffsetsRequestData request.
*
* @return A future yielding the results.
* The error codes of the response are set to indicate any errors that occurred during the execution.
*/
CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOffsets(
RequestContext context,
DeleteShareGroupOffsetsRequestData request
);
/** /**
* Commit offsets for a given Group. * Commit offsets for a given Group.
* *

View File

@ -26,6 +26,9 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData; import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData; import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData; import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
@ -59,6 +62,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest; import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest;
import org.apache.kafka.common.requests.DeleteGroupsRequest; import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.DescribeGroupsRequest; import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest; import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetCommitRequest;
@ -932,7 +936,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
@Override @Override
public CompletableFuture<List<DescribedGroup>> shareGroupDescribe( public CompletableFuture<List<DescribedGroup>> shareGroupDescribe(
RequestContext context, RequestContext context,
List<String> groupIds) { List<String> groupIds
) {
if (!isActive.get()) { if (!isActive.get()) {
return CompletableFuture.completedFuture(ShareGroupDescribeRequest.getErrorDescribedGroupList( return CompletableFuture.completedFuture(ShareGroupDescribeRequest.getErrorDescribedGroupList(
groupIds, groupIds,
@ -1243,6 +1248,48 @@ public class GroupCoordinatorService implements GroupCoordinator {
}); });
} }
/**
 * Deletes share-group state for the given topics via the persister and completes
 * {@code future} with the translated {@link DeleteShareGroupOffsetsResponseData}.
 *
 * @param requestData                              The original delete-offsets request (supplies the group id).
 * @param future                                   The future to complete with the final response, or exceptionally on failure.
 * @param requestTopicIdToNameMapping              Maps topic ids back to the topic names used in the request.
 * @param deleteShareGroupStateRequestTopicsData   Per-topic state-deletion payloads to send to the persister.
 * @param deleteShareGroupOffsetsResponseTopicList Accumulator for per-topic responses; entries added by the
 *                                                 caller (e.g. topics rejected earlier) are kept and the
 *                                                 persister results are appended before the future completes.
 */
private void populateDeleteShareGroupOffsetsFuture(
    DeleteShareGroupOffsetsRequestData requestData,
    CompletableFuture<DeleteShareGroupOffsetsResponseData> future,
    Map<Uuid, String> requestTopicIdToNameMapping,
    List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData,
    List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> deleteShareGroupOffsetsResponseTopicList
) {
    DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData()
        .setGroupId(requestData.groupId())
        .setTopics(deleteShareGroupStateRequestTopicsData);
    persister.deleteState(DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData))
        .whenComplete((result, error) -> {
            if (error != null) {
                // Log the cause and group id so the failure is diagnosable
                // (previously the throwable was dropped from the log line).
                log.error("Failed to delete share group state for group {}", requestData.groupId(), error);
                future.completeExceptionally(error);
                return;
            }

            if (result == null || result.topicsData() == null) {
                log.error("Result is null for the delete share group state");
                future.completeExceptionally(new IllegalStateException("Result is null for the delete share group state"));
                return;
            }

            result.topicsData().forEach(topicData ->
                deleteShareGroupOffsetsResponseTopicList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
                    .setTopicId(topicData.topicId())
                    .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
                    .setPartitions(topicData.partitions().stream().map(
                        partitionData -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
                            .setPartitionIndex(partitionData.partition())
                            // NONE maps to a null message; otherwise surface the canonical message for the code.
                            .setErrorMessage(partitionData.errorCode() == Errors.NONE.code() ? null : Errors.forCode(partitionData.errorCode()).message())
                            .setErrorCode(partitionData.errorCode())
                    ).toList())
                ));

            future.complete(
                new DeleteShareGroupOffsetsResponseData()
                    .setResponses(deleteShareGroupOffsetsResponseTopicList));
        });
}
/** /**
* See {@link GroupCoordinator#fetchOffsets(RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}. * See {@link GroupCoordinator#fetchOffsets(RequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}.
*/ */
@ -1508,6 +1555,110 @@ public class GroupCoordinatorService implements GroupCoordinator {
return future; return future;
} }
/**
 * See {@link GroupCoordinator#deleteShareGroupOffsets(RequestContext, DeleteShareGroupOffsetsRequestData)}.
 *
 * <p>Validates the request, resolves topic names to ids via the metadata image,
 * verifies that the share group is empty (offsets may only be deleted for an
 * empty group), and then forwards the surviving topics to the persister.
 */
@Override
public CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOffsets(
    RequestContext context,
    DeleteShareGroupOffsetsRequestData requestData
) {
    // Coordinator not started or already shut down.
    if (!isActive.get()) {
        return CompletableFuture.completedFuture(
            DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
    }

    // Without a metadata image topic names cannot be resolved to ids, so the
    // whole request is answered with COORDINATOR_NOT_AVAILABLE.
    if (metadataImage == null) {
        return CompletableFuture.completedFuture(
            DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
    }

    String groupId = requestData.groupId();

    if (!isGroupIdNotEmpty(groupId)) {
        return CompletableFuture.completedFuture(
            DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
    }

    // Topic id -> name mapping, used to fill names back into the response built
    // from the persister result (which only carries topic ids).
    Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
    // Topics known to the metadata image; these are sent to the persister.
    List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData = new ArrayList<>(requestData.topics().size());
    // Response entries; topics unknown to the image are added here immediately
    // with UNKNOWN_TOPIC_OR_PARTITION and never reach the persister.
    List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> deleteShareGroupOffsetsResponseTopicList = new ArrayList<>(requestData.topics().size());

    requestData.topics().forEach(topic -> {
        Uuid topicId = metadataImage.topics().topicNameToIdView().get(topic.topicName());
        if (topicId != null) {
            requestTopicIdToNameMapping.put(topicId, topic.topicName());
            deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData()
                .setTopicId(topicId)
                .setPartitions(
                    topic.partitions().stream().map(
                        partitionIndex -> new DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex)
                    ).toList()
                ));
        } else {
            // Topic not present in the metadata image: report the error per
            // partition without contacting the persister.
            deleteShareGroupOffsetsResponseTopicList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
                .setTopicName(topic.topicName())
                .setPartitions(topic.partitions().stream().map(
                    partition -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
                        .setPartitionIndex(partition)
                        .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
                        .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
                ).toList()));
        }
    });

    // If the request for the persister is empty, just complete the operation right away.
    if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
        return CompletableFuture.completedFuture(
            new DeleteShareGroupOffsetsResponseData()
                .setResponses(deleteShareGroupOffsetsResponseTopicList));
    }

    CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new CompletableFuture<>();
    TopicPartition topicPartition = topicPartitionFor(groupId);

    // This is done to make sure the provided group is empty. Offsets can be deleted only for an empty share group.
    CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> describeGroupFuture =
        runtime.scheduleReadOperation(
            "share-group-describe",
            topicPartition,
            (coordinator, lastCommittedOffset) -> coordinator.shareGroupDescribe(List.of(groupId), lastCommittedOffset)
        ).exceptionally(exception -> handleOperationException(
            "share-group-describe",
            List.of(groupId),
            exception,
            (error, __) -> ShareGroupDescribeRequest.getErrorDescribedGroupList(List.of(groupId), error),
            log
        ));

    describeGroupFuture.whenComplete((groups, throwable) -> {
        if (throwable != null) {
            // Defensive: exceptionally above normally converts failures into an
            // error DescribedGroup list, so this branch is unlikely to be hit.
            log.error("Failed to describe the share group {}", groupId, throwable);
            future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable)));
        } else if (groups == null || groups.isEmpty()) {
            log.error("Describe share group resulted in empty response for group {}", groupId);
            future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.GROUP_ID_NOT_FOUND));
        } else if (groups.get(0).errorCode() != Errors.NONE.code()) {
            // Propagate the describe error (e.g. GROUP_ID_NOT_FOUND) as-is.
            log.error("Failed to describe the share group {}", groupId);
            future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(groups.get(0).errorCode(), groups.get(0).errorMessage()));
        } else if (groups.get(0).members() != null && !groups.get(0).members().isEmpty()) {
            // Offsets may only be deleted for a group with no active members.
            log.error("Provided group {} is not empty", groupId);
            future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.NON_EMPTY_GROUP));
        } else {
            populateDeleteShareGroupOffsetsFuture(
                requestData,
                future,
                requestTopicIdToNameMapping,
                deleteShareGroupStateRequestTopicsData,
                deleteShareGroupOffsetsResponseTopicList
            );
        }
    });

    return future;
}
/** /**
* See {@link GroupCoordinator#commitOffsets(RequestContext, OffsetCommitRequestData, BufferSupplier)}. * See {@link GroupCoordinator#commitOffsets(RequestContext, OffsetCommitRequestData, BufferSupplier)}.
*/ */

View File

@ -42,6 +42,10 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData; import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData; import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData; import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
@ -117,6 +121,7 @@ import org.mockito.ArgumentMatchers;
import java.net.InetAddress; import java.net.InetAddress;
import java.time.Duration; import java.time.Duration;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -2789,6 +2794,7 @@ public class GroupCoordinatorServiceTest {
List.of(new ShareGroupDescribeResponseData.DescribedGroup() List.of(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("share-group-id") .setGroupId("share-group-id")
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
.setErrorMessage(Errors.COORDINATOR_LOAD_IN_PROGRESS.message())
), ),
future.get() future.get()
); );
@ -2816,6 +2822,7 @@ public class GroupCoordinatorServiceTest {
List.of(new ShareGroupDescribeResponseData.DescribedGroup() List.of(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("share-group-id") .setGroupId("share-group-id")
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message())
), ),
future.get() future.get()
); );
@ -3282,6 +3289,531 @@ public class GroupCoordinatorServiceTest {
assertEquals(responseData, future.get()); assertEquals(responseData, future.get());
} }
@Test
public void testDeleteShareGroupOffsetsWithNoOpPersister() throws InterruptedException, ExecutionException {
    // With the no-op persister every partition comes back with the default
    // (success) error code and a null error message.
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(runtime)
        .build(true);
    service.startup(() -> 1);

    int partition = 1;
    DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
        .setGroupId("share-group-id")
        .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
            .setTopicName(TOPIC_NAME)
            .setPartitions(List.of(partition))
        ));

    DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
        .setResponses(
            List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
                .setTopicName(TOPIC_NAME)
                .setTopicId(TOPIC_ID)
                .setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
                    .setPartitionIndex(partition)
                    .setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE)
                    .setErrorMessage(null))))
        );

    // The group must describe as empty for the deletion to proceed.
    ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
        .setGroupId("share-group-id-1");
    when(runtime.scheduleReadOperation(
        ArgumentMatchers.eq("share-group-describe"),
        ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
        ArgumentMatchers.any()
    )).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));

    // Fixed: use DELETE_SHARE_GROUP_OFFSETS (previously DESCRIBE_SHARE_GROUP_OFFSETS,
    // a copy-paste slip) to match the RPC under test and the sibling tests.
    CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
        service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);

    assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsWithDefaultPersister() throws InterruptedException, ExecutionException {
    // Happy path through the default persister: the request topic is translated
    // into a DeleteShareGroupState call and the persister result is mapped back
    // into the DeleteShareGroupOffsets response.
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    Persister persister = mock(DefaultStatePersister.class);
    GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(runtime)
        .setPersister(persister)
        .build(true);
    service.startup(() -> 1);

    int partition = 1;
    DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
        .setGroupId("share-group-id")
        .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
            .setTopicName(TOPIC_NAME)
            .setPartitions(List.of(partition))
        ));

    // The exact request the service is expected to send to the persister.
    DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData()
        .setGroupId("share-group-id")
        .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
            .setTopicId(TOPIC_ID)
            .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
                .setPartition(partition)))));

    DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
        .setResponses(
            List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
                .setTopicName(TOPIC_NAME)
                .setTopicId(TOPIC_ID)
                .setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
                    .setPartitionIndex(partition)
                    .setErrorCode(Errors.NONE.code())
                    .setErrorMessage(null))))
        );

    // Successful per-partition result returned by the persister.
    DeleteShareGroupStateResponseData deleteShareGroupStateResponseData = new DeleteShareGroupStateResponseData()
        .setResults(
            List.of(new DeleteShareGroupStateResponseData.DeleteStateResult()
                .setTopicId(TOPIC_ID)
                .setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult()
                    .setPartition(partition)
                    .setErrorCode(Errors.NONE.code())
                    .setErrorMessage(null)))
            )
        );

    // The group must describe as empty for the deletion to proceed.
    ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
        .setGroupId("share-group-id-1");
    when(runtime.scheduleReadOperation(
        ArgumentMatchers.eq("share-group-describe"),
        ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
        ArgumentMatchers.any()
    )).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));

    DeleteShareGroupStateParameters deleteShareGroupStateParameters = DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData);
    DeleteShareGroupStateResult deleteShareGroupStateResult = DeleteShareGroupStateResult.from(deleteShareGroupStateResponseData);
    // eq() matching here relies on DeleteShareGroupStateParameters#equals.
    when(persister.deleteState(
        ArgumentMatchers.eq(deleteShareGroupStateParameters)
    )).thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));

    CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
        service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);

    assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsNonexistentTopicWithDefaultPersister() throws InterruptedException, ExecutionException {
    // A topic absent from the metadata image must be answered directly with
    // UNKNOWN_TOPIC_OR_PARTITION, without the describe step or the persister
    // being involved (hence no stubbing of either mock).
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> coordinatorRuntime = mockRuntime();
    Persister statePersister = mock(DefaultStatePersister.class);
    GroupCoordinatorService coordinatorService = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(coordinatorRuntime)
        .setPersister(statePersister)
        .build(true);
    coordinatorService.startup(() -> 1);

    final int partitionIndex = 1;
    DeleteShareGroupOffsetsRequestData request = new DeleteShareGroupOffsetsRequestData()
        .setGroupId("share-group-id")
        .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
            .setTopicName("badtopic")
            .setPartitions(List.of(partitionIndex))));

    DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition expectedPartition =
        new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
            .setPartitionIndex(partitionIndex)
            .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
            .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
    DeleteShareGroupOffsetsResponseData expectedResponse = new DeleteShareGroupOffsetsResponseData()
        .setResponses(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
            .setTopicName("badtopic")
            .setPartitions(List.of(expectedPartition))));

    CompletableFuture<DeleteShareGroupOffsetsResponseData> responseFuture =
        coordinatorService.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), request);

    assertEquals(expectedResponse, responseFuture.get());
}
@Test
public void testDeleteShareGroupOffsetsWithDefaultPersisterThrowsError() {
    // A failed persister future must propagate as an exceptional completion of
    // the service future.
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    Persister persister = mock(DefaultStatePersister.class);
    GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(runtime)
        .setPersister(persister)
        .build(true);
    service.startup(() -> 1);

    int partition = 1;
    DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
        .setGroupId("share-group-id")
        .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
            .setTopicName(TOPIC_NAME)
            .setPartitions(List.of(partition))
        ));

    // The group must describe as empty so the flow reaches the persister.
    ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
        .setGroupId("share-group-id-1");
    when(runtime.scheduleReadOperation(
        ArgumentMatchers.eq("share-group-describe"),
        ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
        ArgumentMatchers.any()
    )).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));

    when(persister.deleteState(ArgumentMatchers.any()))
        .thenReturn(CompletableFuture.failedFuture(new Exception("Unable to validate delete share group state request")));

    CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
        service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);

    assertFutureThrows(Exception.class, future, "Unable to validate delete share group state request");
}
@Test
public void testDeleteShareGroupOffsetsWithDefaultPersisterNullResult() {
    // A null result from the persister is treated as an IllegalStateException.
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    Persister persister = mock(DefaultStatePersister.class);
    GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(runtime)
        .setPersister(persister)
        .build(true);
    service.startup(() -> 1);

    int partition = 1;
    DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
        .setGroupId("share-group-id")
        .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
            .setTopicName(TOPIC_NAME)
            .setPartitions(List.of(partition))
        ));

    // The group must describe as empty so the flow reaches the persister.
    ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
        .setGroupId("share-group-id-1");
    when(runtime.scheduleReadOperation(
        ArgumentMatchers.eq("share-group-describe"),
        ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
        ArgumentMatchers.any()
    )).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));

    // Persister completes successfully but with a null payload.
    when(persister.deleteState(ArgumentMatchers.any()))
        .thenReturn(CompletableFuture.completedFuture(null));

    CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
        service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);

    assertFutureThrows(IllegalStateException.class, future, "Result is null for the delete share group state");
}
@Test
public void testDeleteShareGroupOffsetsWithDefaultPersisterNullTopicData() {
    // A non-null result whose topicsData() is null must also surface as an
    // IllegalStateException.
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    Persister persister = mock(DefaultStatePersister.class);
    GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(runtime)
        .setPersister(persister)
        .build(true);
    service.startup(() -> 1);

    int partition = 1;
    DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
        .setGroupId("share-group-id")
        .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
            .setTopicName(TOPIC_NAME)
            .setPartitions(List.of(partition))
        ));

    // The group must describe as empty so the flow reaches the persister.
    ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
        .setGroupId("share-group-id-1");
    when(runtime.scheduleReadOperation(
        ArgumentMatchers.eq("share-group-describe"),
        ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
        ArgumentMatchers.any()
    )).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));

    // Result object exists but its topics data is explicitly null.
    DeleteShareGroupStateResult deleteShareGroupStateResult =
        new DeleteShareGroupStateResult.Builder().setTopicsData(null).build();

    when(persister.deleteState(ArgumentMatchers.any()))
        .thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));

    CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
        service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);

    assertFutureThrows(IllegalStateException.class, future, "Result is null for the delete share group state");
}
@Test
public void testDeleteShareGroupOffsetsCoordinatorNotActive() throws ExecutionException, InterruptedException {
    // The service is built but never started, so the request must be rejected
    // up-front with COORDINATOR_NOT_AVAILABLE.
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> coordinatorRuntime = mockRuntime();
    GroupCoordinatorService coordinatorService = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(coordinatorRuntime)
        .build();

    DeleteShareGroupOffsetsResponseData expectedResponse = new DeleteShareGroupOffsetsResponseData()
        .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
        .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());

    final int partitionIndex = 1;
    DeleteShareGroupOffsetsRequestData request = new DeleteShareGroupOffsetsRequestData()
        .setGroupId("share-group-id")
        .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
            .setTopicName(TOPIC_NAME)
            .setPartitions(List.of(partitionIndex))));

    CompletableFuture<DeleteShareGroupOffsetsResponseData> responseFuture =
        coordinatorService.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), request);

    assertEquals(expectedResponse, responseFuture.get());
}
@Test
public void testDeleteShareGroupOffsetsMetadataImageNull() throws ExecutionException, InterruptedException {
    // Without a metadata image topic names cannot be resolved, so the request
    // must be rejected with COORDINATOR_NOT_AVAILABLE.
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(runtime)
        .build(true);

    // Forcing a null Metadata Image
    service.onNewMetadataImage(null, null);

    int partition = 1;
    DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
        .setGroupId("share-group-id")
        .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
            .setTopicName(TOPIC_NAME)
            .setPartitions(List.of(partition))
        ));

    DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
        .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
        .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());

    CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
        service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);

    assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsInvalidGroupId() throws InterruptedException, ExecutionException {
    // An empty group id fails validation before any coordinator or persister
    // interaction, yielding a top-level INVALID_GROUP_ID.
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> coordinatorRuntime = mockRuntime();
    Persister statePersister = mock(DefaultStatePersister.class);
    GroupCoordinatorService coordinatorService = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(coordinatorRuntime)
        .setPersister(statePersister)
        .build(true);
    coordinatorService.startup(() -> 1);

    DeleteShareGroupOffsetsResponseData expectedResponse = new DeleteShareGroupOffsetsResponseData()
        .setErrorCode(Errors.INVALID_GROUP_ID.code())
        .setErrorMessage(Errors.INVALID_GROUP_ID.message());

    final int partitionIndex = 1;
    DeleteShareGroupOffsetsRequestData request = new DeleteShareGroupOffsetsRequestData()
        .setGroupId("")
        .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
            .setTopicName(TOPIC_NAME)
            .setPartitions(List.of(partitionIndex))));

    CompletableFuture<DeleteShareGroupOffsetsResponseData> responseFuture =
        coordinatorService.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), request);

    assertEquals(expectedResponse, responseFuture.get());
}
@Test
public void testDeleteShareGroupOffsetsDescribeThrowsError() throws InterruptedException, ExecutionException {
    // If the internal share-group-describe read operation fails, the error is
    // converted into a top-level response error.
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    Persister persister = mock(DefaultStatePersister.class);
    GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(runtime)
        .setPersister(persister)
        .build(true);
    service.startup(() -> 1);

    int partition = 1;
    DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
        .setGroupId("share-group-id")
        .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
            .setTopicName(TOPIC_NAME)
            .setPartitions(List.of(partition))
        ));

    DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
        .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
        .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message());

    // Fail the describe read operation itself.
    when(runtime.scheduleReadOperation(
        ArgumentMatchers.eq("share-group-describe"),
        ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
        ArgumentMatchers.any()
    )).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));

    CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
        service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);

    assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsDescribeReturnsNull() throws InterruptedException, ExecutionException {
    // A null describe result is mapped to GROUP_ID_NOT_FOUND.
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    Persister persister = mock(DefaultStatePersister.class);
    GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(runtime)
        .setPersister(persister)
        .build(true);
    service.startup(() -> 1);

    int partition = 1;
    DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
        .setGroupId("share-group-id")
        .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
            .setTopicName(TOPIC_NAME)
            .setPartitions(List.of(partition))
        ));

    DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
        .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
        .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());

    // Describe completes successfully but with a null group list.
    when(runtime.scheduleReadOperation(
        ArgumentMatchers.eq("share-group-describe"),
        ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
        ArgumentMatchers.any()
    )).thenReturn(CompletableFuture.completedFuture(null));

    CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
        service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);

    assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsDescribeReturnsEmpty() throws InterruptedException, ExecutionException {
    // An empty describe result (no described groups) is mapped to
    // GROUP_ID_NOT_FOUND, just like a null result.
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> coordinatorRuntime = mockRuntime();
    Persister statePersister = mock(DefaultStatePersister.class);
    GroupCoordinatorService coordinatorService = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(coordinatorRuntime)
        .setPersister(statePersister)
        .build(true);
    coordinatorService.startup(() -> 1);

    DeleteShareGroupOffsetsResponseData expectedResponse = new DeleteShareGroupOffsetsResponseData()
        .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
        .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());

    final int partitionIndex = 1;
    DeleteShareGroupOffsetsRequestData request = new DeleteShareGroupOffsetsRequestData()
        .setGroupId("share-group-id")
        .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
            .setTopicName(TOPIC_NAME)
            .setPartitions(List.of(partitionIndex))));

    // Describe completes successfully but with no groups at all.
    when(coordinatorRuntime.scheduleReadOperation(
        ArgumentMatchers.eq("share-group-describe"),
        ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
        ArgumentMatchers.any()
    )).thenReturn(CompletableFuture.completedFuture(List.of()));

    CompletableFuture<DeleteShareGroupOffsetsResponseData> responseFuture =
        coordinatorService.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), request);

    assertEquals(expectedResponse, responseFuture.get());
}
@Test
public void testDeleteShareGroupOffsetsDescribeReturnsError() throws InterruptedException, ExecutionException {
    // A described group carrying an error code is propagated as the top-level
    // response error.
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    Persister persister = mock(DefaultStatePersister.class);
    GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(runtime)
        .setPersister(persister)
        .build(true);
    service.startup(() -> 1);

    int partition = 1;
    DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
        .setGroupId("share-group-id")
        .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
            .setTopicName(TOPIC_NAME)
            .setPartitions(List.of(partition))
        ));

    DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
        .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
        .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());

    // Describe succeeds at the runtime level but the group itself is in error.
    ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
        .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
        .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
    when(runtime.scheduleReadOperation(
        ArgumentMatchers.eq("share-group-describe"),
        ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
        ArgumentMatchers.any()
    )).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));

    CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
        service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);

    assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsGroupIsNotEmpty() throws InterruptedException, ExecutionException {
    // Offsets can only be deleted for an empty share group; a group with at
    // least one member must be rejected with NON_EMPTY_GROUP.
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
    Persister persister = mock(DefaultStatePersister.class);
    GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
        .setConfig(createConfig())
        .setRuntime(runtime)
        .setPersister(persister)
        .build(true);
    service.startup(() -> 1);

    int partition = 1;
    DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
        .setGroupId("share-group-id")
        .setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
            .setTopicName(TOPIC_NAME)
            .setPartitions(List.of(partition))
        ));

    DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
        .setErrorCode(Errors.NON_EMPTY_GROUP.code())
        .setErrorMessage(Errors.NON_EMPTY_GROUP.message());

    // Described group has one member, i.e. it is not empty.
    ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
        .setGroupId("share-group-id-1")
        .setMembers(List.of(new ShareGroupDescribeResponseData.Member()));
    when(runtime.scheduleReadOperation(
        ArgumentMatchers.eq("share-group-describe"),
        ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
        ArgumentMatchers.any()
    )).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));

    CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
        service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);

    assertEquals(responseData, future.get());
}
@Test @Test
public void testPersisterInitializeSuccess() { public void testPersisterInitializeSuccess() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();

View File

@ -20,6 +20,7 @@ package org.apache.kafka.server.share.persister;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData; import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -64,4 +65,16 @@ public class DeleteShareGroupStateParameters implements PersisterParameters {
return new DeleteShareGroupStateParameters(groupTopicPartitionData); return new DeleteShareGroupStateParameters(groupTopicPartitionData);
} }
} }
@Override
public boolean equals(Object o) {
    // Fast path: an object is always equal to itself.
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    DeleteShareGroupStateParameters that = (DeleteShareGroupStateParameters) o;
    // Objects.equals handles a null groupTopicPartitionData on either side.
    return Objects.equals(groupTopicPartitionData, that.groupTopicPartitionData);
}
@Override
public int hashCode() {
    // Objects.hashCode tolerates a null groupTopicPartitionData, keeping
    // hashCode consistent with equals.
    return Objects.hashCode(groupTopicPartitionData);
}
} }

View File

@ -57,6 +57,8 @@ import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult; import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.clients.admin.DeleteRecordsOptions; import org.apache.kafka.clients.admin.DeleteRecordsOptions;
import org.apache.kafka.clients.admin.DeleteRecordsResult; import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteShareGroupsOptions; import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
import org.apache.kafka.clients.admin.DeleteShareGroupsResult; import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsOptions; import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsOptions;
@ -473,6 +475,11 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
return adminDelegate.listShareGroupOffsets(groupSpecs, options); return adminDelegate.listShareGroupOffsets(groupSpecs, options);
} }
/**
 * Delegates {@code deleteShareGroupOffsets} to the wrapped admin client; this
 * interceptor adds no metric handling of its own for this call.
 */
@Override
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(final String groupId, final Set<TopicPartition> partitions, final DeleteShareGroupOffsetsOptions options) {
    return adminDelegate.deleteShareGroupOffsets(groupId, partitions, options);
}
@Override @Override
public DeleteShareGroupsResult deleteShareGroups(final Collection<String> groupIds, final DeleteShareGroupsOptions options) { public DeleteShareGroupsResult deleteShareGroups(final Collection<String> groupIds, final DeleteShareGroupsOptions options) {
return adminDelegate.deleteShareGroups(groupIds, options); return adminDelegate.deleteShareGroups(groupIds, options);