KAFKA-18285: Add describeStreamsGroup to Admin API (#19116)

Adds `describeStreamsGroup` to Admin API.

This exposes the result of the `DESCRIBE_STREAMS_GROUP` RPC in the Admin
API.

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-03-07 15:56:07 +01:00 committed by GitHub
parent 53b2935c51
commit 618ea2c1ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 1683 additions and 0 deletions

View File

@ -1959,6 +1959,29 @@ public interface Admin extends AutoCloseable {
*/
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);
/**
* Describe streams groups in the cluster.
*
* @param groupIds The IDs of the groups to describe.
* @param options The options to use when describing the groups.
* @return The DescribeStreamsGroupsResult.
*/
DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds,
DescribeStreamsGroupsOptions options);
/**
* Describe streams groups in the cluster, with the default options.
* <p>
* This is a convenience method for {@link #describeStreamsGroups(Collection, DescribeStreamsGroupsOptions)}
* with default options. See the overload for more details.
*
* @param groupIds The IDs of the groups to describe.
* @return The DescribeStreamsGroupsResult.
*/
default DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds) {
return describeStreamsGroups(groupIds, new DescribeStreamsGroupsOptions());
}
/**
* Describe some classic groups in the cluster.
*

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
/**
* Options for {@link Admin#describeStreamsGroups(Collection, DescribeStreamsGroupsOptions)}.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DescribeStreamsGroupsOptions extends AbstractOptions<DescribeStreamsGroupsOptions> {
private boolean includeAuthorizedOperations;
public DescribeStreamsGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
this.includeAuthorizedOperations = includeAuthorizedOperations;
return this;
}
public boolean includeAuthorizedOperations() {
return includeAuthorizedOperations;
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
* The result of the {@link KafkaAdminClient#describeStreamsGroups(Collection, DescribeStreamsGroupsOptions)}} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DescribeStreamsGroupsResult {
private final Map<String, KafkaFuture<StreamsGroupDescription>> futures;
public DescribeStreamsGroupsResult(final Map<String, KafkaFuture<StreamsGroupDescription>> futures) {
this.futures = Map.copyOf(futures);
}
/**
* Return a map from group id to futures which yield streams group descriptions.
*/
public Map<String, KafkaFuture<StreamsGroupDescription>> describedGroups() {
return new HashMap<>(futures);
}
/**
* Return a future which yields all StreamsGroupDescription objects, if all the describes succeed.
*/
public KafkaFuture<Map<String, StreamsGroupDescription>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture<?>[0])).thenApply(
nil -> {
Map<String, StreamsGroupDescription> descriptions = new HashMap<>(futures.size());
futures.forEach((key, future) -> {
try {
descriptions.put(key, future.get());
} catch (InterruptedException | ExecutionException e) {
// This should be unreachable, since the KafkaFuture#allOf already ensured
// that all of the futures completed successfully.
throw new RuntimeException(e);
}
});
return descriptions;
});
}
}

View File

