KAFKA-16718-3/n: Added the ShareGroupStatePartitionMetadata record during deletion of share group offsets (#19478)

This is a follow up PR for implementation of DeleteShareGroupOffsets
RPC. This PR adds the ShareGroupStatePartitionMetadata record to
__consumer__offsets topic to make sure the topic is removed from the
initializedTopics list. This PR also removes partitions from the request
and response schemas for DeleteShareGroupState RPC

Reviewers: Sushant Mahajan <smahajan@confluent.io>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Chirag Wadhwa 2025-04-26 02:31:48 +05:30 committed by GitHub
parent 6462f7a0e2
commit 2f9c2dd828
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 1346 additions and 608 deletions

View File

@ -1951,28 +1951,28 @@ public interface Admin extends AutoCloseable {
}
/**
* Delete offsets for a set of partitions in a share group.
* Delete offsets for a set of topics in a share group.
*
* @param groupId The group for which to delete offsets.
* @param partitions The topic-partitions.
* @param topics The topics for which to delete offsets.
* @param options The options to use when deleting offsets in a share group.
* @return The DeleteShareGroupOffsetsResult.
*/
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options);
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics, DeleteShareGroupOffsetsOptions options);
/**
* Delete offsets for a set of partitions in a share group with the default options.
* Delete offsets for a set of topics in a share group with the default options.
*
* <p>
* This is a convenience method for {@link #deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} with default options.
* See the overload for more details.
*
* @param groupId The group for which to delete offsets.
* @param partitions The topic-partitions.
* @param topics The topics for which to delete offsets.
* @return The DeleteShareGroupOffsetsResult.
*/
default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions) {
return deleteShareGroupOffsets(groupId, partitions, new DeleteShareGroupOffsetsOptions());
default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics) {
return deleteShareGroupOffsets(groupId, topics, new DeleteShareGroupOffsetsOptions());
}
/**

View File

@ -17,7 +17,6 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
@ -33,27 +32,27 @@ import java.util.Set;
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsResult {
private final KafkaFuture<Map<TopicPartition, ApiException>> future;
private final Set<TopicPartition> partitions;
private final KafkaFuture<Map<String, ApiException>> future;
private final Set<String> topics;
DeleteShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, ApiException>> future, Set<TopicPartition> partitions) {
DeleteShareGroupOffsetsResult(KafkaFuture<Map<String, ApiException>> future, Set<String> topics) {
this.future = future;
this.partitions = partitions;
this.topics = topics;
}
/**
* Return a future which succeeds only if all the deletions succeed.
* If not, the first partition error shall be returned.
* If not, the first topic error shall be returned.
*/
public KafkaFuture<Void> all() {
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
this.future.whenComplete((topicPartitions, throwable) -> {
this.future.whenComplete((topicResults, throwable) -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
for (TopicPartition partition : partitions) {
if (maybeCompleteExceptionally(topicPartitions, partition, result)) {
for (String topic : topics) {
if (maybeCompleteExceptionally(topicResults, topic, result)) {
return;
}
}
@ -64,32 +63,32 @@ public class DeleteShareGroupOffsetsResult {
}
/**
* Return a future which can be used to check the result for a given partition.
* Return a future which can be used to check the result for a given topic.
*/
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
if (!partitions.contains(partition)) {
throw new IllegalArgumentException("Partition " + partition + " was not included in the original request");
public KafkaFuture<Void> topicResult(final String topic) {
if (!topics.contains(topic)) {
throw new IllegalArgumentException("Topic " + topic + " was not included in the original request");
}
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
this.future.whenComplete((topicPartitions, throwable) -> {
this.future.whenComplete((topicResults, throwable) -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else if (!maybeCompleteExceptionally(topicPartitions, partition, result)) {
} else if (!maybeCompleteExceptionally(topicResults, topic, result)) {
result.complete(null);
}
});
return result;
}
private boolean maybeCompleteExceptionally(Map<TopicPartition, ApiException> partitionLevelErrors,
TopicPartition partition,
private boolean maybeCompleteExceptionally(Map<String, ApiException> topicLevelErrors,
String topic,
KafkaFutureImpl<Void> result) {
Throwable exception;
if (!partitionLevelErrors.containsKey(partition)) {
exception = new IllegalArgumentException("Offset deletion result for partition \"" + partition + "\" was not included in the response");
if (!topicLevelErrors.containsKey(topic)) {
exception = new IllegalArgumentException("Offset deletion result for topic \"" + topic + "\" was not included in the response");
} else {
exception = partitionLevelErrors.get(partition);
exception = topicLevelErrors.get(topic);
}
if (exception != null) {

View File

@ -336,8 +336,8 @@ public class ForwardingAdmin implements Admin {
}
@Override
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) {
return delegate.deleteShareGroupOffsets(groupId, partitions, options);
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics, DeleteShareGroupOffsetsOptions options) {
return delegate.deleteShareGroupOffsets(groupId, topics, options);
}
@Override

View File

@ -3846,11 +3846,11 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, ApiException>> future = DeleteShareGroupOffsetsHandler.newFuture(groupId);
DeleteShareGroupOffsetsHandler handler = new DeleteShareGroupOffsetsHandler(groupId, partitions, logContext);
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics, DeleteShareGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<String, ApiException>> future = DeleteShareGroupOffsetsHandler.newFuture(groupId);
DeleteShareGroupOffsetsHandler handler = new DeleteShareGroupOffsetsHandler(groupId, topics, logContext);
invokeDriver(handler, future, options.timeoutMs);
return new DeleteShareGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)), partitions);
return new DeleteShareGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)), topics);
}
@Override

View File

@ -19,7 +19,6 @@ package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.protocol.Errors;
@ -38,24 +37,23 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* This class is the handler for {@link KafkaAdminClient#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call
*/
public class DeleteShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, ApiException>> {
public class DeleteShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<String, ApiException>> {
private final CoordinatorKey groupId;
private final Logger log;
private final Set<TopicPartition> partitions;
private final Set<String> topics;
private final CoordinatorStrategy lookupStrategy;
public DeleteShareGroupOffsetsHandler(String groupId, Set<TopicPartition> partitions, LogContext logContext) {
public DeleteShareGroupOffsetsHandler(String groupId, Set<String> topics, LogContext logContext) {
this.groupId = CoordinatorKey.byGroupId(groupId);
this.partitions = partitions;
this.topics = topics;
this.log = logContext.logger(DeleteShareGroupOffsetsHandler.class);
this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
}
@ -70,7 +68,7 @@ public class DeleteShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coor
return lookupStrategy;
}
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, ApiException>> newFuture(String groupId) {
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<String, ApiException>> newFuture(String groupId) {
return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
}
@ -85,26 +83,22 @@ public class DeleteShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coor
DeleteShareGroupOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<CoordinatorKey> groupIds) {
validateKeys(groupIds);
final List<DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic> topics =
final List<DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic> requestTopics =
new ArrayList<>();
partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> topics.add(
topics.forEach(topic -> requestTopics.add(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topic)
.setPartitions(topicPartitions.stream()
.map(TopicPartition::partition)
.collect(Collectors.toList())
)
));
return new DeleteShareGroupOffsetsRequest.Builder(
new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId.idValue)
.setTopics(topics)
.setTopics(requestTopics)
);
}
@Override
public ApiResult<CoordinatorKey, Map<TopicPartition, ApiException>> handleResponse(
public ApiResult<CoordinatorKey, Map<String, ApiException>> handleResponse(
Node coordinator,
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse
@ -123,23 +117,21 @@ public class DeleteShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coor
return new ApiResult<>(Collections.emptyMap(), groupsFailed, new ArrayList<>(groupsToUnmap));
} else {
final Map<TopicPartition, ApiException> partitionResults = new HashMap<>();
response.data().responses().forEach(topic ->
topic.partitions().forEach(partition -> {
if (partition.errorCode() != Errors.NONE.code()) {
final Errors partitionError = Errors.forCode(partition.errorCode());
final String partitionErrorMessage = partition.errorMessage();
log.debug("DeleteShareGroupOffsets request for group id {}, topic {} and partition {} failed and returned error {}." + partitionErrorMessage,
groupId.idValue, topic.topicName(), partition.partitionIndex(), partitionError);
}
partitionResults.put(
new TopicPartition(topic.topicName(), partition.partitionIndex()),
Errors.forCode(partition.errorCode()).exception(partition.errorMessage())
);
})
);
final Map<String, ApiException> topicResults = new HashMap<>();
response.data().responses().forEach(topic -> {
if (topic.errorCode() != Errors.NONE.code()) {
final Errors topicError = Errors.forCode(topic.errorCode());
final String topicErrorMessage = topic.errorMessage();
log.debug("DeleteShareGroupOffsets request for group id {} and topic {} failed and returned error {}." + topicErrorMessage,
groupId.idValue, topic.topicName(), topicError);
}
topicResults.put(
topic.topicName(),
Errors.forCode(topic.errorCode()).exception(topic.errorMessage())
);
});
return ApiResult.completed(groupId, partitionResults);
return ApiResult.completed(groupId, topicResults);
}
}

View File

@ -43,9 +43,7 @@ public class DeleteShareGroupOffsetsResponse extends AbstractResponse {
Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
updateErrorCounts(counts, Errors.forCode(data.errorCode()));
data.responses().forEach(
topicResult -> topicResult.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
topicResult -> updateErrorCounts(counts, Errors.forCode(topicResult.errorCode()))
);
return counts;
}

View File

@ -26,9 +26,7 @@
{ "name": "Topics", "type": "[]DeleteShareGroupOffsetsRequestTopic", "versions": "0+",
"about": "The topics to delete offsets for.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions." }
"about": "The topic name." }
]}
]
}

View File

@ -30,6 +30,7 @@
// - KAFKA_STORAGE_ERROR (version 0+)
// - INVALID_REQUEST (version 0+)
// - UNKNOWN_SERVER_ERROR (version 0+)
// - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
@ -43,14 +44,10 @@
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The unique topic ID." },
{ "name": "Partitions", "type": "[]DeleteShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The partition-level error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The partition-level error message, or null if there was no error." }
]}
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The topic-level error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The topic-level error message, or null if there was no error." }
]}
]
}

View File

@ -10801,10 +10801,10 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
final Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("A", 0));
final Set<String> topics = Collections.singleton("A");
final DeleteShareGroupOffsetsOptions options = new DeleteShareGroupOffsetsOptions();
env.adminClient().deleteShareGroupOffsets(GROUP_ID, partitions, options);
env.adminClient().deleteShareGroupOffsets(GROUP_ID, topics, options);
final MockClient mockClient = env.kafkaClient();
waitForRequest(mockClient, ApiKeys.DELETE_SHARE_GROUP_OFFSETS);
@ -10825,26 +10825,27 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
Uuid fooId = Uuid.randomUuid();
String fooName = "foo";
Uuid barId = Uuid.randomUuid();
String barName = "bar";
String zooName = "zoo";
DeleteShareGroupOffsetsResponseData data = new DeleteShareGroupOffsetsResponseData().setResponses(
List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0), new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(1))),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName(fooName).setTopicId(fooId),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName(barName).setTopicId(barId)
)
);
TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
TopicPartition barPartition0 = new TopicPartition("bar", 0);
TopicPartition zooTopicPartition0 = new TopicPartition("zoo", 0);
env.kafkaClient().prepareResponse(new DeleteShareGroupOffsetsResponse(data));
final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0, fooTopicPartition1, barPartition0));
final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooName, barName));
assertNull(result.all().get());
assertNull(result.partitionResult(fooTopicPartition0).get());
assertNull(result.partitionResult(fooTopicPartition1).get());
assertNull(result.partitionResult(barPartition0).get());
assertThrows(IllegalArgumentException.class, () -> result.partitionResult(zooTopicPartition0));
assertNull(result.topicResult(fooName).get());
assertNull(result.topicResult(barName).get());
assertThrows(IllegalArgumentException.class, () -> result.topicResult(zooName));
}
}
@ -10856,7 +10857,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
DeleteShareGroupOffsetsResponseData data = new DeleteShareGroupOffsetsResponseData().setResponses(
Collections.emptyList()
List.of()
);
env.kafkaClient().prepareResponse(new DeleteShareGroupOffsetsResponse(data));
@ -10875,41 +10876,46 @@ public class KafkaAdminClientTest {
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())
.setErrorMessage(Errors.GROUP_AUTHORIZATION_FAILED.message());
TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
TopicPartition barTopicPartition0 = new TopicPartition("bar", 0);
String fooName = "foo";
String barName = "bar";
env.kafkaClient().prepareResponse(new DeleteShareGroupOffsetsResponse(data));
final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0, fooTopicPartition1, barTopicPartition0));
final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooName, barName));
TestUtils.assertFutureThrows(Errors.GROUP_AUTHORIZATION_FAILED.exception().getClass(), result.all());
}
}
@Test
public void testDeleteShareGroupOffsetsWithErrorInOnePartition() throws Exception {
public void testDeleteShareGroupOffsetsWithErrorInOneTopic() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
Uuid fooId = Uuid.randomUuid();
String fooName = "foo";
Uuid barId = Uuid.randomUuid();
String barName = "bar";
DeleteShareGroupOffsetsResponseData data = new DeleteShareGroupOffsetsResponseData().setResponses(
List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0), new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(1).setErrorCode(Errors.KAFKA_STORAGE_ERROR.code()).setErrorMessage(Errors.KAFKA_STORAGE_ERROR.message()))),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(fooName)
.setTopicId(fooId)
.setErrorCode(Errors.KAFKA_STORAGE_ERROR.code())
.setErrorMessage(Errors.KAFKA_STORAGE_ERROR.message()),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(barName)
.setTopicId(barId)
)
);
TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
TopicPartition barTopicPartition0 = new TopicPartition("bar", 0);
env.kafkaClient().prepareResponse(new DeleteShareGroupOffsetsResponse(data));
final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0, fooTopicPartition1, barTopicPartition0));
final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooName, barName));
TestUtils.assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), result.all());
assertNull(result.partitionResult(fooTopicPartition0).get());
TestUtils.assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), result.partitionResult(fooTopicPartition1));
assertNull(result.partitionResult(barTopicPartition0).get());
TestUtils.assertFutureThrows(Errors.KAFKA_STORAGE_ERROR.exception().getClass(), result.topicResult(fooName));
assertNull(result.topicResult(barName).get());
}
}
@ -10919,24 +10925,25 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
Uuid fooId = Uuid.randomUuid();
String fooName = "foo";
String barName = "bar";
DeleteShareGroupOffsetsResponseData data = new DeleteShareGroupOffsetsResponseData().setResponses(
List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0), new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(1))),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(fooName)
.setTopicId(fooId)
)
);
TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
TopicPartition barTopicPartition0 = new TopicPartition("bar", 0);
TopicPartition barTopicPartition1 = new TopicPartition("bar", 1);
env.kafkaClient().prepareResponse(new DeleteShareGroupOffsetsResponse(data));
final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooTopicPartition0, fooTopicPartition1, barTopicPartition0));
final DeleteShareGroupOffsetsResult result = env.adminClient().deleteShareGroupOffsets(GROUP_ID, Set.of(fooName));
assertDoesNotThrow(() -> result.all().get());
assertThrows(IllegalArgumentException.class, () -> result.partitionResult(barTopicPartition1));
assertNull(result.partitionResult(barTopicPartition0).get());
assertThrows(IllegalArgumentException.class, () -> result.topicResult(barName));
assertNull(result.topicResult(fooName).get());
}
}

View File

@ -1431,7 +1431,7 @@ public class MockAdminClient extends AdminClient {
}
@Override
public synchronized DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) {
public synchronized DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics, DeleteShareGroupOffsetsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}

View File

@ -3791,8 +3791,7 @@ public class RequestResponseTest {
DeleteShareGroupOffsetsRequestData data = new DeleteShareGroupOffsetsRequestData()
.setGroupId("group")
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName("topic-1")
.setPartitions(List.of(0))));
.setTopicName("topic-1")));
return new DeleteShareGroupOffsetsRequest.Builder(data).build(version);
}
@ -3827,9 +3826,7 @@ public class RequestResponseTest {
.setResponses(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName("topic-1")
.setTopicId(Uuid.randomUuid())
.setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code())))));
.setErrorCode(Errors.NONE.code())));
return new DeleteShareGroupOffsetsResponse(data);
}

View File