@ -333,6 +333,11 @@ public class ForwardingAdmin implements Admin {
return delegate.deleteShareGroups(groupIds, options);
}
@Override
public DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds, DescribeStreamsGroupsOptions options) {
return delegate.describeStreamsGroups(groupIds, options);
}
@Override
public ListGroupsResult listGroups(ListGroupsOptions options) {
return delegate.listGroups(options);

View File

@ -55,6 +55,7 @@ import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
import org.apache.kafka.clients.admin.internals.DescribeShareGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeStreamsGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
import org.apache.kafka.clients.admin.internals.FenceProducersHandler;
import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
@ -3840,6 +3841,17 @@ public class KafkaAdminClient extends AdminClient {
return new ListShareGroupOffsetsResult(future.all());
}
@Override
public DescribeStreamsGroupsResult describeStreamsGroups(final Collection<String> groupIds,
final DescribeStreamsGroupsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, StreamsGroupDescription> future =
DescribeStreamsGroupsHandler.newFuture(groupIds);
DescribeStreamsGroupsHandler handler = new DescribeStreamsGroupsHandler(options.includeAuthorizedOperations(), logContext);
invokeDriver(handler, future, options.timeoutMs);
return new DescribeStreamsGroupsResult(future.all().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}
@Override
public DescribeClassicGroupsResult describeClassicGroups(final Collection<String> groupIds,
final DescribeClassicGroupsOptions options) {

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* A detailed description of a single streams group in the cluster.
*/
@InterfaceStability.Evolving
public class StreamsGroupDescription {
private final String groupId;
private final int groupEpoch;
private final int targetAssignmentEpoch;
private final int topologyEpoch;
private final Collection<StreamsGroupSubtopologyDescription> subtopologies;
private final Collection<StreamsGroupMemberDescription> members;
private final GroupState groupState;
private final Node coordinator;
private final Set<AclOperation> authorizedOperations;
public StreamsGroupDescription(
final String groupId,
final int groupEpoch,
final int targetAssignmentEpoch,
final int topologyEpoch,
final Collection<StreamsGroupSubtopologyDescription> subtopologies,
final Collection<StreamsGroupMemberDescription> members,
final GroupState groupState,
final Node coordinator,
final Set<AclOperation> authorizedOperations
) {
this.groupId = Objects.requireNonNull(groupId, "groupId must be non-null");
this.groupEpoch = groupEpoch;
this.targetAssignmentEpoch = targetAssignmentEpoch;
this.topologyEpoch = topologyEpoch;
this.subtopologies = Objects.requireNonNull(subtopologies, "subtopologies must be non-null");
this.members = Objects.requireNonNull(members, "members must be non-null");
this.groupState = Objects.requireNonNull(groupState, "groupState must be non-null");
this.coordinator = Objects.requireNonNull(coordinator, "coordinator must be non-null");
this.authorizedOperations = authorizedOperations;
}
/**
* The id of the streams group.
*/
public String groupId() {
return groupId;
}
/**
* The epoch of the consumer group.
*/
public int groupEpoch() {
return groupEpoch;
}
/**
* The epoch of the target assignment.
*/
public int targetAssignmentEpoch() {
return targetAssignmentEpoch;
}
/**
* The epoch of the currently used topology.
*/
public int topologyEpoch() {
return topologyEpoch;
}
/**
* A list of the members of the streams group.
*/
public Collection<StreamsGroupMemberDescription> members() {
return members;
}
/**
* A list of the subtopologies in the streams group.
*/
public Collection<StreamsGroupSubtopologyDescription> subtopologies() {
return subtopologies;
}
/**
* The state of the streams group, or UNKNOWN if the state is too new for us to parse.
*/
public GroupState groupState() {
return groupState;
}
/**
* The group coordinator, or null if the coordinator is not known.
*/
public Node coordinator() {
return coordinator;
}
/**
* authorizedOperations for this group, or null if that information is not known.
*/
public Set<AclOperation> authorizedOperations() {
return authorizedOperations;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final StreamsGroupDescription that = (StreamsGroupDescription) o;
return groupEpoch == that.groupEpoch
&& targetAssignmentEpoch == that.targetAssignmentEpoch
&& topologyEpoch == that.topologyEpoch
&& Objects.equals(groupId, that.groupId)
&& Objects.equals(subtopologies, that.subtopologies)
&& Objects.equals(members, that.members)
&& groupState == that.groupState
&& Objects.equals(coordinator, that.coordinator)
&& Objects.equals(authorizedOperations, that.authorizedOperations);
}
@Override
public int hashCode() {
return Objects.hash(
groupId,
groupEpoch,
targetAssignmentEpoch,
topologyEpoch,
subtopologies,
members,
groupState,
coordinator,
authorizedOperations
);
}
@Override
public String toString() {
return "(" +
"groupId=" + groupId +
", groupEpoch=" + groupEpoch +
", targetAssignmentEpoch=" + targetAssignmentEpoch +
", topologyEpoch=" + topologyEpoch +
", subtopologies=" + subtopologies.stream().map(StreamsGroupSubtopologyDescription::toString).collect(Collectors.joining(",")) +
", members=" + members.stream().map(StreamsGroupMemberDescription::toString).collect(Collectors.joining(",")) +
", groupState=" + groupState +
", coordinator=" + coordinator +
", authorizedOperations=" + authorizedOperations.stream().map(AclOperation::toString).collect(Collectors.joining(",")) +
')';
}
}

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* A description of the assignments of a specific group member.
*/
@InterfaceStability.Evolving
public class StreamsGroupMemberAssignment {
private final List<TaskIds> activeTasks;
private final List<TaskIds> standbyTasks;
private final List<TaskIds> warmupTasks;
public StreamsGroupMemberAssignment(
final List<TaskIds> activeTasks,
final List<TaskIds> standbyTasks,
final List<TaskIds> warmupTasks
) {
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
this.warmupTasks = warmupTasks;
}
/**
* Active tasks for this client.
*/
public List<TaskIds> activeTasks() {
return List.copyOf(activeTasks);
}
/**
* Standby tasks for this client.
*/
public List<TaskIds> standbyTasks() {
return List.copyOf(standbyTasks);
}
/**
* Warmup tasks for this client.
*/
public List<TaskIds> warmupTasks() {
return List.copyOf(warmupTasks);
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final StreamsGroupMemberAssignment that = (StreamsGroupMemberAssignment) o;
return Objects.equals(activeTasks, that.activeTasks)
&& Objects.equals(standbyTasks, that.standbyTasks)
&& Objects.equals(warmupTasks, that.warmupTasks);
}
@Override
public int hashCode() {
return Objects.hash(
activeTasks,
standbyTasks,
warmupTasks
);
}
@Override
public String toString() {
return "(" +
"activeTasks=" + activeTasks.stream().map(TaskIds::toString).collect(Collectors.joining(",")) +
", standbyTasks=" + standbyTasks.stream().map(TaskIds::toString).collect(Collectors.joining(",")) +
", warmupTasks=" + warmupTasks.stream().map(TaskIds::toString).collect(Collectors.joining(",")) +
')';
}
/**
* All tasks for one subtopology of a member.
*/
public static class TaskIds {
private final String subtopologyId;
private final List<Integer> partitions;
public TaskIds(final String subtopologyId, final List<Integer> partitions) {
this.subtopologyId = Objects.requireNonNull(subtopologyId, "subtopologyId must be non-null");
this.partitions = Objects.requireNonNull(partitions, "partitions must be non-null");
}
/**
* The subtopology identifier.
*/
public String subtopologyId() {
return subtopologyId;
}
/**
* The partitions of the subtopology processed by this member.
*/
public List<Integer> partitions() {
return List.copyOf(partitions);
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TaskIds taskIds = (TaskIds) o;
return Objects.equals(subtopologyId, taskIds.subtopologyId)
&& Objects.equals(partitions, taskIds.partitions);
}
@Override
public int hashCode() {
return Objects.hash(
subtopologyId,
partitions
);
}
@Override
public String toString() {
return partitions.stream().map(x -> subtopologyId + "_" + x).collect(Collectors.joining(","));
}
}
}

View File

@ -0,0 +1,371 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* A detailed description of a single streams groups member in the cluster.
*/
@InterfaceStability.Evolving
public class StreamsGroupMemberDescription {
private final String memberId;
private final int memberEpoch;
private final Optional<String> instanceId;
private final Optional<String> rackId;
private final String clientId;
private final String clientHost;
private final int topologyEpoch;
private final String processId;
private final Optional<Endpoint> userEndpoint;
private final Map<String, String> clientTags;
private final List<TaskOffset> taskOffsets;
private final List<TaskOffset> taskEndOffsets;
private final StreamsGroupMemberAssignment assignment;
private final StreamsGroupMemberAssignment targetAssignment;
private final boolean isClassic;
@SuppressWarnings("ParameterNumber")
public StreamsGroupMemberDescription(
final String memberId,
final int memberEpoch,
final Optional<String> instanceId,
final Optional<String> rackId,
final String clientId,
final String clientHost,
final int topologyEpoch,
final String processId,
final Optional<Endpoint> userEndpoint,
final Map<String, String> clientTags,
final List<TaskOffset> taskOffsets,
final List<TaskOffset> taskEndOffsets,
final StreamsGroupMemberAssignment assignment,
final StreamsGroupMemberAssignment targetAssignment,
final boolean isClassic
) {
this.memberId = Objects.requireNonNull(memberId);
this.memberEpoch = memberEpoch;
this.instanceId = Objects.requireNonNull(instanceId);
this.rackId = Objects.requireNonNull(rackId);
this.clientId = Objects.requireNonNull(clientId);
this.clientHost = Objects.requireNonNull(clientHost);
this.topologyEpoch = topologyEpoch;
this.processId = Objects.requireNonNull(processId);
this.userEndpoint = Objects.requireNonNull(userEndpoint);
this.clientTags = Objects.requireNonNull(clientTags);
this.taskOffsets = Objects.requireNonNull(taskOffsets);
this.taskEndOffsets = Objects.requireNonNull(taskEndOffsets);
this.assignment = Objects.requireNonNull(assignment);
this.targetAssignment = Objects.requireNonNull(targetAssignment);
this.isClassic = isClassic;
}
/**
* The id of the group member.
*/
public String memberId() {
return memberId;
}
/**
* The epoch of the group member.
*/
public int memberEpoch() {
return memberEpoch;
}
/**
* The id of the instance, used for static membership, if available.
*/
public Optional<String> instanceId() {
return instanceId;
}
/**
* The rack ID of the group member.
*/
public Optional<String> rackId() {
return rackId;
}
/**
* The client ID of the group member.
*/
public String clientId() {
return clientId;
}
/**
* The host of the group member.
*/
public String clientHost() {
return clientHost;
}
/**
* The epoch of the topology present on the client.
*/
public int topologyEpoch() {
return topologyEpoch;
}
/**
* Identity of the streams instance that may have multiple clients.
*/
public String processId() {
return processId;
}
/**
* User-defined endpoint for Interactive Queries.
*/
public Optional<Endpoint> userEndpoint() {
return userEndpoint;
}
/**
* Used for rack-aware assignment algorithm.
*/
public Map<String, String> clientTags() {
return Map.copyOf(clientTags);
}
/**
* Cumulative offsets for tasks.
*/
public List<TaskOffset> taskOffsets() {
return List.copyOf(taskOffsets);
}
/**
* Cumulative task changelog end offsets for tasks.
*/
public List<TaskOffset> taskEndOffsets() {
return List.copyOf(taskEndOffsets);
}
/**
* The current assignment.
*/
public StreamsGroupMemberAssignment assignment() {
return assignment;
}
/**
* The target assignment.
*/
public StreamsGroupMemberAssignment targetAssignment() {
return targetAssignment;
}
/**
* The flag indicating whether a member is classic.
*/
public boolean isClassic() {
return isClassic;
}
@SuppressWarnings("CyclomaticComplexity")
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final StreamsGroupMemberDescription that = (StreamsGroupMemberDescription) o;
return memberEpoch == that.memberEpoch
&& topologyEpoch == that.topologyEpoch
&& isClassic == that.isClassic
&& Objects.equals(memberId, that.memberId)
&& Objects.equals(instanceId, that.instanceId)
&& Objects.equals(rackId, that.rackId)
&& Objects.equals(clientId, that.clientId)
&& Objects.equals(clientHost, that.clientHost)
&& Objects.equals(processId, that.processId)
&& Objects.equals(userEndpoint, that.userEndpoint)
&& Objects.equals(clientTags, that.clientTags)
&& Objects.equals(taskOffsets, that.taskOffsets)
&& Objects.equals(taskEndOffsets, that.taskEndOffsets)
&& Objects.equals(assignment, that.assignment)
&& Objects.equals(targetAssignment, that.targetAssignment);
}
@Override
public int hashCode() {
return Objects.hash(
memberId,
memberEpoch,
instanceId,
rackId,
clientId,
clientHost,
topologyEpoch,
processId,
userEndpoint,
clientTags,
taskOffsets,
taskEndOffsets,
assignment,
targetAssignment,
isClassic
);
}
@Override
public String toString() {
return "(" +
"memberId=" + memberId +
", memberEpoch=" + memberEpoch +
", instanceId=" + instanceId.orElse("null") +
", rackId=" + rackId.orElse("null") +
", clientId=" + clientId +
", clientHost=" + clientHost +
", topologyEpoch=" + topologyEpoch +
", processId=" + processId +
", userEndpoint=" + userEndpoint.map(Endpoint::toString).orElse("null") +
", clientTags=" + clientTags +
", taskOffsets=" + taskOffsets.stream().map(TaskOffset::toString).collect(Collectors.joining(",")) +
", taskEndOffsets=" + taskEndOffsets.stream().map(TaskOffset::toString).collect(Collectors.joining(",")) +
", assignment=" + assignment +
", targetAssignment=" + targetAssignment +
", isClassic=" + isClassic +
')';
}
/**
* The user-defined endpoint for the member.
*/
public static class Endpoint {
private final String host;
private final int port;
public Endpoint(final String host, final int port) {
this.host = Objects.requireNonNull(host);
this.port = port;
}
public String host() {
return host;
}
public int port() {
return port;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Endpoint endpoint = (Endpoint) o;
return port == endpoint.port && Objects.equals(host, endpoint.host);
}
@Override
public int hashCode() {
return Objects.hash(host, port);
}
@Override
public String toString() {
return "(" +
"host=" + host +
", port=" + port +
')';
}
}
/**
* The cumulative offset for one task.
*/
public static class TaskOffset {
private final String subtopologyId;
private final int partition;
private final long offset;
public TaskOffset(final String subtopologyId, final int partition, final long offset) {
this.subtopologyId = Objects.requireNonNull(subtopologyId);
this.partition = partition;
this.offset = offset;
}
/**
* The subtopology identifier.
*/
public String subtopologyId() {
return subtopologyId;
}
/**
* The partition of the task.
*/
public int partition() {
return partition;
}
/**
* The cumulative offset (sum of offsets in all input partitions).
*/
public long offset() {
return offset;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TaskOffset that = (TaskOffset) o;
return partition == that.partition
&& offset == that.offset
&& Objects.equals(subtopologyId, that.subtopologyId);
}
@Override
public int hashCode() {
return Objects.hash(
subtopologyId,
partition,
offset
);
}
@Override
public String toString() {
return subtopologyId +
"_" + partition +
"=" + offset;
}
}
}

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* A detailed description of a subtopology in a streams group.
*/
@InterfaceStability.Evolving
public class StreamsGroupSubtopologyDescription {
private final String subtopologyId;
private final List<String> sourceTopics;
private final List<String> repartitionSinkTopics;
private final Map<String, TopicInfo> stateChangelogTopics;
private final Map<String, TopicInfo> repartitionSourceTopics;
public StreamsGroupSubtopologyDescription(
final String subtopologyId,
final List<String> sourceTopics,
final List<String> repartitionSinkTopics,
final Map<String, TopicInfo> stateChangelogTopics,
final Map<String, TopicInfo> repartitionSourceTopics
) {
this.subtopologyId = Objects.requireNonNull(subtopologyId, "subtopologyId must be non-null");
this.sourceTopics = Objects.requireNonNull(sourceTopics, "sourceTopics must be non-null");
this.repartitionSinkTopics = Objects.requireNonNull(repartitionSinkTopics, "repartitionSinkTopics must be non-null");
this.stateChangelogTopics = Objects.requireNonNull(stateChangelogTopics, "stateChangelogTopics must be non-null");
this.repartitionSourceTopics = Objects.requireNonNull(repartitionSourceTopics, "repartitionSourceTopics must be non-null");
}
/**
* String to uniquely identify the subtopology.
*/
public String subtopologyId() {
return subtopologyId;
}
/**
* The topics the topology reads from.
*/
public List<String> sourceTopics() {
return List.copyOf(sourceTopics);
}
/**
* The repartition topics the topology writes to.
*/
public List<String> repartitionSinkTopics() {
return List.copyOf(repartitionSinkTopics);
}
/**
* The set of state changelog topics associated with this subtopology.
*/
public Map<String, TopicInfo> stateChangelogTopics() {
return Map.copyOf(stateChangelogTopics);
}
/**
* The set of source topics that are internally created repartition topics.
*/
public Map<String, TopicInfo> repartitionSourceTopics() {
return Map.copyOf(repartitionSourceTopics);
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final StreamsGroupSubtopologyDescription that = (StreamsGroupSubtopologyDescription) o;
return Objects.equals(subtopologyId, that.subtopologyId)
&& Objects.equals(sourceTopics, that.sourceTopics)
&& Objects.equals(repartitionSinkTopics, that.repartitionSinkTopics)
&& Objects.equals(stateChangelogTopics, that.stateChangelogTopics)
&& Objects.equals(repartitionSourceTopics, that.repartitionSourceTopics);
}
@Override
public int hashCode() {
return Objects.hash(
subtopologyId,
sourceTopics,
repartitionSinkTopics,
stateChangelogTopics,
repartitionSourceTopics
);
}
@Override
public String toString() {
return "(" +
"subtopologyId='" + subtopologyId + '\'' +
", sourceTopics=" + sourceTopics +
", repartitionSinkTopics=" + repartitionSinkTopics +
", stateChangelogTopics=" + stateChangelogTopics +
", repartitionSourceTopics=" + repartitionSourceTopics +
')';
}
/**
* Information about a topic. These configs reflect what is required by the topology, but may differ from the current state on the
* broker.
*/
public static class TopicInfo {
private final int partitions;
private final int replicationFactor;
private final Map<String, String> topicConfigs;
public TopicInfo(final int partitions, final int replicationFactor, final Map<String, String> topicConfigs) {
this.partitions = partitions;
this.replicationFactor = replicationFactor;
this.topicConfigs = Objects.requireNonNull(topicConfigs, "topicConfigs must be non-null");
}
/**
* The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced.
*/
public int partitions() {
return partitions;
}
/**
* The replication factor of the topic. Can be 0 if the default replication factor is used.
*/
public int replicationFactor() {
return replicationFactor;
}
/**
* Topic-level configurations as key-value pairs. Default configuration can be omitted.
*/
public Map<String, String> topicConfigs() {
return Map.copyOf(topicConfigs);
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TopicInfo topicInfo = (TopicInfo) o;
return partitions == topicInfo.partitions
&& replicationFactor == topicInfo.replicationFactor
&& Objects.equals(topicConfigs, topicInfo.topicConfigs);
}
@Override
public int hashCode() {
return Objects.hash(
partitions,
replicationFactor,
topicConfigs
);
}
@Override
public String toString() {
return "TopicInfo(" +
"partitions=" + partitions +
", replicationFactor=" + replicationFactor +
", topicConfigs=" + topicConfigs.entrySet().stream().map(x -> x.getKey() + "=" + x.getValue())
.collect(Collectors.joining(",")) +
')';
}
}
}

View File

@ -0,0 +1,284 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.message.StreamsGroupDescribeRequestData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
import org.apache.kafka.common.requests.StreamsGroupDescribeResponse;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.kafka.clients.admin.internals.AdminUtils.validAclOperations;
public class DescribeStreamsGroupsHandler extends AdminApiHandler.Batched<CoordinatorKey, StreamsGroupDescription> {
private final boolean includeAuthorizedOperations;
private final Logger log;
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
public DescribeStreamsGroupsHandler(
boolean includeAuthorizedOperations,
LogContext logContext) {
this.includeAuthorizedOperations = includeAuthorizedOperations;
this.log = logContext.logger(DescribeStreamsGroupsHandler.class);
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
}
private static Set<CoordinatorKey> buildKeySet(Collection<String> groupIds) {
return groupIds.stream()
.map(CoordinatorKey::byGroupId)
.collect(Collectors.toSet());
}
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, StreamsGroupDescription> newFuture(Collection<String> groupIds) {
return AdminApiFuture.forKeys(buildKeySet(groupIds));
}
@Override
public String apiName() {
return "describeStreamsGroups";
}
@Override
public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
return lookupStrategy;
}
@Override
public StreamsGroupDescribeRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> keys) {
List<String> groupIds = keys.stream().map(key -> {
if (key.type != CoordinatorType.GROUP) {
throw new IllegalArgumentException("Invalid group coordinator key " + key +
" when building `DescribeStreamsGroups` request");
}
return key.idValue;
}).collect(Collectors.toList());
StreamsGroupDescribeRequestData data = new StreamsGroupDescribeRequestData()
.setGroupIds(groupIds)
.setIncludeAuthorizedOperations(includeAuthorizedOperations);
return new StreamsGroupDescribeRequest.Builder(data, true);
}
@Override
public ApiResult<CoordinatorKey, StreamsGroupDescription> handleResponse(
Node coordinator,
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse) {
final StreamsGroupDescribeResponse response = (StreamsGroupDescribeResponse) abstractResponse;
final Map<CoordinatorKey, StreamsGroupDescription> completed = new HashMap<>();
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
for (StreamsGroupDescribeResponseData.DescribedGroup describedGroup : response.data().groups()) {
CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId());
Errors error = Errors.forCode(describedGroup.errorCode());
if (error != Errors.NONE) {
handleError(groupIdKey, describedGroup, coordinator, error, describedGroup.errorMessage(), completed, failed, groupsToUnmap);
continue;
}
if (describedGroup.topology() == null) {
log.error("`DescribeStreamsGroups` response for group id {} is missing the topology information", groupIdKey.idValue);
failed.put(groupIdKey, new IllegalStateException("Topology information is missing"));
continue;
}
final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
final StreamsGroupDescription streamsGroupDescription = new StreamsGroupDescription(
describedGroup.groupId(),
describedGroup.groupEpoch(),
describedGroup.assignmentEpoch(),
describedGroup.topology().epoch(),
convertSubtopologies(describedGroup.topology().subtopologies()),
convertMembers(describedGroup.members()),
GroupState.parse(describedGroup.groupState()),
coordinator,
authorizedOperations
);
completed.put(groupIdKey, streamsGroupDescription);
}
return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap));
}
private Collection<StreamsGroupMemberDescription> convertMembers(final List<StreamsGroupDescribeResponseData.Member> members) {
final List<StreamsGroupMemberDescription> memberDescriptions = new ArrayList<>(members.size());
members.forEach(groupMember ->
memberDescriptions.add(new StreamsGroupMemberDescription(
groupMember.memberId(),
groupMember.memberEpoch(),
Optional.ofNullable(groupMember.instanceId()),
Optional.ofNullable(groupMember.rackId()),
groupMember.clientId(),
groupMember.clientHost(),
groupMember.topologyEpoch(),
groupMember.processId(),
Optional.ofNullable(groupMember.userEndpoint()).map(this::convertEndpoint),
convertClientTags(groupMember.clientTags()),
convertTaskOffsets(groupMember.taskOffsets()),
convertTaskOffsets(groupMember.taskEndOffsets()),
convertAssignment(groupMember.assignment()),
convertAssignment(groupMember.targetAssignment()),
groupMember.isClassic()
))
);
return memberDescriptions;
}
private Collection<StreamsGroupSubtopologyDescription> convertSubtopologies(final List<StreamsGroupDescribeResponseData.Subtopology> subtopologies) {
final List<StreamsGroupSubtopologyDescription> subtopologyDescriptions = new ArrayList<>(subtopologies.size());
subtopologies.forEach(subtopology ->
subtopologyDescriptions.add(new StreamsGroupSubtopologyDescription(
subtopology.subtopologyId(),
subtopology.sourceTopics(),
subtopology.repartitionSinkTopics(),
convertTopicInfos(subtopology.stateChangelogTopics()),
convertTopicInfos(subtopology.repartitionSourceTopics())
))
);
return subtopologyDescriptions;
}
private Map<String, StreamsGroupSubtopologyDescription.TopicInfo> convertTopicInfos(final List<StreamsGroupDescribeResponseData.TopicInfo> topicInfos) {
return topicInfos.stream().collect(Collectors.toMap(
StreamsGroupDescribeResponseData.TopicInfo::name,
topicInfo -> new StreamsGroupSubtopologyDescription.TopicInfo(
topicInfo.partitions(),
topicInfo.replicationFactor(),
topicInfo.topicConfigs().stream().collect(Collectors.toMap(
StreamsGroupDescribeResponseData.KeyValue::key,
StreamsGroupDescribeResponseData.KeyValue::value
))
)
));
}
private StreamsGroupMemberAssignment.TaskIds convertTaskIds(final StreamsGroupDescribeResponseData.TaskIds taskIds) {
return new StreamsGroupMemberAssignment.TaskIds(
taskIds.subtopologyId(),
taskIds.partitions()
);
}
private StreamsGroupMemberAssignment convertAssignment(final StreamsGroupDescribeResponseData.Assignment assignment) {
return new StreamsGroupMemberAssignment(
assignment.activeTasks().stream().map(this::convertTaskIds).collect(Collectors.toList()),
assignment.standbyTasks().stream().map(this::convertTaskIds).collect(Collectors.toList()),
assignment.warmupTasks().stream().map(this::convertTaskIds).collect(Collectors.toList())
);
}
private List<StreamsGroupMemberDescription.TaskOffset> convertTaskOffsets(final List<StreamsGroupDescribeResponseData.TaskOffset> taskOffsets) {
return taskOffsets.stream().map(taskOffset ->
new StreamsGroupMemberDescription.TaskOffset(
taskOffset.subtopologyId(),
taskOffset.partition(),
taskOffset.offset()
)
).collect(Collectors.toList());
}
private Map<String, String> convertClientTags(final List<StreamsGroupDescribeResponseData.KeyValue> keyValues) {
return keyValues.stream().collect(Collectors.toMap(
StreamsGroupDescribeResponseData.KeyValue::key,
StreamsGroupDescribeResponseData.KeyValue::value
));
}
private StreamsGroupMemberDescription.Endpoint convertEndpoint(final StreamsGroupDescribeResponseData.Endpoint endpoint) {
return new StreamsGroupMemberDescription.Endpoint(endpoint.host(), endpoint.port());
}
private void handleError(
CoordinatorKey groupId,
StreamsGroupDescribeResponseData.DescribedGroup describedGroup,
Node coordinator,
Errors error,
String errorMsg,
Map<CoordinatorKey, StreamsGroupDescription> completed,
Map<CoordinatorKey, Throwable> failed,
Set<CoordinatorKey> groupsToUnmap) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
log.debug("`DescribeStreamsGroups` request for group id {} failed due to error {}", groupId.idValue, error);
failed.put(groupId, error.exception(errorMsg));
break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`DescribeStreamsGroups` request for group id {} failed because the coordinator " +
"is still in the process of loading state. Will retry", groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`DescribeStreamsGroups` request for group id {} returned error {}. " +
"Will attempt to find the coordinator again and retry", groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
case GROUP_ID_NOT_FOUND:
// In order to maintain compatibility with describeConsumerGroups, an unknown group ID is
// reported as a DEAD streams group, and the admin client operation did not fail
log.debug("`DescribeStreamsGroups` request for group id {} failed because the group does not exist. {}",
groupId.idValue, errorMsg != null ? errorMsg : "");
final StreamsGroupDescription streamsGroupDescription =
new StreamsGroupDescription(
groupId.idValue,
-1,
-1,
-1,
Collections.emptySet(),
Collections.emptySet(),
GroupState.DEAD,
coordinator,
validAclOperations(describedGroup.authorizedOperations()));
completed.put(groupId, streamsGroupDescription);
break;
default:
log.error("`DescribeStreamsGroups` request for group id {} failed due to unexpected error {}", groupId.idValue, error);
failed.put(groupId, error.exception(errorMsg));
}
}
}