@ -78,7 +78,6 @@ import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.stream.Collectors
import java.util.{Collections, Optional}
import scala.annotation.nowarn
import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._
@ -3657,12 +3656,8 @@ class KafkaApis(val requestChannel: RequestChannel,
deleteShareGroupOffsetsResponseTopics.add(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName)
.setPartitions(topic.partitions.map(partition => {
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
}).toList.asJava)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
)
} else {
authorizedTopics.add(topic)
@ -3670,7 +3665,11 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (authorizedTopics.isEmpty) {
requestHelper.sendMaybeThrottle(request, new DeleteShareGroupOffsetsResponse(new DeleteShareGroupOffsetsResponseData()))
requestHelper.sendMaybeThrottle(
request,
new DeleteShareGroupOffsetsResponse(
new DeleteShareGroupOffsetsResponseData()
.setResponses(deleteShareGroupOffsetsResponseTopics)))
return
}
@ -3679,7 +3678,10 @@ class KafkaApis(val requestChannel: RequestChannel,
new DeleteShareGroupOffsetsRequestData().setGroupId(groupId).setTopics(authorizedTopics)
).handle[Unit] {(responseData, exception) => {
if (exception != null) {
requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, exception))
requestHelper.sendMaybeThrottle(request, deleteShareGroupOffsetsRequest.getErrorResponse(
AbstractResponse.DEFAULT_THROTTLE_TIME,
Errors.forException(exception).code(),
exception.getMessage()))
} else if (responseData.errorCode() != Errors.NONE.code) {
requestHelper.sendMaybeThrottle(
request,

View File

@ -44,7 +44,7 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Descri
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData.{DeleteShareGroupOffsetsResponsePartition, DeleteShareGroupOffsetsResponseTopic}
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.{DescribeShareGroupOffsetsRequestGroup, DescribeShareGroupOffsetsRequestTopic}
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.{DescribeShareGroupOffsetsResponseGroup, DescribeShareGroupOffsetsResponsePartition, DescribeShareGroupOffsetsResponseTopic}
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterConfigsResourceCollection => IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, AlterableConfigCollection => IAlterableConfigCollection}
@ -12046,7 +12046,7 @@ class KafkaApisTest extends Logging {
def testDeleteShareGroupOffsetsReturnsUnsupportedVersion(): Unit = {
val deleteShareGroupOffsetsRequest = new DeleteShareGroupOffsetsRequestData()
.setGroupId("group")
.setTopics(util.List.of(new DeleteShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1))))
.setTopics(util.List.of(new DeleteShareGroupOffsetsRequestTopic().setTopicName("topic-1")))
val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequest).build())
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
@ -12054,14 +12054,14 @@ class KafkaApisTest extends Logging {
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val response = verifyNoThrottling[DeleteShareGroupOffsetsResponse](requestChannelRequest)
response.data.responses.forEach(topic => topic.partitions.forEach(partition => assertEquals(Errors.UNSUPPORTED_VERSION.code, partition.errorCode)))
response.data.responses.forEach(topic => assertEquals(Errors.UNSUPPORTED_VERSION.code, topic.errorCode))
}
@Test
def testDeleteShareGroupOffsetsRequestsGroupAuthorizationFailed(): Unit = {
val deleteShareGroupOffsetsRequest = new DeleteShareGroupOffsetsRequestData()
.setGroupId("group")
.setTopics(util.List.of(new DeleteShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1))))
.setTopics(util.List.of(new DeleteShareGroupOffsetsRequestTopic().setTopicName("topic-1")))
val requestChannelRequest = buildRequest(new DeleteShareGroupOffsetsRequest.Builder(deleteShareGroupOffsetsRequest).build)
@ -12106,11 +12106,9 @@ class KafkaApisTest extends Logging {
val deleteShareGroupOffsetsRequestTopic1 = new DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(util.List.of(0, 1))
val deleteShareGroupOffsetsRequestTopic2 = new DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(util.List.of(0, 1))
val deleteShareGroupOffsetsRequestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
@ -12149,33 +12147,18 @@ class KafkaApisTest extends Logging {
new DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName2)
.setTopicId(topicId2)
.setPartitions(util.List.of(
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code()),
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code())
))
))
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code())
)
)
val expectedResponseTopics: util.List[DeleteShareGroupOffsetsResponseTopic] = new util.ArrayList[DeleteShareGroupOffsetsResponseTopic]()
expectedResponseTopics.add(
new DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName1)
.setPartitions(util.List.of(
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message()),
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
))
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
)
deleteShareGroupOffsetsResponseData.responses.forEach{ topic => {
@ -12207,15 +12190,12 @@ class KafkaApisTest extends Logging {
val deleteShareGroupOffsetsRequestTopic1 = new DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(util.List.of(0))
val deleteShareGroupOffsetsRequestTopic2 = new DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(util.List.of(0, 1))
val deleteShareGroupOffsetsRequestTopic3 = new DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName3)
.setPartitions(util.List.of(0, 1, 2))
val deleteShareGroupOffsetsRequestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("group")
@ -12241,42 +12221,18 @@ class KafkaApisTest extends Logging {
new DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName1)
.setTopicId(topicId1)
.setPartitions(util.List.of(
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code())
)),
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code()),
new DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName2)
.setTopicId(topicId2)
.setPartitions(util.List.of(
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code()),
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code())
)),
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code()),
new DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName3)
.setTopicId(topicId3)
.setPartitions(util.List.of(
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code()),
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code()),
new DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(2)
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code())
))
.setErrorMessage(null)
.setErrorCode(Errors.NONE.code()),
))
resultFuture.complete(deleteShareGroupOffsetsResponseData)
@ -12296,11 +12252,9 @@ class KafkaApisTest extends Logging {
val deleteShareGroupOffsetsRequestTopic1 = new DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(util.List.of(0))
val deleteShareGroupOffsetsRequestTopic2 = new DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(util.List.of(0, 1))
val deleteShareGroupOffsetsRequestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("group")
@ -12338,11 +12292,9 @@ class KafkaApisTest extends Logging {
val deleteShareGroupOffsetsRequestTopic1 = new DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(util.List.of(0))
val deleteShareGroupOffsetsRequestTopic2 = new DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(util.List.of(0, 1))
val deleteShareGroupOffsetsRequestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("group")

View File

@ -2303,7 +2303,7 @@ RULE:[n:string](regexp)s/pattern/replacement/g/U</code></pre>
<td>DELETE_SHARE_GROUP_OFFSETS (92)</td>
<td>Delete</td>
<td>Group</td>
<td>To delete the offset information for a partition in a share group, the application must have privileges on the
<td>To delete the offset information for a topic in a share group, the application must have privileges on the
group and the topic. Group access is checked first, then topic access.</td>
</tr>
<tr>

View File

@ -1448,43 +1448,6 @@ public class GroupCoordinatorService implements GroupCoordinator {
});
}
private CompletableFuture<DeleteShareGroupOffsetsResponseData> persistDeleteShareGroupOffsets(
DeleteShareGroupStateParameters deleteStateRequestParameters,
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList
) {
return persister.deleteState(deleteStateRequestParameters)
.thenCompose(result -> {
if (result == null || result.topicsData() == null) {
log.error("Result is null for the delete share group state");
Exception exception = new IllegalStateException("Result is null for the delete share group state");
return CompletableFuture.completedFuture(
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(exception))
);
}
result.topicsData().forEach(topicData ->
errorTopicResponseList.add(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicId(topicData.topicId())
.setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
.setPartitions(topicData.partitions().stream().map(
partitionData -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partition())
.setErrorMessage(partitionData.errorCode() == Errors.NONE.code() ? null : Errors.forCode(partitionData.errorCode()).message())
.setErrorCode(partitionData.errorCode())
).toList())
)
);
return CompletableFuture.completedFuture(
new DeleteShareGroupOffsetsResponseData()
.setResponses(errorTopicResponseList)
);
}).exceptionally(throwable -> {
log.error("Failed to delete share group state");
return DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable));
});
}
/**
* See {@link GroupCoordinator#fetchOffsets(AuthorizableRequestContext, OffsetFetchRequestData.OffsetFetchRequestGroup, boolean)}.
*/
@ -1781,49 +1744,124 @@ public class GroupCoordinatorService implements GroupCoordinator {
);
}
return runtime.scheduleReadOperation(
"share-group-delete-offsets-request",
return runtime.scheduleWriteOperation(
"initiate-delete-share-group-offsets",
topicPartitionFor(groupId),
(coordinator, lastCommittedOffset) -> coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData)
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.initiateDeleteShareGroupOffsets(groupId, requestData)
)
.thenCompose(resultHolder -> {
if (resultHolder == null) {
log.error("Failed to retrieve deleteState request parameters from group coordinator for the group {}", groupId);
return CompletableFuture.completedFuture(
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.UNKNOWN_SERVER_ERROR)
);
}
.thenCompose(resultHolder -> deleteShareGroupOffsetsState(groupId, resultHolder))
.exceptionally(exception -> handleOperationException(
"initiate-delete-share-group-offsets",
groupId,
exception,
(error, __) -> DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(error),
log
));
}
if (resultHolder.topLevelErrorCode() != Errors.NONE.code()) {
return CompletableFuture.completedFuture(
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(
resultHolder.topLevelErrorCode(),
resultHolder.topLevelErrorMessage()
)
);
}
private CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOffsetsState(
String groupId,
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder resultHolder
) {
if (resultHolder == null) {
log.error("Failed to retrieve deleteState request parameters from group coordinator for the group {}", groupId);
return CompletableFuture.completedFuture(
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.UNKNOWN_SERVER_ERROR)
);
}
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList =
resultHolder.errorTopicResponseList() == null ? new ArrayList<>() : new ArrayList<>(resultHolder.errorTopicResponseList());
if (resultHolder.topLevelErrorCode() != Errors.NONE.code()) {
return CompletableFuture.completedFuture(
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(
resultHolder.topLevelErrorCode(),
resultHolder.topLevelErrorMessage()
)
);
}
if (resultHolder.deleteStateRequestParameters() == null) {
return CompletableFuture.completedFuture(
new DeleteShareGroupOffsetsResponseData()
.setResponses(errorTopicResponseList)
);
}
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList =
resultHolder.errorTopicResponseList() == null ? new ArrayList<>() : new ArrayList<>(resultHolder.errorTopicResponseList());
return persistDeleteShareGroupOffsets(
resultHolder.deleteStateRequestParameters(),
errorTopicResponseList
);
})
if (resultHolder.deleteStateRequestParameters() == null) {
return CompletableFuture.completedFuture(
new DeleteShareGroupOffsetsResponseData()
.setResponses(errorTopicResponseList)
);
}
return persister.deleteState(resultHolder.deleteStateRequestParameters())
.thenCompose(result -> handleDeleteShareGroupOffsetStateResult(groupId, result, errorTopicResponseList))
.exceptionally(throwable -> {
log.error("Failed to retrieve deleteState request parameters from group coordinator for the group {}", groupId, throwable);
log.error("Failed to delete share group state due to: {}", throwable.getMessage(), throwable);
return DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable));
});
}
private CompletableFuture<DeleteShareGroupOffsetsResponseData> handleDeleteShareGroupOffsetStateResult(
String groupId,
DeleteShareGroupStateResult result,
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponses
) {
if (result == null || result.topicsData() == null) {
log.error("Result is null for the delete share group state");
Exception exception = new IllegalStateException("Result is null for the delete share group state");
return CompletableFuture.completedFuture(
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(exception))
);
}
Map<Uuid, String> successTopics = new HashMap<>();
result.topicsData().forEach(topicData -> {
Optional<PartitionErrorData> errItem = topicData.partitions().stream()
.filter(errData -> errData.errorCode() != Errors.NONE.code())
.findAny();
if (errItem.isPresent()) {
errorTopicResponses.add(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicId(topicData.topicId())
.setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
.setErrorMessage(Errors.forCode(errItem.get().errorCode()).message())
.setErrorCode(errItem.get().errorCode())
);
} else {
successTopics.put(
topicData.topicId(),
metadataImage.topics().topicIdToNameView().get(topicData.topicId())
);
}
});
// If there are no topics for which persister delete state request succeeded, then we can return directly from here
if (successTopics.isEmpty()) {
return CompletableFuture.completedFuture(
new DeleteShareGroupOffsetsResponseData()
.setResponses(errorTopicResponses)
);
}
return completeDeleteShareGroupOffsets(groupId, successTopics, errorTopicResponses);
}
private CompletableFuture<DeleteShareGroupOffsetsResponseData> completeDeleteShareGroupOffsets(
String groupId,
Map<Uuid, String> successTopics,
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponses
) {
return runtime.scheduleWriteOperation(
"complete-delete-share-group-offsets",
topicPartitionFor(groupId),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.completeDeleteShareGroupOffsets(groupId, successTopics, errorTopicResponses)
).exceptionally(exception -> handleOperationException(
"complete-delete-share-group-offsets",
groupId,
exception,
(error, __) -> DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(error),
log
));
}
/**
* See {@link GroupCoordinator#commitOffsets(AuthorizableRequestContext, OffsetCommitRequestData, BufferSupplier)}.
*/

View File

@ -662,7 +662,9 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* @param groupIds - A list of groupIds as string
* @return A result object containing a map keyed on groupId and value pair (req, error) and related coordinator records.
*/
public CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> sharePartitionDeleteRequests(List<String> groupIds) {
public CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> sharePartitionDeleteRequests(
List<String> groupIds
) {
Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>> responseMap = new HashMap<>();
List<CoordinatorRecord> records = new ArrayList<>();
for (String groupId : groupIds) {
@ -682,20 +684,23 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
}
/**
* Does the following checks to make sure that a DeleteShareGroupOffsets request is valid and can be processed further
* Ensure the following checks are true to make sure that a DeleteShareGroupOffsets request is valid and can be processed further
* 1. Checks whether the provided group is empty
* 2. Checks the requested topics are presented in the metadataImage
* 3. Checks the requested share partitions are initialized for the group
* Once these checks are passed, an appropriate ShareGroupStatePartitionMetadataRecord is created by adding the topics to
* deleting topics list and removing them from the initialized topics list.
*
* @param groupId - The group ID
* @param requestData - The request data for DeleteShareGroupOffsetsRequest
* @return {@link DeleteShareGroupOffsetsResultHolder} an object containing top level error code, list of topic responses
* and persister deleteState request parameters
*/
public DeleteShareGroupOffsetsResultHolder shareGroupDeleteOffsetsRequest(
public CoordinatorResult<DeleteShareGroupOffsetsResultHolder, CoordinatorRecord> initiateDeleteShareGroupOffsets(
String groupId,
DeleteShareGroupOffsetsRequestData requestData
) {
List<CoordinatorRecord> records = new ArrayList<>();
try {
ShareGroup group = groupMetadataManager.shareGroup(groupId);
group.validateDeleteGroup();
@ -705,33 +710,81 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(
groupId,
requestData,
errorTopicResponseList
errorTopicResponseList,
records
);
if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
return new DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(), null, errorTopicResponseList);
return new CoordinatorResult<>(
records,
new DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(), null, errorTopicResponseList)
);
}
DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData()
.setGroupId(requestData.groupId())
.setTopics(deleteShareGroupStateRequestTopicsData);
return new DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
errorTopicResponseList,
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
return new CoordinatorResult<>(
records,
new DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
errorTopicResponseList,
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
)
);
} catch (GroupIdNotFoundException exception) {
log.error("groupId {} not found", groupId, exception);
return new DeleteShareGroupOffsetsResultHolder(Errors.GROUP_ID_NOT_FOUND.code(), exception.getMessage());
return new CoordinatorResult<>(
records,
new DeleteShareGroupOffsetsResultHolder(Errors.GROUP_ID_NOT_FOUND.code(), exception.getMessage())
);
} catch (GroupNotEmptyException exception) {
log.error("Provided group {} is not empty", groupId);
return new DeleteShareGroupOffsetsResultHolder(Errors.NON_EMPTY_GROUP.code(), exception.getMessage());
return new CoordinatorResult<>(
records,
new DeleteShareGroupOffsetsResultHolder(Errors.NON_EMPTY_GROUP.code(), exception.getMessage())
);
}
}
/**
* Completes the share group offset deletion by creating a ShareGroupStatePartitionMetadataRecord removing the
* deleted topics from deletingTopics set. Returns the final response for DeleteShareGroupOffsetsRequest
*
* @param groupId - The group ID
* @param topics - The set of topics which were deleted successfully by the persister
* @return the final response {@link DeleteShareGroupOffsetsResponseData} for the DeleteShareGroupOffsetsRequest
*/
public CoordinatorResult<DeleteShareGroupOffsetsResponseData, CoordinatorRecord> completeDeleteShareGroupOffsets(
String groupId,
Map<Uuid, String> topics,
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList
) {
List<CoordinatorRecord> records = new ArrayList<>();
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> topicResponseList = new ArrayList<>();
topicResponseList.addAll(
groupMetadataManager.completeDeleteShareGroupOffsets(
groupId,
topics,
records
)
);
topicResponseList.addAll(errorTopicResponseList);
return new CoordinatorResult<>(
records,
new DeleteShareGroupOffsetsResponseData()
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
.setResponses(topicResponseList)
);
}
/**
* Fetch offsets for a given set of partitions and a given group.
*

View File

@ -7986,50 +7986,131 @@ public class GroupMetadataManager {
/**
* Returns a list of delete share group state request topic objects to be used with the persister.
* @param groupId - group ID of the share group
* @param requestData - the request data for DeleteShareGroupOffsets request
* @param errorTopicResponseList - the list of topics not found in the metadata image
* @param groupId group ID of the share group
* @param requestData the request data for DeleteShareGroupOffsets request
* @param errorTopicResponseList the list of topics not found in the metadata image
* @param records List of coordinator records to append to
*
* @return List of objects representing the share group state delete request for topics.
*/
public List<DeleteShareGroupStateRequestData.DeleteStateData> sharePartitionsEligibleForOffsetDeletion(
String groupId,
DeleteShareGroupOffsetsRequestData requestData,
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList,
List<CoordinatorRecord> records
) {
List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData = new ArrayList<>();
Map<Uuid, Set<Integer>> initializedTopics = new HashMap<>();
ShareGroupStatePartitionMetadataInfo currentMap = shareGroupPartitionMetadata.get(groupId);
if (currentMap == null) {
return deleteShareGroupStateRequestTopicsData;
}
currentMap.initializedTopics().forEach((topicId, partitions) -> initializedTopics.put(topicId, new HashSet<>(partitions)));
Set<Uuid> deletingTopics = new HashSet<>(currentMap.deletingTopics());
Map<Uuid, Set<Integer>> initializedSharePartitions = initializedShareGroupPartitions(groupId);
requestData.topics().forEach(topic -> {
Uuid topicId = metadataImage.topics().topicNameToIdView().get(topic.topicName());
if (topicId != null) {
TopicImage topicImage = metadataImage.topics().getTopic(topic.topicName());
if (topicImage != null) {
Uuid topicId = topicImage.id();
// A deleteState request to persister should only be sent with those topic partitions for which corresponding
// share partitions are initialized for the group.
if (initializedSharePartitions.containsKey(topicId)) {
if (initializedTopics.containsKey(topicId)) {
List<DeleteShareGroupStateRequestData.PartitionData> partitions = new ArrayList<>();
topic.partitions().forEach(partition -> {
if (initializedSharePartitions.get(topicId).contains(partition)) {
partitions.add(new DeleteShareGroupStateRequestData.PartitionData().setPartition(partition));
}
});
deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId)
.setPartitions(partitions));
initializedTopics.get(topicId).forEach(partition ->
partitions.add(new DeleteShareGroupStateRequestData.PartitionData().setPartition(partition)));
deleteShareGroupStateRequestTopicsData.add(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId)
.setPartitions(partitions)
);
// Removing the topic from initializedTopics map.
initializedTopics.remove(topicId);
// Adding the topic to deletingTopics map.
deletingTopics.add(topicId);
} else if (deletingTopics.contains(topicId)) {
// If the topic for which delete share group offsets request is sent is already present in the deletingTopics set,
// we will include that topic in the delete share group state request.
List<DeleteShareGroupStateRequestData.PartitionData> partitions = new ArrayList<>();
topicImage.partitions().keySet().forEach(partition ->
partitions.add(new DeleteShareGroupStateRequestData.PartitionData().setPartition(partition)));
deleteShareGroupStateRequestTopicsData.add(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId)
.setPartitions(partitions)
);
} else {
errorTopicResponseList.add(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName())
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage("There is no offset information to delete."));
}
} else {
errorTopicResponseList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName())
.setPartitions(topic.partitions().stream().map(
partition -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
).collect(Collectors.toCollection(ArrayList::new))));
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
);
}
});
records.add(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
attachTopicName(currentMap.initializingTopics()),
attachTopicName(initializedTopics),
attachTopicName(deletingTopics)
)
);
return deleteShareGroupStateRequestTopicsData;
}
/**
* Returns a list of {@link DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic} corresponding to the
* topics for which persister delete share group state request was successful
* @param groupId group ID of the share group
* @param topics a map of topicId to topic name
* @param records List of coordinator records to append to
*
* @return List of objects for which request was successful
*/
public List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> completeDeleteShareGroupOffsets(
String groupId,
Map<Uuid, String> topics,
List<CoordinatorRecord> records
) {
ShareGroupStatePartitionMetadataInfo currentMap = shareGroupPartitionMetadata.get(groupId);
if (currentMap == null) {
return List.of();
}
Set<Uuid> updatedDeletingTopics = new HashSet<>(currentMap.deletingTopics());
topics.keySet().forEach(updatedDeletingTopics::remove);
records.add(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
attachTopicName(currentMap.initializingTopics()),
attachTopicName(currentMap.initializedTopics()),
attachTopicName(updatedDeletingTopics)
)
);
return topics.entrySet().stream().map(entry ->
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicId(entry.getKey())
.setTopicName(entry.getValue())
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
).toList();
}
/**
* Validates the DeleteGroups request.
*

View File

@ -3929,7 +3929,6 @@ public class GroupCoordinatorServiceTest {
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
@ -3937,10 +3936,8 @@ public class GroupCoordinatorServiceTest {
List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(TOPIC_NAME)
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE)
.setErrorMessage(null))))
.setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE)
.setErrorMessage(null))
);
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
@ -3962,12 +3959,20 @@ public class GroupCoordinatorServiceTest {
)
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("complete-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(responseData));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
@ -3992,7 +3997,6 @@ public class GroupCoordinatorServiceTest {
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData()
@ -4007,10 +4011,8 @@ public class GroupCoordinatorServiceTest {
List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(TOPIC_NAME)
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null))))
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null))
);
DeleteShareGroupStateResponseData deleteShareGroupStateResponseData = new DeleteShareGroupStateResponseData()
@ -4032,12 +4034,20 @@ public class GroupCoordinatorServiceTest {
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("complete-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(responseData));
DeleteShareGroupStateParameters deleteShareGroupStateParameters = DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData);
DeleteShareGroupStateResult deleteShareGroupStateResult = DeleteShareGroupStateResult.from(deleteShareGroupStateResponseData);
when(persister.deleteState(
@ -4058,12 +4068,10 @@ public class GroupCoordinatorServiceTest {
.setRuntime(runtime)
.build();
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
@ -4087,12 +4095,10 @@ public class GroupCoordinatorServiceTest {
// Forcing a null Metadata Image
service.onNewMetadataImage(null, null);
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
@ -4116,12 +4122,10 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("")
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
@ -4156,6 +4160,29 @@ public class GroupCoordinatorServiceTest {
assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsNullTopicsInRequest() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setTopics(null);
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData();
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsRequestThrowsError() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
@ -4169,23 +4196,22 @@ public class GroupCoordinatorServiceTest {
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message());
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())));
)).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
@ -4194,7 +4220,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
public void testDeleteShareGroupOffsetsRequestReturnsNull() throws InterruptedException, ExecutionException {
public void testDeleteShareGroupOffsetsRequestNullResultHolder() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@ -4206,21 +4232,20 @@ public class GroupCoordinatorServiceTest {
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message());
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(null));
@ -4243,12 +4268,10 @@ public class GroupCoordinatorServiceTest {
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
@ -4263,9 +4286,10 @@ public class GroupCoordinatorServiceTest {
null
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
@ -4288,12 +4312,10 @@ public class GroupCoordinatorServiceTest {
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
@ -4308,9 +4330,10 @@ public class GroupCoordinatorServiceTest {
null
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
@ -4333,12 +4356,10 @@ public class GroupCoordinatorServiceTest {
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData();
@ -4351,9 +4372,10 @@ public class GroupCoordinatorServiceTest {
null
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
@ -4378,27 +4400,22 @@ public class GroupCoordinatorServiceTest {
Uuid badTopicId = Uuid.randomUuid();
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition)),
.setTopicName(TOPIC_NAME),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(badTopicName)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setResponses(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(badTopicName)
.setTopicId(badTopicId)
.setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
))));
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
));
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
@ -4407,17 +4424,16 @@ public class GroupCoordinatorServiceTest {
List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(badTopicName)
.setTopicId(badTopicId)
.setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
))),
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
),
null
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
@ -4428,7 +4444,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
public void testDeleteShareGroupOffsetsWithDefaultPersisterThrowsError() throws InterruptedException, ExecutionException {
public void testDeleteShareGroupOffsetsPersisterThrowsError() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@ -4445,7 +4461,6 @@ public class GroupCoordinatorServiceTest {
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
Exception persisterException = new Exception("Unable to validate delete share group state request");
@ -4473,9 +4488,10 @@ public class GroupCoordinatorServiceTest {
)
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
@ -4489,7 +4505,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
public void testDeleteShareGroupOffsetsWithDefaultPersisterNullResult() throws InterruptedException, ExecutionException {
public void testDeleteShareGroupOffsetsPersisterReturnsNull() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@ -4506,7 +4522,6 @@ public class GroupCoordinatorServiceTest {
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
Exception persisterException = new IllegalStateException("Result is null for the delete share group state");
@ -4534,9 +4549,10 @@ public class GroupCoordinatorServiceTest {
)
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
@ -4550,7 +4566,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
public void testDeleteShareGroupOffsetsWithDefaultPersisterNullTopicData() throws InterruptedException, ExecutionException {
public void testDeleteShareGroupOffsetsPersisterReturnsNullTopicData() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@ -4567,7 +4583,6 @@ public class GroupCoordinatorServiceTest {
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
Exception persisterException = new IllegalStateException("Result is null for the delete share group state");
@ -4595,9 +4610,10 @@ public class GroupCoordinatorServiceTest {
)
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
@ -4614,7 +4630,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
public void testDeleteShareGroupOffsetsSuccessWithErrorTopicPartitions() throws InterruptedException, ExecutionException {
public void testDeleteShareGroupOffsetsPersisterReturnsNoSuccessfulTopics() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@ -4624,21 +4640,19 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
String badTopicName = "bad-topic";
Uuid badTopicId = Uuid.randomUuid();
String groupId = "share-group-id";
int partition = 1;
String badTopicName = "bad-topic";
Uuid badTopicId = Uuid.randomUuid();
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition)),
.setTopicName(TOPIC_NAME),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(badTopicName)
.setPartitions(List.of(partition))
));
DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData()
@ -4649,28 +4663,18 @@ public class GroupCoordinatorServiceTest {
.setPartition(partition)))));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setResponses(
List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(badTopicName)
.setTopicId(badTopicId)
.setPartitions(List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
)),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(TOPIC_NAME)
.setTopicId(TOPIC_ID)
.setPartitions(List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
))
)
);
.setResponses(List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(badTopicName)
.setTopicId(badTopicId)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(TOPIC_NAME)
.setTopicId(TOPIC_ID)
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
));
DeleteShareGroupStateResponseData deleteShareGroupStateResponseData = new DeleteShareGroupStateResponseData()
.setResults(
@ -4678,8 +4682,8 @@ public class GroupCoordinatorServiceTest {
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)))
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())))
)
);
@ -4687,20 +4691,20 @@ public class GroupCoordinatorServiceTest {
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(badTopicName)
.setTopicId(badTopicId)
.setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicId(badTopicId)
.setTopicName(badTopicName)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
))),
),
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
);
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-delete-offsets-request"),
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
@ -4716,6 +4720,211 @@ public class GroupCoordinatorServiceTest {
assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsCompleteDeleteStateReturnsError() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
));
DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition)))));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message());
DeleteShareGroupStateResponseData deleteShareGroupStateResponseData = new DeleteShareGroupStateResponseData()
.setResults(
List.of(new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(
new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
))
)
);
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
List.of(),
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
);
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
DeleteShareGroupStateParameters deleteShareGroupStateParameters = DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData);
DeleteShareGroupStateResult deleteShareGroupStateResult = DeleteShareGroupStateResult.from(deleteShareGroupStateResponseData);
when(persister.deleteState(
ArgumentMatchers.eq(deleteShareGroupStateParameters)
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("complete-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testDeleteShareGroupOffsetsSuccessWithErrorTopics() throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(persister)
.build(true);
service.startup(() -> 1);
String groupId = "share-group-id";
String badTopicName1 = "bad-topic-1";
Uuid badTopicId1 = Uuid.randomUuid();
String badTopicName2 = "bad-topic-2";
Uuid badTopicId2 = Uuid.randomUuid();
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(badTopicName1),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(badTopicName2)
));
DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition)
)),
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(badTopicId2)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(partition)
))
));
DeleteShareGroupOffsetsResponseData responseData = new DeleteShareGroupOffsetsResponseData()
.setResponses(List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(badTopicName1)
.setTopicId(badTopicId1)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(badTopicName2)
.setTopicId(badTopicId2)
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message()),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(TOPIC_NAME)
.setTopicId(TOPIC_ID)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
));
DeleteShareGroupStateResponseData deleteShareGroupStateResponseData = new DeleteShareGroupStateResponseData()
.setResults(List.of(
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(badTopicId2)
.setPartitions(List.of(
new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message())
)),
new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(
new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
))
));
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder deleteShareGroupOffsetsResultHolder =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicId(badTopicId1)
.setTopicName(badTopicName1)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
),
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
);
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initiate-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
DeleteShareGroupStateParameters deleteShareGroupStateParameters = DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData);
DeleteShareGroupStateResult deleteShareGroupStateResult = DeleteShareGroupStateResult.from(deleteShareGroupStateResponseData);
when(persister.deleteState(
ArgumentMatchers.eq(deleteShareGroupStateParameters)
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("complete-delete-share-group-offsets"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(responseData));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS), requestData);
assertEquals(responseData, future.get());
}
@Test
public void testPersisterInitializeSuccess() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();

View File

@ -1955,7 +1955,7 @@ public class GroupCoordinatorShardTest {
}
@Test
public void testShareGroupDeleteOffsetsRequestGroupNotFound() {
public void testInitiateDeleteShareGroupOffsetsGroupNotFound() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
@ -1976,24 +1976,26 @@ public class GroupCoordinatorShardTest {
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName("topic-1")
.setPartitions(List.of(0))
));
GroupIdNotFoundException exception = new GroupIdNotFoundException("group Id not found");
doThrow(exception).when(groupMetadataManager).shareGroup(eq(groupId));
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.forException(exception).code(), exception.getMessage());
CoordinatorResult<GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder, CoordinatorRecord> expectedResult =
new CoordinatorResult<>(
List.of(),
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.forException(exception).code(), exception.getMessage())
);
assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
assertEquals(expectedResult, coordinator.initiateDeleteShareGroupOffsets(groupId, requestData));
verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
// Not called because of Group not found.
verify(groupMetadataManager, times(0)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
verify(groupMetadataManager, times(0)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any(), any());
}
@Test
public void testShareGroupDeleteOffsetsRequestNonEmptyShareGroup() {
public void testInitiateDeleteShareGroupOffsetsNonEmptyShareGroup() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
@ -2014,7 +2016,6 @@ public class GroupCoordinatorShardTest {
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName("topic-1")
.setPartitions(List.of(0))
));
ShareGroup shareGroup = mock(ShareGroup.class);
@ -2023,17 +2024,20 @@ public class GroupCoordinatorShardTest {
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.forException(exception).code(), exception.getMessage());
CoordinatorResult<GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder, CoordinatorRecord> expectedResult =
new CoordinatorResult<>(
List.of(),
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.forException(exception).code(), exception.getMessage())
);
assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
assertEquals(expectedResult, coordinator.initiateDeleteShareGroupOffsets(groupId, requestData));
verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
// Not called because of Group not found.
verify(groupMetadataManager, times(0)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
verify(groupMetadataManager, times(0)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any(), any());
}
@Test
public void testShareGroupDeleteOffsetsRequestEmptyResult() {
public void testInitiateDeleteShareGroupOffsetsEmptyResult() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
@ -2052,12 +2056,10 @@ public class GroupCoordinatorShardTest {
String groupId = "share-group";
String topicName = "topic-1";
Uuid topicId = Uuid.randomUuid();
int partition = 0;
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName)
.setPartitions(List.of(partition))
));
ShareGroup shareGroup = mock(ShareGroup.class);
@ -2069,29 +2071,32 @@ public class GroupCoordinatorShardTest {
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName)
.setTopicId(topicId)
.setPartitions(List.of(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
);
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), eq(requestData), any()))
List<CoordinatorRecord> records = new ArrayList<>();
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), eq(requestData), any(), any()))
.thenAnswer(invocation -> {
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> inputList = invocation.getArgument(2);
inputList.addAll(errorTopicResponseList);
return List.of();
});
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(), null, errorTopicResponseList);
CoordinatorResult<GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder, CoordinatorRecord> expectedResult =
new CoordinatorResult<>(
records,
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(), null, errorTopicResponseList)
);
assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
assertEquals(expectedResult, coordinator.initiateDeleteShareGroupOffsets(groupId, requestData));
verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
verify(groupMetadataManager, times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
verify(groupMetadataManager, times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any(), any());
}
@Test
public void testShareGroupDeleteOffsetsRequestSuccess() {
public void testInitiateDeleteShareGroupOffsetsSuccess() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
@ -2117,11 +2122,9 @@ public class GroupCoordinatorShardTest {
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(List.of(partition)),
.setTopicName(topicName1),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(List.of(partition))
));
ShareGroup shareGroup = mock(ShareGroup.class);
@ -2145,28 +2148,46 @@ public class GroupCoordinatorShardTest {
))
);
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), eq(requestData), any()))
.thenReturn(deleteShareGroupStateRequestTopicsData);
List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(),
Map.of(
topicId1, topicName1,
topicId2, topicName2
)
)
);
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), eq(requestData), any(), any()))
.thenAnswer(invocation -> {
List<CoordinatorRecord> records = invocation.getArgument(3);
records.addAll(expectedRecords);
return deleteShareGroupStateRequestTopicsData;
});
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
List.of(),
DeleteShareGroupStateParameters.from(
new DeleteShareGroupStateRequestData()
.setGroupId(requestData.groupId())
.setTopics(deleteShareGroupStateRequestTopicsData)
));
CoordinatorResult<GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder, CoordinatorRecord> expectedResult =
new CoordinatorResult<>(
expectedRecords,
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
List.of(),
DeleteShareGroupStateParameters.from(
new DeleteShareGroupStateRequestData()
.setGroupId(requestData.groupId())
.setTopics(deleteShareGroupStateRequestTopicsData)
))
);
assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
assertEquals(expectedResult, coordinator.initiateDeleteShareGroupOffsets(groupId, requestData));
verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
verify(groupMetadataManager, times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
verify(groupMetadataManager, times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any(), any());
}
@Test
public void testShareGroupDeleteOffsetsRequestSuccessWithErrorTopics() {
public void testInitiateDeleteShareGroupOffsetsSuccessWithErrorTopics() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
@ -2192,11 +2213,9 @@ public class GroupCoordinatorShardTest {
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(List.of(partition)),
.setTopicName(topicName1),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(List.of(partition))
));
ShareGroup shareGroup = mock(ShareGroup.class);
@ -2219,36 +2238,208 @@ public class GroupCoordinatorShardTest {
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName2)
.setTopicId(topicId2)
.setPartitions(List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
))
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
);
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), eq(requestData), any()))
List<CoordinatorRecord> expectedRecord = List.of(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(),
Map.of(
topicId1, topicName1
)
)
);
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId), eq(requestData), any(), any()))
.thenAnswer(invocation -> {
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> inputList = invocation.getArgument(2);
inputList.addAll(errorTopicResponseList);
List<CoordinatorRecord> records = invocation.getArgument(3);
records.addAll(expectedRecord);
return deleteShareGroupStateRequestTopicsData;
});
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder expectedResult =
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
errorTopicResponseList,
DeleteShareGroupStateParameters.from(
new DeleteShareGroupStateRequestData()
.setGroupId(requestData.groupId())
.setTopics(deleteShareGroupStateRequestTopicsData)
));
CoordinatorResult<GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder, CoordinatorRecord> expectedResult =
new CoordinatorResult<>(
expectedRecord,
new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
Errors.NONE.code(),
null,
errorTopicResponseList,
DeleteShareGroupStateParameters.from(
new DeleteShareGroupStateRequestData()
.setGroupId(requestData.groupId())
.setTopics(deleteShareGroupStateRequestTopicsData)
))
);
assertEquals(expectedResult, coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
assertEquals(expectedResult, coordinator.initiateDeleteShareGroupOffsets(groupId, requestData));
verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
verify(groupMetadataManager, times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
verify(groupMetadataManager, times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any(), any());
}
@Test
public void testCompleteDeleteShareGroupOffsetsSuccess() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
String groupId = "share-group";
String topicName1 = "topic-1";
Uuid topicId1 = Uuid.randomUuid();
String topicName2 = "topic-2";
Uuid topicId2 = Uuid.randomUuid();
Map<Uuid, String> topics = Map.of(
topicId1, topicName1,
topicId2, topicName2
);
ShareGroup shareGroup = mock(ShareGroup.class);
doNothing().when(shareGroup).validateDeleteGroup();
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> resultTopics = List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicId(topicId1)
.setTopicName(topicName1)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicId(topicId2)
.setTopicName(topicName2)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
);
List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(),
Map.of()
)
);
when(groupMetadataManager.completeDeleteShareGroupOffsets(eq(groupId), eq(topics), any()))
.thenAnswer(invocation -> {
List<CoordinatorRecord> records = invocation.getArgument(2);
records.addAll(expectedRecords);
return resultTopics;
});
CoordinatorResult<DeleteShareGroupOffsetsResponseData, CoordinatorRecord> expectedResult =
new CoordinatorResult<>(
expectedRecords,
new DeleteShareGroupOffsetsResponseData()
.setResponses(resultTopics)
);
assertEquals(expectedResult, coordinator.completeDeleteShareGroupOffsets(groupId, topics, List.of()));
verify(groupMetadataManager, times(1)).completeDeleteShareGroupOffsets(any(), any(), any());
}
@Test
public void testCompleteDeleteShareGroupOffsetsSuccessWithErrorTopics() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
String groupId = "share-group";
String topicName1 = "topic-1";
Uuid topicId1 = Uuid.randomUuid();
String topicName2 = "topic-2";
Uuid topicId2 = Uuid.randomUuid();
String topicName3 = "topic-3";
Uuid topicId3 = Uuid.randomUuid();
Map<Uuid, String> topics = Map.of(
topicId1, topicName1,
topicId2, topicName2
);
ShareGroup shareGroup = mock(ShareGroup.class);
doNothing().when(shareGroup).validateDeleteGroup();
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> resultTopics = List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicId(topicId1)
.setTopicName(topicName1)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicId(topicId2)
.setTopicName(topicName2)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
);
List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(),
Map.of()
)
);
when(groupMetadataManager.completeDeleteShareGroupOffsets(eq(groupId), eq(topics), any()))
.thenAnswer(invocation -> {
List<CoordinatorRecord> records = invocation.getArgument(2);
records.addAll(expectedRecords);
return resultTopics;
});
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = new ArrayList<>();
errorTopicResponseList.add(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicId(topicId3)
.setTopicName(topicName3)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
);
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> expectedResultTopics = new ArrayList<>(resultTopics);
expectedResultTopics.addAll(errorTopicResponseList);
CoordinatorResult<DeleteShareGroupOffsetsResponseData, CoordinatorRecord> expectedResult =
new CoordinatorResult<>(
expectedRecords,
new DeleteShareGroupOffsetsResponseData()
.setResponses(expectedResultTopics)
);
assertEquals(expectedResult, coordinator.completeDeleteShareGroupOffsets(groupId, topics, errorTopicResponseList));
verify(groupMetadataManager, times(1)).completeDeleteShareGroupOffsets(any(), any(), any());
}
}

View File

@ -20667,7 +20667,6 @@ public class GroupMetadataManagerTest {
.build();
String groupId = "share-group";
Uuid memberId = Uuid.randomUuid();
String topicName1 = "topic-1";
String topicName2 = "topic-2";
Uuid topicId1 = Uuid.randomUuid();
@ -20680,31 +20679,22 @@ public class GroupMetadataManagerTest {
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId.toString())
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of(topicName1, topicName2)));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue()
.setInitializingTopics(List.of())
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId1)
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2)),
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId2)
.setTopicName(topicName2)
.setPartitions(List.of(0, 1))
))
.setDeletingTopics(List.of())
context.replay(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(
topicId1, Map.entry(topicName1, Set.of(0, 1, 2)),
topicId2, Map.entry(topicName2, Set.of(0, 1))
),
Map.of()
)
);
context.commit();
List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult = List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
@ -20726,23 +20716,148 @@ public class GroupMetadataManagerTest {
))
);
List<CoordinatorRecord> expectedRecords = List.of(
newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(),
Map.of(topicId1, topicName1, topicId2, topicName2)
)
);
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2)),
.setTopicName(topicName1),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(List.of(0, 1))
));
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = new ArrayList<>();
List<CoordinatorRecord> records = new ArrayList<>();
List<DeleteShareGroupStateRequestData.DeleteStateData> result =
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList);
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList, records);
assertTrue(errorTopicResponseList.isEmpty());
assertEquals(expectedResult, result);
assertRecordsEquals(expectedRecords, records);
}
@Test
public void testSharePartitionsEligibleForOffsetDeletionContainsDeletingTopics() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();
String groupId = "share-group";
String topicName1 = "topic-1";
String topicName2 = "topic-2";
String topicName3 = "topic-3";
String topicName4 = "topic-4";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
Uuid topicId3 = Uuid.randomUuid();
Uuid topicId4 = Uuid.randomUuid();
MetadataImage image = new MetadataImageBuilder()
.addTopic(topicId1, topicName1, 3)
.addTopic(topicId2, topicName2, 2)
.addTopic(topicId3, topicName3, 2)
.addTopic(topicId4, topicName4, 2)
.build();
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
context.replay(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(
topicId1, Map.entry(topicName1, Set.of(0, 1, 2)),
topicId2, Map.entry(topicName2, Set.of(0, 1))
),
Map.of(
topicId3, topicName3,
topicId4, topicName4
)
)
);
context.commit();
// Because "topic-4" not a part of the request data, it will not be added to the result, even though it is part
// of the deletingTopics set. "topic-3" isn't currently initialized for the group, but since it is part of the
// deletingTopics set, it will still be included in the result and tried to get deleted by the persister.
List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult = List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(0),
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(1),
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(2)
)),
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId2)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(0),
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(1)
)),
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId3)
.setPartitions(List.of(
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(0),
new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(1)
))
);
// The ShareGroupStatePartitionMetadata record will contain all 4 topics in the deletingTopics list
List<CoordinatorRecord> expectedRecords = List.of(
newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(),
Map.of(
topicId1, topicName1,
topicId2, topicName2,
topicId3, topicName3,
topicId4, topicName4
)
)
);
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName3)
));
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = new ArrayList<>();
List<CoordinatorRecord> records = new ArrayList<>();
List<DeleteShareGroupStateRequestData.DeleteStateData> result =
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList, records);
assertTrue(errorTopicResponseList.isEmpty());
assertEquals(expectedResult, result);
assertRecordsEquals(expectedRecords, records);
}
@Test
@ -20754,7 +20869,6 @@ public class GroupMetadataManagerTest {
.build();
String groupId = "share-group";
Uuid memberId = Uuid.randomUuid();
String topicName1 = "topic-1";
String topicName2 = "topic-2";
Uuid topicId1 = Uuid.randomUuid();
@ -20765,27 +20879,19 @@ public class GroupMetadataManagerTest {
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId.toString())
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of(topicName1)));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue()
.setInitializingTopics(List.of())
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId1)
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2))
))
.setDeletingTopics(List.of())
context.replay(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(topicId1, Map.entry(topicName1, Set.of(0, 1, 2))),
Map.of()
)
);
context.commit();
List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult = List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
@ -20799,39 +20905,41 @@ public class GroupMetadataManagerTest {
))
);
List<CoordinatorRecord> expectedRecords = List.of(
newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(),
Map.of(topicId1, topicName1)
)
);
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2)),
.setTopicName(topicName1),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(List.of(0, 1))
));
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = new ArrayList<>();
List<CoordinatorRecord> records = new ArrayList<>();
List<DeleteShareGroupStateRequestData.DeleteStateData> result =
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList);
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList, records);
assertEquals(
List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName2)
.setPartitions(List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
))
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
),
errorTopicResponseList
);
assertEquals(expectedResult, result);
assertRecordsEquals(expectedRecords, records);
}
@Test
@ -20843,7 +20951,6 @@ public class GroupMetadataManagerTest {
.build();
String groupId = "share-group";
Uuid memberId = Uuid.randomUuid();
String topicName1 = "topic-1";
String topicName2 = "topic-2";
Uuid topicId1 = Uuid.randomUuid();
@ -20856,32 +20963,19 @@ public class GroupMetadataManagerTest {
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId.toString())
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of(topicName1, topicName2)));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue()
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId1)
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2))
))
.setInitializingTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId2)
.setTopicName(topicName2)
.setPartitions(List.of(0, 1))
))
.setDeletingTopics(List.of())
context.replay(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(topicId2, Map.entry(topicName2, Set.of(0, 1))),
Map.of(topicId1, Map.entry(topicName1, Set.of(0, 1, 2))),
Map.of()
)
);
context.commit();
List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult = List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
@ -20895,23 +20989,41 @@ public class GroupMetadataManagerTest {
))
);
List<CoordinatorRecord> expectedRecords = List.of(
newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(topicId2, Map.entry(topicName2, Set.of(0, 1))),
Map.of(),
Map.of(topicId1, topicName1)
)
);
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2)),
.setTopicName(topicName1),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(List.of(0, 1))
));
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = new ArrayList<>();
List<DeleteShareGroupStateRequestData.DeleteStateData> result =
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList);
List<CoordinatorRecord> records = new ArrayList<>();
assertTrue(errorTopicResponseList.isEmpty());
List<DeleteShareGroupStateRequestData.DeleteStateData> result =
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList, records);
assertEquals(
List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName2)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage("There is no offset information to delete.")
),
errorTopicResponseList
);
assertEquals(expectedResult, result);
assertRecordsEquals(expectedRecords, records);
}
@Test
@ -20923,7 +21035,6 @@ public class GroupMetadataManagerTest {
.build();
String groupId = "share-group";
Uuid memberId = Uuid.randomUuid();
String topicName1 = "topic-1";
String topicName2 = "topic-2";
String topicName3 = "topic-3";
@ -20937,32 +21048,19 @@ public class GroupMetadataManagerTest {
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId.toString())
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of(topicName1, topicName2)));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue()
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId1)
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2))
))
.setInitializingTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId2)
.setTopicName(topicName2)
.setPartitions(List.of(0, 1))
))
.setDeletingTopics(List.of())
context.replay(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(topicId2, Map.entry(topicName2, Set.of(0, 1))),
Map.of(topicId1, Map.entry(topicName1, Set.of(0, 1, 2))),
Map.of()
)
);
context.commit();
List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult = List.of(
new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(topicId1)
@ -20976,42 +21074,161 @@ public class GroupMetadataManagerTest {
))
);
List<CoordinatorRecord> expectedRecords = List.of(
newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(topicId2, Map.entry(topicName2, Set.of(0, 1))),
Map.of(),
Map.of(topicId1, topicName1)
)
);
DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId)
.setTopics(List.of(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName1)
.setPartitions(List.of(0, 1, 2)),
.setTopicName(topicName1),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName2)
.setPartitions(List.of(0, 1)),
.setTopicName(topicName2),
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topicName3)
.setPartitions(List.of(0, 1))
));
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = new ArrayList<>();
List<CoordinatorRecord> records = new ArrayList<>();
List<DeleteShareGroupStateRequestData.DeleteStateData> result =
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList);
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId, requestData, errorTopicResponseList, records);
assertEquals(
List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName2)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage("There is no offset information to delete."),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topicName3)
.setPartitions(List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
))
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
),
errorTopicResponseList
);
assertEquals(expectedResult, result);
assertRecordsEquals(expectedRecords, records);
}
@Test
public void testCompleteDeleteShareGroupOffsets() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();
String groupId = "share-group";
String topicName1 = "topic-1";
String topicName2 = "topic-2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
MetadataImage image = new MetadataImageBuilder()
.addTopic(topicId1, topicName1, 3)
.addTopic(topicId2, topicName2, 2)
.build();
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
context.replay(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(),
Map.of(
topicId1, topicName1,
topicId2, topicName2
)
)
);
context.commit();
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> expectedResult = List.of(
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicId(topicId1)
.setTopicName(topicName1)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null),
new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicId(topicId2)
.setTopicName(topicName2)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
);
List<CoordinatorRecord> expectedRecords = List.of(
newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(),
Map.of()
)
);
Map<Uuid, String> topics = Map.of(
topicId1, topicName1,
topicId2, topicName2
);
List<CoordinatorRecord> records = new ArrayList<>();
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> result =
context.groupMetadataManager.completeDeleteShareGroupOffsets(groupId, topics, records);
assertEquals(convertResponseTopicListToMap(expectedResult), convertResponseTopicListToMap(result));
assertRecordsEquals(expectedRecords, records);
}
@Test
public void testCompleteDeleteShareGroupOffsetsEmptyResult() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();
String groupId = "share-group";
String topicName1 = "topic-1";
String topicName2 = "topic-2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
MetadataImage image = new MetadataImageBuilder()
.addTopic(topicId1, topicName1, 3)
.addTopic(topicId2, topicName2, 2)
.build();
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> expectedResult = List.of();
List<CoordinatorRecord> expectedRecords = List.of();
Map<Uuid, String> topics = Map.of(
topicId1, topicName1,
topicId2, topicName2
);
List<CoordinatorRecord> records = new ArrayList<>();
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> result =
context.groupMetadataManager.completeDeleteShareGroupOffsets(groupId, topics, records);
assertEquals(convertResponseTopicListToMap(expectedResult), convertResponseTopicListToMap(result));
assertRecordsEquals(expectedRecords, records);
}
@Test
@ -21603,4 +21820,11 @@ public class GroupMetadataManagerTest {
assertTrue(deleteRequest.isEmpty());
}
}
private Map<Uuid, DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> convertResponseTopicListToMap(
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> responseTopics
) {
return responseTopics.stream()
.collect(Collectors.toMap(DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic::topicId, Function.identity()));
}
}

View File

@ -477,8 +477,8 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
}
@Override
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(final String groupId, final Set<TopicPartition> partitions, final DeleteShareGroupOffsetsOptions options) {
return adminDelegate.deleteShareGroupOffsets(groupId, partitions, options);
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(final String groupId, final Set<String> topics, final DeleteShareGroupOffsetsOptions options) {
return adminDelegate.deleteShareGroupOffsets(groupId, topics, options);
}
@Override