View File

@ -160,6 +160,7 @@ import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequest
import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.UnregisterBrokerResponseData;
import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
@ -237,6 +238,7 @@ import org.apache.kafka.common.requests.RemoveRaftVoterRequest;
import org.apache.kafka.common.requests.RemoveRaftVoterResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.ShareGroupDescribeResponse;
import org.apache.kafka.common.requests.StreamsGroupDescribeResponse;
import org.apache.kafka.common.requests.UnregisterBrokerResponse;
import org.apache.kafka.common.requests.UpdateFeaturesRequest;
import org.apache.kafka.common.requests.UpdateFeaturesResponse;
@ -5762,6 +5764,233 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testDescribeStreamsGroups() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Retriable FindCoordinatorResponse errors should be retried
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData();
// Retriable errors should be retried
data.groups().add(new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(GROUP_ID)
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()));
env.kafkaClient().prepareResponse(new StreamsGroupDescribeResponse(data));
// We need to return two responses here, one with NOT_COORDINATOR error when calling describe streams group
// api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
// FindCoordinatorResponse.
//
// And the same reason for COORDINATOR_NOT_AVAILABLE error response
data = new StreamsGroupDescribeResponseData();
data.groups().add(new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(GROUP_ID)
.setErrorCode(Errors.NOT_COORDINATOR.code()));
env.kafkaClient().prepareResponse(new StreamsGroupDescribeResponse(data));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
data = new StreamsGroupDescribeResponseData();
data.groups().add(new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(GROUP_ID)
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
env.kafkaClient().prepareResponse(new StreamsGroupDescribeResponse(data));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
data = makeFullStreamsGroupDescribeResponse();
env.kafkaClient().prepareResponse(new StreamsGroupDescribeResponse(data));
final DescribeStreamsGroupsResult result = env.adminClient().describeStreamsGroups(singletonList(GROUP_ID));
final StreamsGroupDescription groupDescription = result.describedGroups().get(GROUP_ID).get();
final String subtopologyId = "my_subtopology";
StreamsGroupMemberAssignment.TaskIds expectedActiveTasks1 =
new StreamsGroupMemberAssignment.TaskIds(subtopologyId, asList(0, 1, 2));
StreamsGroupMemberAssignment.TaskIds expectedStandbyTasks1 =
new StreamsGroupMemberAssignment.TaskIds(subtopologyId, asList(3, 4, 5));
StreamsGroupMemberAssignment.TaskIds expectedWarmupTasks1 =
new StreamsGroupMemberAssignment.TaskIds(subtopologyId, asList(6, 7, 8));
StreamsGroupMemberAssignment.TaskIds expectedActiveTasks2 =
new StreamsGroupMemberAssignment.TaskIds(subtopologyId, asList(3, 4, 5));
StreamsGroupMemberAssignment.TaskIds expectedStandbyTasks2 =
new StreamsGroupMemberAssignment.TaskIds(subtopologyId, asList(6, 7, 8));
StreamsGroupMemberAssignment.TaskIds expectedWarmupTasks2 =
new StreamsGroupMemberAssignment.TaskIds(subtopologyId, asList(0, 1, 2));
StreamsGroupMemberAssignment expectedMemberAssignment = new StreamsGroupMemberAssignment(
singletonList(expectedActiveTasks1),
singletonList(expectedStandbyTasks1),
singletonList(expectedWarmupTasks1)
);
StreamsGroupMemberAssignment expectedTargetAssignment = new StreamsGroupMemberAssignment(
singletonList(expectedActiveTasks2),
singletonList(expectedStandbyTasks2),
singletonList(expectedWarmupTasks2)
);
final String instanceId = "instance-id";
final String rackId = "rack-id";
StreamsGroupMemberDescription expectedMemberOne = new StreamsGroupMemberDescription(
"0",
1,
Optional.of(instanceId),
Optional.of(rackId),
"clientId0",
"clientHost",
0,
"processId",
Optional.of(new StreamsGroupMemberDescription.Endpoint("localhost", 8080)),
Collections.singletonMap("key", "value"),
Collections.singletonList(new StreamsGroupMemberDescription.TaskOffset(subtopologyId, 0, 0)),
Collections.singletonList(new StreamsGroupMemberDescription.TaskOffset(subtopologyId, 0, 1)),
expectedMemberAssignment,
expectedTargetAssignment,
true
);
StreamsGroupMemberDescription expectedMemberTwo = new StreamsGroupMemberDescription(
"1",
2,
Optional.empty(),
Optional.empty(),
"clientId1",
"clientHost",
1,
"processId2",
Optional.empty(),
Collections.emptyMap(),
Collections.emptyList(),
Collections.emptyList(),
new StreamsGroupMemberAssignment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
new StreamsGroupMemberAssignment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
false
);
StreamsGroupSubtopologyDescription expectedSubtopologyDescription = new StreamsGroupSubtopologyDescription(
subtopologyId,
Collections.singletonList("my_source_topic"),
Collections.singletonList("my_repartition_sink_topic"),
Collections.singletonMap(
"my_changelog_topic",
new StreamsGroupSubtopologyDescription.TopicInfo(
0,
(short) 3,
Collections.singletonMap("key1", "value1")
)
),
Collections.singletonMap(
"my_repartition_topic",
new StreamsGroupSubtopologyDescription.TopicInfo(
99,
(short) 0,
Collections.emptyMap()
)
)
);
assertEquals(1, result.describedGroups().size());
assertEquals(GROUP_ID, groupDescription.groupId());
assertEquals(2, groupDescription.members().size());
Iterator<StreamsGroupMemberDescription> members = groupDescription.members().iterator();
assertEquals(expectedMemberOne, members.next());
assertEquals(expectedMemberTwo, members.next());
assertEquals(1, groupDescription.subtopologies().size());
assertEquals(expectedSubtopologyDescription, groupDescription.subtopologies().iterator().next());
assertEquals(2, groupDescription.groupEpoch());
assertEquals(1, groupDescription.targetAssignmentEpoch());
}
}
@Test
public void testDescribeStreamsGroupsWithAuthorizedOperationsOmitted() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
StreamsGroupDescribeResponseData data = makeFullStreamsGroupDescribeResponse();
data.groups().iterator().next()
.setAuthorizedOperations(MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
env.kafkaClient().prepareResponse(new StreamsGroupDescribeResponse(data));
final DescribeStreamsGroupsResult result = env.adminClient().describeStreamsGroups(singletonList(GROUP_ID));
final StreamsGroupDescription groupDescription = result.describedGroups().get(GROUP_ID).get();
assertNull(groupDescription.authorizedOperations());
}
}
@Test
public void testDescribeMultipleStreamsGroups() {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
StreamsGroupDescribeResponseData.TaskIds activeTasks = new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId("my_subtopology")
.setPartitions(asList(0, 1, 2));
StreamsGroupDescribeResponseData.TaskIds standbyTasks = new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId("my_subtopology")
.setPartitions(asList(3, 4, 5));
StreamsGroupDescribeResponseData.TaskIds warmupTasks = new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId("my_subtopology")
.setPartitions(asList(6, 7, 8));
final StreamsGroupDescribeResponseData.Assignment memberAssignment = new StreamsGroupDescribeResponseData.Assignment()
.setActiveTasks(singletonList(activeTasks))
.setStandbyTasks(singletonList(standbyTasks))
.setWarmupTasks(singletonList(warmupTasks));
StreamsGroupDescribeResponseData group0Data = new StreamsGroupDescribeResponseData();
group0Data.groups().add(new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(GROUP_ID)
.setGroupState(GroupState.STABLE.toString())
.setMembers(asList(
new StreamsGroupDescribeResponseData.Member()
.setMemberId("0")
.setClientId("clientId0")
.setClientHost("clientHost")
.setAssignment(memberAssignment),
new StreamsGroupDescribeResponseData.Member()
.setMemberId("1")
.setClientId("clientId1")
.setClientHost("clientHost")
.setAssignment(memberAssignment))));
StreamsGroupDescribeResponseData group1Data = new StreamsGroupDescribeResponseData();
group1Data.groups().add(new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-1")
.setGroupState(GroupState.STABLE.toString())
.setMembers(asList(
new StreamsGroupDescribeResponseData.Member()
.setMemberId("0")
.setClientId("clientId0")
.setClientHost("clientHost")
.setAssignment(memberAssignment),
new StreamsGroupDescribeResponseData.Member()
.setMemberId("1")
.setClientId("clientId1")
.setClientHost("clientHost")
.setAssignment(memberAssignment))));
env.kafkaClient().prepareResponse(new StreamsGroupDescribeResponse(group0Data));
env.kafkaClient().prepareResponse(new StreamsGroupDescribeResponse(group1Data));
Collection<String> groups = new HashSet<>();
groups.add(GROUP_ID);
groups.add("group-1");
final DescribeStreamsGroupsResult result = env.adminClient().describeStreamsGroups(groups);
assertEquals(2, result.describedGroups().size());
assertEquals(groups, result.describedGroups().keySet());
}
}
@Test
public void testDescribeShareGroups() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
@ -10281,4 +10510,116 @@ public class KafkaAdminClientTest {
assertNull(result.partitionResult(barPartition0).get());
}
}
private static StreamsGroupDescribeResponseData makeFullStreamsGroupDescribeResponse() {
StreamsGroupDescribeResponseData data;
StreamsGroupDescribeResponseData.TaskIds activeTasks1 = new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId("my_subtopology")
.setPartitions(asList(0, 1, 2));
StreamsGroupDescribeResponseData.TaskIds standbyTasks1 = new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId("my_subtopology")
.setPartitions(asList(3, 4, 5));
StreamsGroupDescribeResponseData.TaskIds warmupTasks1 = new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId("my_subtopology")
.setPartitions(asList(6, 7, 8));
StreamsGroupDescribeResponseData.TaskIds activeTasks2 = new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId("my_subtopology")
.setPartitions(asList(3, 4, 5));
StreamsGroupDescribeResponseData.TaskIds standbyTasks2 = new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId("my_subtopology")
.setPartitions(asList(6, 7, 8));
StreamsGroupDescribeResponseData.TaskIds warmupTasks2 = new StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId("my_subtopology")
.setPartitions(asList(0, 1, 2));
StreamsGroupDescribeResponseData.Assignment memberAssignment = new StreamsGroupDescribeResponseData.Assignment()
.setActiveTasks(singletonList(activeTasks1))
.setStandbyTasks(singletonList(standbyTasks1))
.setWarmupTasks(singletonList(warmupTasks1));
StreamsGroupDescribeResponseData.Assignment targetAssignment = new StreamsGroupDescribeResponseData.Assignment()
.setActiveTasks(singletonList(activeTasks2))
.setStandbyTasks(singletonList(standbyTasks2))
.setWarmupTasks(singletonList(warmupTasks2));
StreamsGroupDescribeResponseData.Member memberOne = new StreamsGroupDescribeResponseData.Member()
.setMemberId("0")
.setMemberEpoch(1)
.setInstanceId("instance-id")
.setRackId("rack-id")
.setClientId("clientId0")
.setClientHost("clientHost")
.setTopologyEpoch(0)
.setProcessId("processId")
.setUserEndpoint(new StreamsGroupDescribeResponseData.Endpoint()
.setHost("localhost")
.setPort(8080)
)
.setClientTags(Collections.singletonList(new StreamsGroupDescribeResponseData.KeyValue()
.setKey("key")
.setValue("value")
))
.setTaskOffsets(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskOffset()
.setSubtopologyId("my_subtopology")
.setPartition(0)
.setOffset(0)
))
.setTaskEndOffsets(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskOffset()
.setSubtopologyId("my_subtopology")
.setPartition(0)
.setOffset(1)
))
.setAssignment(memberAssignment)
.setTargetAssignment(targetAssignment)
.setIsClassic(true);
StreamsGroupDescribeResponseData.Member memberTwo = new StreamsGroupDescribeResponseData.Member()
.setMemberId("1")
.setMemberEpoch(2)
.setInstanceId(null)
.setRackId(null)
.setClientId("clientId1")
.setClientHost("clientHost")
.setTopologyEpoch(1)
.setProcessId("processId2")
.setUserEndpoint(null)
.setClientTags(Collections.emptyList())
.setTaskOffsets(Collections.emptyList())
.setTaskEndOffsets(Collections.emptyList())
.setAssignment(new StreamsGroupDescribeResponseData.Assignment())
.setTargetAssignment(new StreamsGroupDescribeResponseData.Assignment())
.setIsClassic(false);
StreamsGroupDescribeResponseData.Subtopology subtopologyDescription = new StreamsGroupDescribeResponseData.Subtopology()
.setSubtopologyId("my_subtopology")
.setSourceTopics(Collections.singletonList("my_source_topic"))
.setRepartitionSinkTopics(Collections.singletonList("my_repartition_sink_topic"))
.setStateChangelogTopics(Collections.singletonList(
new StreamsGroupDescribeResponseData.TopicInfo()
.setName("my_changelog_topic")
.setPartitions(0)
.setReplicationFactor((short) 3)
.setTopicConfigs(Collections.singletonList(new StreamsGroupDescribeResponseData.KeyValue()
.setKey("key1")
.setValue("value1")
))
))
.setRepartitionSourceTopics(Collections.singletonList(
new StreamsGroupDescribeResponseData.TopicInfo()
.setName("my_repartition_topic")
.setPartitions(99)
.setReplicationFactor((short) 0)
.setTopicConfigs(Collections.emptyList())
));
data = new StreamsGroupDescribeResponseData();
data.groups().add(new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(GROUP_ID)
.setGroupState(GroupState.STABLE.toString())
.setMembers(asList(memberOne, memberTwo))
.setTopology(new StreamsGroupDescribeResponseData.Topology()
.setEpoch(1)
.setSubtopologies(Collections.singletonList(subtopologyDescription))
)
.setGroupEpoch(2)
.setAssignmentEpoch(1));
return data;
}
}

View File

@ -1429,6 +1429,11 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public synchronized DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds, DescribeStreamsGroupsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public synchronized DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds, DescribeClassicGroupsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");

View File

@ -91,6 +91,8 @@ import org.apache.kafka.clients.admin.DescribeReplicaLogDirsOptions;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsOptions;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.DescribeTransactionsOptions;
@ -449,6 +451,11 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
return adminDelegate.describeShareGroups(groupIds, options);
}
@Override
public DescribeStreamsGroupsResult describeStreamsGroups(final Collection<String> groupIds, final DescribeStreamsGroupsOptions options) {
return adminDelegate.describeStreamsGroups(groupIds, options);
}
@Override
public AlterShareGroupOffsetsResult alterShareGroupOffsets(final String groupId, final Map<TopicPartition, Long> offsets, final AlterShareGroupOffsetsOptions options) {
return adminDelegate.alterShareGroupOffsets(groupId, offsets, options);